The input-generation stub is executed as shown below, and the input it produces is sent to nc. This stub is used to generate input for this Spark Streaming application.
Program starts in Main()
Please note that the YAML files (given below) must not contain any TAB characters. Configure your editor to replace TABs with spaces (editors such as vi and Eclipse have an option for this).
python3.5 ./a21_stub.py inputfile=./c15_input2.yaml
Press ENTER when Ready...
[ 0]
[.9] => data=lion || secondscounter=0.9
[ 1]
[.8] => data=lion || secondscounter=1.8
[ 2]
[.7] => data=lion || secondscounter=2.7
[ 3]
[.6] => data=lion || secondscounter=3.6
[ 4]
[.5] => data=lion || secondscounter=4.5
[ 5]
[.4] => data=lion || secondscounter=5.4
[ 6]
[.4] => data=lion || secondscounter=6.4
[ 7]
[.2] => data=lion || secondscounter=7.2
[ 8]
[.1] => data=lion || secondscounter=8.1
Program starts in Main()
'''==================================================================
Thread that sends data to 'nc'
Reference :
http://www.techbeamers.com/python-multithreading-concepts/
https://docs.python.org/3/library/queue.html
http://stackoverflow.com/questions/11829982/piping-data-from-python-to-an-external-command
=================================================================='''
# a11 > a21_stub.py
import sys
import threading
import time
import yaml
import logging.config
from a22_Formatter import Formatter
def Get_arguments():
    """Parse ``key=value`` command-line arguments into a dictionary.

    Every argument after the program name must have the form ``key=value``.
    Only the first ``=`` separates key from value, so values may themselves
    contain ``=`` (for example a file path such as ``inputfile=./a=b.yaml``).

    Returns:
        dict mapping each key to its (string) value; empty when no
        arguments were given.

    Raises:
        ValueError: if an argument does not contain ``=``.
    """
    log = logging.getLogger(__name__)
    argDny = {}
    for arg in sys.argv[1:]:
        log.debug("Arg -> %s", arg)
        # partition() splits on the first '=' only; a plain split("=")
        # breaks the two-name unpacking when the value contains '='.
        key, separator, value = arg.partition("=")
        if not separator:
            raise ValueError("Expected key=value argument, got: %r" % (arg,))
        argDny[key] = value
    return argDny
def Load_input_file(argDny):
    """Read and parse the YAML input file named by ``argDny['inputfile']``.

    Args:
        argDny: dictionary of command-line arguments; must contain the key
            ``'inputfile'`` holding the path of the YAML file.

    Returns:
        The parsed YAML document (Main() treats it as a list whose first
        element is the meta entry and whose remaining elements are data).

    Raises:
        KeyError: if ``'inputfile'`` is missing from *argDny*.
        OSError: if the file cannot be opened.
    """
    with open(argDny['inputfile'], 'rt') as fin:
        # safe_load: yaml.load() without an explicit Loader is deprecated
        # (and an error on PyYAML >= 6); the input only needs plain YAML.
        inputDny = yaml.safe_load(fin)
    return inputDny
def Load_meta_info(inputList):
    """Return the meta entry, i.e. the first element of *inputList*."""
    return inputList[0]
def Load_data(inputList):
    """Return every element of *inputList* after the leading meta entry."""
    return inputList[1:]
def Setup_logger():
    """Configure the ``logging`` package from ``./z11_logging.yaml``.

    Handlers already attached to the root logger are discarded before the
    dictionary configuration is applied.

    Raises:
        OSError: if ``./z11_logging.yaml`` cannot be opened.
    """
    with open("./z11_logging.yaml", "rt") as fin:
        # safe_load: yaml.load() without an explicit Loader is deprecated
        # (and an error on PyYAML >= 6); the logging config is plain YAML.
        config = yaml.safe_load(fin)
    # Start from a clean root logger so the YAML file is the only source
    # of handlers.
    logging.root.handlers = []
    logging.config.dictConfig(config)
def Main():
    """Load the input file and stream its records to ``nc``.

    Steps: configure logging, parse the ``key=value`` command-line
    arguments, load the YAML input file, split it into the meta entry and
    the data entries, create the Formatter thread, wait for the user to
    press ENTER, then start the thread and feed it data.

    Exits with status 1 when the meta entry's ``type`` is not ``"simple"``,
    the only type for which a formatter is created.
    """
    Setup_logger()
    log = logging.getLogger(__name__)

    # Get command line arguments
    argDny = Get_arguments()
    log.debug("argDny -> %s", argDny)

    # Load input file (first element: meta entry, rest: data entries)
    inputList = Load_input_file(argDny)
    log.debug("inputDny -> %s", inputList)

    # Get Meta data from inputList
    metaDny = Load_meta_info(inputList)
    log.debug("metaDny -> %s", metaDny)

    # Get Data List
    dataLst = Load_data(inputList)
    log.debug("dataList -> %s", dataLst)

    # Fail with a clear message rather than reaching thrd.start() with no
    # formatter bound.
    if metaDny['type'] != "simple":
        log.error("Unsupported input type %r; only 'simple' is supported",
                  metaDny['type'])
        sys.exit(1)

    thrd = Formatter(metaDny, dataLst)
    log.info("Press ENTER when Ready...")
    input("")
    thrd.start()
    thrd.Place_data_in_queue()
    thrd.join()
# Run only when executed as a script, so importing this module does not
# start the program (and block on the ENTER prompt) as a side effect.
if __name__ == "__main__":
    Main()
# a11 > a22_Formatter.py
from datetime import datetime, timedelta
import logging.config
import time
import threading
import queue
from math import floor, modf
import sys
import subprocess
import re
'''===================================================================
Formatter
- Generates dummy input to be sent to netcat (nc)
==================================================================='''
class Formatter(threading.Thread):
    """Thread that decorates queued records and writes them to ``nc``.

    The main thread calls Place_data_in_queue() to enqueue records on the
    schedule given by the data entries. The thread body, run(), keeps a
    whole-seconds counter, drains the queue about every 0.1 s and writes
    each record, together with the meta fields listed under ``addtodata``,
    to the standard input of an ``nc -lk 9999`` child process.
    """

    def __init__(self, metaDny, dataLst):
        """Store the configuration and start the ``nc`` child process.

        Args:
            metaDny: meta entry of the input file; the keys read by this
                class are ``'addtodata'`` and ``'mode'``.
            dataLst: list of dictionaries, each with a ``'delay'`` (such as
                ``'0.9s'``) and a ``'data'`` entry.
        """
        # Daemon thread: run() never returns, so a non-daemon thread would
        # keep the process alive after sys.exit() or Ctrl-C in the main
        # thread.
        threading.Thread.__init__(self, daemon=True)
        self.log = logging.getLogger(__name__)
        self.fifoQ = queue.Queue()
        self.metaDny = metaDny
        self.dataLst = dataLst
        self.counter = 0            # whole seconds elapsed since run() began
        self.diffInSec = 0          # elapsed seconds as a float
        self.decimalSeconds = "0"   # tenths-of-a-second digit, always a str
        self.firstDtime = datetime.now()
        self.curDtime = datetime.now()
        self.process = self.Connect_to_nc()

    def Connect_to_nc(self):
        """Start ``nc -lk 9999`` and return the ``subprocess.Popen`` object.

        The child's stdin is a text-mode pipe that Send_data_to_nc() writes
        to.
        """
        ncCommand = "nc -lk 9999"
        # NOTE(review): stdout and stderr are piped but never read in this
        # class; if nc ever produces enough output the pipe buffer could
        # fill up and block it. Consider subprocess.DEVNULL - confirm that
        # nobody reads process.stdout/stderr first.
        process = subprocess.Popen(ncCommand.split(" "),
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        return process

    def Increment_counter(self, delta):
        """Advance the seconds counter *delta* times, logging each value."""
        for _ in range(delta):
            self.counter += 1
            self.Print_counter()

    def Print_counter(self):
        """Log the seconds counter, right-aligned to at least 3 columns."""
        # rjust() pads without truncating, so values >= 1000 stay intact.
        self.log.info("[%s]", str(self.counter).rjust(3))

    def Print_data(self, data):
        """Log *data* prefixed with the current tenths-of-a-second digit."""
        self.log.debug("self.diffInSec -> %s", self.diffInSec)
        self.log.info("%s => %s",
                      " " * 5 + "[." + str(self.decimalSeconds) + "]",
                      data)

    def Get_element_from_fifoq(self, size):
        """Remove and return *size* items from the queue, oldest first.

        Raises:
            queue.Empty: if fewer than *size* items are available.
        """
        return [self.fifoQ.get_nowait() for _ in range(size)]

    def Add_meta_info(self, data):
        """Build the list of ``key=value`` fields for one record.

        The first field is always ``data=<data>``. For each name listed
        under ``addtodata`` in the meta entry, ``minsec`` appends
        ``minsec=[MM][SS.d]`` and ``secondscounter`` appends
        ``secondscounter=<counter>.<d>``, where ``d`` is the current tenths
        digit. Other names are ignored; a missing or empty ``addtodata``
        adds nothing.

        Returns:
            list of str fields, in the order described above.
        """
        # str(): YAML scalars such as numbers are not strings.
        dataNmeta = ["data=" + str(data)]
        tenths = str(self.decimalSeconds)
        for meta in self.metaDny.get('addtodata') or []:
            self.log.debug("Meta -> %s", meta)
            if meta == "minsec":
                minsec = self.curDtime.strftime("[%M][%S.") + tenths + "]"
                dataNmeta.append("minsec=" + minsec)
            elif meta == "secondscounter":
                dataNmeta.append("secondscounter=" + str(self.counter) +
                                 "." + tenths)
        return dataNmeta

    def Send_data_to_nc(self, dataList):
        """Format each record, log it and write it as one line to ``nc``."""
        for data in dataList:
            dataNmeta = self.Add_meta_info(data)
            self.log.debug("dataNmeta -> %s", dataNmeta)
            formattedData = ' || '.join(dataNmeta)
            self.log.debug("formattedData -> %s", formattedData)
            self.Print_data(formattedData)
            self.process.stdin.write(formattedData + "\n")
            # Flush per record so each line reaches nc immediately.
            self.process.stdin.flush()

    def Place_data_in_queue(self):
        """Enqueue the configured records, sleeping each entry's delay.

        For every entry the delay (seconds followed by ``s``, for example
        ``'0.9s'``) is slept first; then the data is queued unless it is
        the string ``'None'``, which provides a delay without sending
        data. The list is replayed forever unless the meta ``mode`` is
        ``"singlepassmode"``.

        Exits with status 1 if a delay is not a string ending in ``s``.
        """
        if not self.dataLst:
            # Nothing to replay; returning avoids a busy loop in loop mode.
            self.log.warning("No data entries to send")
            return
        while True:
            for element in self.dataLst:
                delay = element['delay']
                data = element['data']
                if isinstance(delay, str) and delay.endswith('s'):
                    time.sleep(float(delay[:-1]))
                else:
                    self.log.error("Invalid delay %r: expected seconds "
                                   "followed by 's', e.g. '0.9s'", delay)
                    sys.exit(1)
                if data != 'None':
                    self.fifoQ.put(data)
            if self.metaDny['mode'] == "singlepassmode":
                break

    @staticmethod
    def _tenths_digit(elapsed):
        """Return the tenths-of-a-second digit of a timedelta as a str."""
        # Taken from the microseconds field instead of parsing str(float),
        # which has no '.' in scientific notation (e.g. '5e-05').
        return str(abs(elapsed).microseconds // 100000)

    def run(self):
        """Thread body: update the clock and forward queued records.

        About every 0.1 s the elapsed time since the start is recomputed,
        the seconds counter is advanced (and logged) for every whole second
        that has passed, and all records currently queued are sent to
        ``nc``. This loop never returns.
        """
        self.firstDtime = datetime.now()
        self.Print_counter()
        while True:
            time.sleep(0.1)
            self.curDtime = datetime.now()
            elapsed = self.curDtime - self.firstDtime
            self.diffInSec = elapsed.total_seconds()
            self.log.debug("diffInSec -> %s", self.diffInSec)
            self.decimalSeconds = self._tenths_digit(elapsed)
            self.log.debug("decimalSeconds -> %s", self.decimalSeconds)
            floored = floor(self.diffInSec)
            self.log.debug("floored -> %s", floored)
            # Catch up on every whole second that passed since last time.
            delta = floored - self.counter
            if delta >= 1:
                self.Increment_counter(delta)
            size = self.fifoQ.qsize()
            if size > 0:
                dataList = self.Get_element_from_fifoq(size)
                self.Send_data_to_nc(dataList)
Please note that the YAML files (given below) must not contain any TAB characters. Configure your editor to replace TABs with spaces (editors such as vi and Eclipse have an option for this).
# a11 > z11_logging.yaml
version : 1
formatters :
simple :
#format : "%(levelname)-7s || %(asctime)s || [ %(filename)s ] [ %(funcName)s() ] [ %(lineno)s ] || %(message)s"
#format : "%(levelname)-7s || %(asctime)s || %(filename)s > %(funcName)s() > %(lineno)s ] || %(message)s"
format : "%(message)s"
handlers :
console :
class : logging.StreamHandler
level : DEBUG
formatter : simple
stream : ext://sys.stdout
file_handler :
class : logging.handlers.RotatingFileHandler
level : DEBUG
formatter : simple
maxBytes : 10485760
backupCount : 2
encoding : utf8
filename : purge.log
root :
level : INFO
handlers : [console, file_handler]
# a11 > c15_input2.yaml
- meta:
type : simple
# Can be minsec (or) secondscounter
addtodata :
- secondscounter
# Can be singlepassmode (or) loopmode
mode : loopmode
# 'None' is used to provide delay without sending data to the
# 'nc' server
- data : lion
delay : 0.9s