The Input Generation Stub is executed as shown below, and the input is sent to nc. This stub is used for generating input for this Spark Streaming application.
Program starts in Main()
Please note that the YAML files (given below) must not contain any TAB characters. Configure your editor to replace TABs with spaces (editors such as vi and Eclipse have options for this setting).
python3.5 ./a21_stub.py inputfile=./c15_input2.yaml
Press ENTER when Ready...
[ 0]
[.9] => data=lion || secondscounter=0.9
[ 1]
[.8] => data=lion || secondscounter=1.8
[ 2]
[.7] => data=lion || secondscounter=2.7
[ 3]
[.6] => data=lion || secondscounter=3.6
[ 4]
[.5] => data=lion || secondscounter=4.5
[ 5]
[.4] => data=lion || secondscounter=5.4
[ 6]
[.4] => data=lion || secondscounter=6.4
[ 7]
[.2] => data=lion || secondscounter=7.2
[ 8]
[.1] => data=lion || secondscounter=8.1
Program starts in Main()
'''==================================================================
Thread that sends data to 'nc'

Reference :
    http://www.techbeamers.com/python-multithreading-concepts/
    https://docs.python.org/3/library/queue.html
    http://stackoverflow.com/questions/11829982/piping-data-from-python-to-an-external-command
=================================================================='''
# a11 > a21_stub.py
import sys
import threading
import time
import yaml
import logging.config

from a22_Formatter import Formatter


def Get_arguments():
    '''Parse "key=value" command line arguments into a dictionary.

    Only the first "=" separates key from value, so values may themselves
    contain "=" characters.

    Returns:
        dict mapping each key to its (string) value.

    Raises:
        ValueError: if an argument does not contain "=".
    '''
    log = logging.getLogger(__name__)
    argDny = {}
    for arg in sys.argv[1:]:
        log.debug("Arg -> %s", arg)
        key, separator, value = arg.partition("=")
        if not separator:
            raise ValueError("Expected key=value argument, got %r" % arg)
        argDny[key] = value
    return argDny


def Load_input_file(argDny):
    '''Read and parse the YAML file named by argDny['inputfile'].

    safe_load is used because the file only needs plain YAML types and
    yaml.load() without an explicit Loader is rejected by PyYAML 6.

    Returns:
        The parsed YAML document.

    Raises:
        KeyError: if 'inputfile' was not supplied on the command line.
    '''
    with open(argDny['inputfile'], 'rt') as fin:
        return yaml.safe_load(fin.read())


def Load_meta_info(inputList):
    '''Return the first element of the input list (the meta section).'''
    return inputList[0]


def Load_data(inputList):
    '''Return every element after the first one (the data elements).'''
    return inputList[1:]


def Setup_logger():
    '''Configure logging from ./z11_logging.yaml.

    Handlers already attached to the root logger are dropped first.
    '''
    with open("./z11_logging.yaml", "rt") as fin:
        config = yaml.safe_load(fin.read())
    logging.root.handlers = []
    logging.config.dictConfig(config)


def Main():
    '''Load the input file, then feed its data to nc through a Formatter.

    Exits with status 1 when the input file is empty / not a list or when
    the meta type is not "simple".
    '''
    Setup_logger()
    log = logging.getLogger(__name__)

    # Get command line arguments
    argDny = Get_arguments()
    log.debug("argDny -> %s", argDny)

    # Load input file
    inputList = Load_input_file(argDny)
    log.debug("inputDny -> %s", inputList)
    if not isinstance(inputList, list) or not inputList:
        log.error("Input file must be a non-empty YAML list")
        sys.exit(1)

    # Get Meta data from inputList
    metaDny = Load_meta_info(inputList)
    log.debug("metaDny -> %s", metaDny)

    # Get Data List
    dataLst = Load_data(inputList)
    log.debug("dataList -> %s", dataLst)

    # Without this check an unknown type left 'thrd' unbound below.
    if metaDny['type'] != "simple":
        log.error("Unsupported meta type: %s", metaDny['type'])
        sys.exit(1)

    # The Formatter is created (and nc started) before the prompt is shown.
    thrd = Formatter(metaDny, dataLst)
    log.info("Press ENTER when Ready...")
    input("")

    thrd.start()
    thrd.Place_data_in_queue()
    thrd.join()


if __name__ == "__main__":
    Main()
# a11 > a22_Formatter.py
from datetime import datetime, timedelta
import logging.config
import time
import threading
import queue
from math import floor, modf
import sys
import subprocess
import re


class Formatter(threading.Thread):
    '''Formatter - Generates dummy input to be sent to netcat (nc).

    Place_data_in_queue() puts data elements on a FIFO queue, honouring the
    per-element delay. run() (the thread body) wakes up every 0.1 seconds,
    prints a seconds counter, tags whatever is queued with the configured
    meta information and writes it to the stdin of an 'nc' process.

    NOTE: run() loops forever; it has no exit condition, so join() on this
    thread does not return.
    '''

    def __init__(self, metaDny, dataLst):
        '''Store the configuration and start the nc process.

        Args:
            metaDny: meta dictionary; 'addtodata' and 'mode' are read.
            dataLst: list of dictionaries with 'data' and 'delay' keys.
        '''
        super().__init__()
        self.log = logging.getLogger(__name__)
        self.fifoQ = queue.Queue()
        self.metaDny = metaDny
        self.dataLst = dataLst
        self.counter = 0
        self.diffInSec = 0
        # Kept as a string everywhere because it is spliced into text.
        self.decimalSeconds = "0"
        self.firstDtime = datetime.now()
        self.curDtime = datetime.now()
        self.process = self.Connect_to_nc()

    def Connect_to_nc(self):
        '''Start "nc -lk 9999" and return the Popen object.

        Only stdin is piped. stdout is discarded and stderr is inherited:
        a pipe that is never read can fill up and block the child, and
        piping stderr would also hide nc's error messages.
        '''
        ncCommand = ["nc", "-lk", "9999"]
        return subprocess.Popen(ncCommand,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.DEVNULL,
                                universal_newlines=True)

    def Increment_counter(self, delta):
        '''Advance the seconds counter by delta, printing every value.'''
        for _ in range(delta):
            self.counter += 1
            self.Print_counter()

    def Print_counter(self):
        '''Log the seconds counter right-aligned in a 3 character field.'''
        self.log.info("[%s]", str(self.counter).rjust(3)[-3:])

    def Print_data(self, data):
        '''Log the formatted data prefixed by the current tenths digit.'''
        self.log.debug("self.diffInSec -> %s", self.diffInSec)
        prefix = " " * 5 + "[.%s]" % self.decimalSeconds
        self.log.info("%s => %s", prefix, data)

    def Get_element_from_fifoq(self, size):
        '''Remove up to size elements from the queue and return them.

        Stops early, returning what was collected, if the queue runs empty.
        '''
        dataList = []
        for _ in range(size):
            try:
                dataList.append(self.fifoQ.get_nowait())
            except queue.Empty:
                break
        return dataList

    def Add_meta_info(self, data):
        '''Return ["data=<data>", ...] plus the configured meta fields.

        The fields come from metaDny['addtodata']; "minsec" and
        "secondscounter" are recognised, anything else is ignored.
        '''
        # str() so that non-string YAML scalars (e.g. numbers) work too.
        dataNmeta = ["data=" + str(data)]
        for meta in self.metaDny.get('addtodata') or []:
            self.log.debug("Meta -> %s", meta)
            if meta == "minsec":
                minsec = self.curDtime.strftime("[%M][%S.")
                minsec = minsec + str(self.decimalSeconds) + "]"
                dataNmeta.append("minsec=" + minsec)
            elif meta == "secondscounter":
                dataNmeta.append("secondscounter=" + str(self.counter) +
                                 "." + str(self.decimalSeconds))
        return dataNmeta

    def Send_data_to_nc(self, dataList):
        '''Format every element and write it, one per line, to nc's stdin.'''
        for data in dataList:
            dataNmeta = self.Add_meta_info(data)
            self.log.debug("dataNmeta -> %s", dataNmeta)
            formattedData = ' || '.join(dataNmeta)
            self.log.debug("formattedData -> %s", formattedData)
            self.Print_data(formattedData)
            self.process.stdin.write(formattedData + "\n")
            self.process.stdin.flush()

    @staticmethod
    def _parse_delay(delay):
        '''Return the seconds in a delay such as "0.9s", or None if invalid.'''
        if not isinstance(delay, str) or not delay.endswith('s'):
            return None
        try:
            seconds = float(delay[:-1])
        except ValueError:
            return None
        # Rejects negative values, NaN and infinity, which time.sleep() refuses.
        if not (0 <= seconds < float("inf")):
            return None
        return seconds

    @staticmethod
    def _tenths_digit(seconds):
        '''Return the first decimal digit of seconds as a string.

        Fixed-point formatting is used because str() of a float can switch
        to scientific notation, which contains no "." to split on.
        '''
        return "{:.6f}".format(seconds).split(".")[1][0]

    def Place_data_in_queue(self):
        '''Queue the data elements, sleeping for each element's delay first.

        Elements whose data is 'None' (or empty) only produce the delay.
        The list is replayed forever unless metaDny['mode'] is
        "singlepassmode". An invalid delay is logged and ends the calling
        thread through sys.exit(1).
        '''
        if not self.dataLst:
            # Nothing to replay; looping over an empty list would busy-spin.
            self.log.error("No data elements to send")
            return
        while True:
            for element in self.dataLst:
                seconds = self._parse_delay(element['delay'])
                if seconds is None:
                    self.log.error("Invalid delay %r; expected seconds "
                                   "followed by 's', e.g. '0.9s'",
                                   element['delay'])
                    sys.exit(1)
                time.sleep(seconds)
                data = element['data']
                if data is not None and data != 'None':
                    self.fifoQ.put(data)
            if self.metaDny['mode'] == "singlepassmode":
                break

    def run(self):
        '''Thread body: tick every 0.1 s, update the counter, flush the queue.'''
        self.firstDtime = datetime.now()
        self.Print_counter()
        while True:
            time.sleep(0.1)
            self.curDtime = datetime.now()
            self.diffInSec = (self.curDtime -
                              self.firstDtime).total_seconds()
            self.log.debug("diffInSec -> %s", self.diffInSec)
            self.decimalSeconds = self._tenths_digit(self.diffInSec)
            self.log.debug("decimalSeconds -> %s", self.decimalSeconds)
            floored = floor(self.diffInSec)
            self.log.debug("floored -> %s", floored)
            # Catch up on every whole second that passed since the last tick.
            delta = floored - self.counter
            if delta >= 1:
                self.Increment_counter(delta)
            size = self.fifoQ.qsize()
            if size > 0:
                self.Send_data_to_nc(self.Get_element_from_fifoq(size))
Please note that the YAML files (given below) must not contain any TAB characters. Configure your editor to replace TABs with spaces (editors such as vi and Eclipse have options for this setting).
# a11 > z11_logging.yaml version : 1 formatters : simple : #format : "%(levelname)-7s || %(asctime)s || [ %(filename)s ] [ %(funcName)s() ] [ %(lineno)s ] || %(message)s" #format : "%(levelname)-7s || %(asctime)s || %(filename)s > %(funcName)s() > %(lineno)s ] || %(message)s" format : "%(message)s" handlers : console : class : logging.StreamHandler level : DEBUG formatter : simple stream : ext://sys.stdout file_handler : class : logging.handlers.RotatingFileHandler level : DEBUG formatter : simple maxBytes : 10485760 backupCount : 2 encoding : utf8 filename : purge.log root : level : INFO handlers : [console, file_handler]
# a11 > c15_input2.yaml - meta: type : simple # Can be minsec (or) secondscounter addtodata : - secondscounter # Can be singlepassmode (or) loopmode mode : loopmode # 'None' is used to provide delay without sending data to the # 'nc' server - data : lion delay : 0.9s