Saturday, April 30, 2016

Python Input Generation Stub for netcat (nc)

The input generation stub is executed as shown below, and the input it generates is sent to nc. This stub is used to generate input for this Spark Streaming application.

python3.5 ./a21_stub.py inputfile=./c15_input2.yaml 
Press ENTER when Ready...
[  0]
     [.9] => data=lion || secondscounter=0.9
[  1]
     [.8] => data=lion || secondscounter=1.8
[  2]
     [.7] => data=lion || secondscounter=2.7
[  3]
     [.6] => data=lion || secondscounter=3.6
[  4]
     [.5] => data=lion || secondscounter=4.5
[  5]
     [.4] => data=lion || secondscounter=5.4
[  6]
     [.4] => data=lion || secondscounter=6.4
[  7]
     [.2] => data=lion || secondscounter=7.2
[  8]
     [.1] => data=lion || secondscounter=8.1

Program starts in Main()
'''==================================================================
Thread that sends data to 'nc'
Reference :
    http://www.techbeamers.com/python-multithreading-concepts/
    https://docs.python.org/3/library/queue.html
    http://stackoverflow.com/questions/11829982/piping-data-from-python-to-an-external-command
=================================================================='''
# a11 > a21_stub.py
import sys
import threading
import time
import yaml
import logging.config
from a22_Formatter import Formatter

def Get_arguments():
    """Parse ``key=value`` command line arguments into a dictionary.

    Every entry of ``sys.argv[1:]`` is split on its FIRST ``=`` only, so a
    value may itself contain ``=`` characters (for example
    ``filter=a=b`` yields ``{"filter": "a=b"}``).

    Returns:
        dict mapping each argument key to its (string) value.

    Raises:
        ValueError: if an argument does not contain ``=`` at all.
    """
    log = logging.getLogger(__name__)
    argDny = {}
    for arg in sys.argv[1:]:
        log.debug("Arg -> %s", arg)
        key, separator, value = arg.partition("=")
        if not separator:
            # Keep failing with ValueError, but say which argument is wrong.
            raise ValueError("Expected key=value argument, got: %r" % arg)
        argDny[key] = value
    return argDny

def Load_input_file(argDny):
    """Read and parse the YAML input file named by ``argDny['inputfile']``.

    ``yaml.safe_load`` is used instead of a bare ``yaml.load``: the latter
    requires an explicit ``Loader`` argument on current PyYAML releases and
    can construct arbitrary Python objects, which plain input data never
    needs.

    Args:
        argDny: dictionary of command line arguments; must contain the
            key ``'inputfile'`` holding the path of the YAML file.

    Returns:
        The parsed YAML document.

    Raises:
        KeyError: if ``'inputfile'`` is missing from ``argDny``.
        OSError: if the file cannot be opened.
    """
    with open(argDny['inputfile'], 'rt') as fin:
        text = fin.read()
    inputDny = yaml.safe_load(text)
    return inputDny
 
def Load_meta_info(inputList):
    """Return the first entry of ``inputList``.

    Main() treats this leading entry as the meta settings of the input
    file and the remaining entries as the data records.
    """
    return inputList[0]
 
def Load_data(inputList):
    """Return every entry of ``inputList`` after the first one.

    The first entry is skipped because Main() reads it separately as the
    meta settings; an input with a single entry yields an empty list.
    """
    return inputList[1:]
 
        
def Setup_logger():
    """Configure the logging module from ``./z11_logging.yaml``.

    The YAML file is parsed with ``yaml.safe_load`` (a bare ``yaml.load``
    needs an explicit ``Loader`` on current PyYAML releases), any handlers
    already attached to the root logger are discarded, and the parsed
    dictionary is applied through ``logging.config.dictConfig``.

    Raises:
        OSError: if the logging configuration file cannot be opened.
    """
    with open("./z11_logging.yaml", "rt") as fin:
        config = yaml.safe_load(fin.read())
    # Drop previously installed root handlers so messages are not emitted
    # twice once dictConfig installs the configured ones.
    logging.root.handlers = []
    logging.config.dictConfig(config)

def Main():
    """Program entry point: load the inputs and drive the Formatter thread.

    Steps: configure logging, parse the command line, load the YAML input
    file, split it into the meta entry and the data entries and, when the
    meta ``type`` is ``"simple"``, start a Formatter after the user presses
    ENTER and feed it from this (the calling) thread.
    """
    Setup_logger()
    logger = logging.getLogger(__name__)

    # Command line arguments (key=value pairs).
    cliArgs = Get_arguments()
    logger.debug("argDny -> %s", cliArgs)

    # Parsed content of the YAML input file.
    entries = Load_input_file(cliArgs)
    logger.debug("inputDny -> %s", entries)

    # Leading entry: meta settings.
    meta = Load_meta_info(entries)
    logger.debug("metaDny -> %s", meta)

    # Remaining entries: the data records.
    records = Load_data(entries)
    logger.debug("dataList -> %s", records)

    # Only the "simple" type is handled; anything else is a no-op.
    if metaDny_type_is_not_simple(meta):
        return

    worker = Formatter(meta, records)
    logger.info("Press ENTER when Ready...")
    input("")
    worker.start()
    # The calling thread produces the data while the worker consumes it.
    worker.Place_data_in_queue()
    worker.join()


def metaDny_type_is_not_simple(meta):
    """Return True when the meta entry's ``type`` is anything but "simple"."""
    return not (meta['type'] == "simple")
            
# Run the stub only when this file is executed as a script, so that merely
# importing the module does not start reading input or spawn the thread.
if __name__ == "__main__":
    Main()

# a11 > a22_Formatter.py
from datetime import datetime, timedelta
import logging.config
import time
import threading
import queue
from math import floor, modf
import sys
import subprocess
import re

'''===================================================================
Formatter
    -    Generates dummy input to be sent to netcat (nc)
==================================================================='''
class Formatter(threading.Thread):
    """Thread that stamps queued records and pipes them to netcat (nc).

    The thread that calls Place_data_in_queue() is the producer: it sleeps
    for each record's delay and puts the record's data on ``fifoQ``.
    run() (the thread body) is the consumer: every 0.1 seconds it refreshes
    the elapsed-time counters, logs each newly reached whole second, and
    writes all queued records, decorated with the configured meta fields,
    to the stdin of the ``nc`` subprocess.
    """

    # Command line used to start the netcat process the records are piped to.
    NC_COMMAND = "nc -lk 9999"

    def __init__(self, metaDny, dataLst):
        """Store the configuration and start the ``nc`` subprocess.

        Args:
            metaDny: meta settings; this class reads the keys
                ``'addtodata'`` and ``'mode'``.
            dataLst: list of records, each with the keys ``'delay'`` and
                ``'data'``.
        """
        super().__init__()
        self.log = logging.getLogger(__name__)
        self.fifoQ = queue.Queue()
        self.metaDny = metaDny
        self.dataLst = dataLst
        self.counter = 0            # whole seconds already reported
        self.diffInSec = 0          # seconds elapsed since run() started
        self.decimalSeconds = "0"   # tenths-of-a-second digit, as text
        self.firstDtime = datetime.now()
        self.curDtime = datetime.now()
        self.process = self.Connect_to_nc()

    def Connect_to_nc(self):
        """Start the ``nc`` process and return its Popen handle.

        stdin is a text-mode pipe; Send_data_to_nc() writes to it.
        """
        # NOTE(review): stdout and stderr are piped but never read by this
        # class; if nc produces a lot of output the pipes could fill up.
        # Confirm whether they should be drained or discarded.
        process = subprocess.Popen(self.NC_COMMAND.split(),
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        return process

    def Increment_counter(self, delta):
        """Advance the seconds counter ``delta`` times, logging each value."""
        for _ in range(delta):
            self.counter += 1
            self.Print_counter()

    def Print_counter(self):
        """Log the seconds counter right-aligned in a 3-character field.

        Values wider than three digits are printed in full rather than
        being cut down to their last three characters.
        """
        self.log.info("[%s]", str(self.counter).rjust(3))

    def Print_data(self, data):
        """Log ``data`` prefixed by the current tenths-of-a-second digit."""
        self.log.debug("self.diffInSec -> %s", self.diffInSec)
        # %-formatting accepts the digit whether it is stored as str or int.
        prefix = " " * 5 + "[.%s]" % self.decimalSeconds
        self.log.info("%s => %s", prefix, data)

    def Get_element_from_fifoq(self, size):
        """Remove and return ``size`` items from the queue, oldest first.

        Raises:
            queue.Empty: if fewer than ``size`` items are available.
        """
        return [self.fifoQ.get_nowait() for _ in range(size)]

    def Add_meta_info(self, data):
        """Return ``data`` plus the meta fields listed in ``'addtodata'``.

        The first element is always ``"data=<data>"``. For each entry of
        ``metaDny['addtodata']``, ``"minsec"`` appends the current
        minute/second stamp and ``"secondscounter"`` appends
        ``<counter>.<tenths>``. A missing or empty ``'addtodata'`` adds
        nothing.

        Args:
            data: the record payload; converted with ``str`` so that
                non-string YAML scalars (numbers, booleans) also work.

        Returns:
            list of ``"key=value"`` strings.
        """
        dataNmeta = ["data=" + str(data)]
        for meta in self.metaDny.get('addtodata') or []:
            self.log.debug("Meta -> %s", meta)
            if meta == "minsec":
                minsec = self.curDtime.strftime("[%M][%S.")
                minsec = minsec + str(self.decimalSeconds) + "]"
                dataNmeta.append("minsec=" + minsec)
            if meta == "secondscounter":
                dataNmeta.append("secondscounter=" +
                                 str(self.counter) +
                                 "." + str(self.decimalSeconds))
        return dataNmeta

    def Send_data_to_nc(self, dataList):
        """Format every item of ``dataList`` and write it to nc's stdin.

        Each item becomes one line whose fields are joined by ``' || '``;
        the pipe is flushed after every line so nc receives it at once.
        """
        for data in dataList:
            dataNmeta = self.Add_meta_info(data)
            self.log.debug("dataNmeta -> %s", dataNmeta)
            formattedData = ' || '.join(dataNmeta)
            self.log.debug("formattedData -> %s", formattedData)
            self.Print_data(formattedData)
            self.process.stdin.write(formattedData + "\n")
            self.process.stdin.flush()

    def Place_data_in_queue(self):
        """Feed the records of ``dataLst`` into the queue, honouring delays.

        For every record the caller's thread sleeps for the record's
        ``'delay'`` (a number of seconds followed by ``'s'``, e.g.
        ``'0.9s'``) and then queues its ``'data'``, unless the data is
        the string ``'None'`` or a YAML null, which only produces the
        delay. The whole list is replayed forever unless
        ``metaDny['mode']`` is ``"singlepassmode"``.

        Raises:
            SystemExit: (status 1) if a delay does not end with ``'s'``.
        """
        while True:
            for element in self.dataLst:
                delay = element['delay']
                data = element['data']

                if not delay.endswith('s'):
                    self.log.error("Unsupported delay %r: expected seconds "
                                   "such as '0.9s'", delay)
                    sys.exit(1)
                time.sleep(float(delay[:-1]))

                # 'None' (or a YAML null) means "delay only, send nothing".
                if data is None or data == 'None':
                    continue
                self.fifoQ.put(data)
            if self.metaDny['mode'] == "singlepassmode":
                break

    def _Update_elapsed(self, now):
        """Refresh the time-derived attributes for the instant ``now``.

        Sets ``curDtime``, ``diffInSec`` and ``decimalSeconds`` and returns
        the number of whole seconds elapsed since ``firstDtime``. The
        values come from the timedelta's integer fields rather than from
        parsing the text form of a float.
        """
        elapsed = now - self.firstDtime
        self.curDtime = now
        self.diffInSec = elapsed.total_seconds()
        self.log.debug("diffInSec -> %s", self.diffInSec)
        self.decimalSeconds = str(elapsed.microseconds // 100000)
        self.log.debug("decimalSeconds -> %s", self.decimalSeconds)
        self.log.debug("self.diffInSec -> %s", self.diffInSec)
        floored = elapsed.days * 86400 + elapsed.seconds
        self.log.debug("floored -> %s", floored)
        return floored

    def run(self):
        """Thread body: poll every 0.1 s, tick the counter, drain the queue.

        The loop has no exit condition; it runs until the process ends.
        """
        self.firstDtime = datetime.now()
        self.Print_counter()
        while True:
            time.sleep(0.1)
            floored = self._Update_elapsed(datetime.now())
            # Report every whole second reached since the last tick.
            delta = floored - self.counter
            if delta >= 1:
                self.Increment_counter(delta)
            size = self.fifoQ.qsize()
            if size > 0:
                dataList = self.Get_element_from_fifoq(size)
                self.Send_data_to_nc(dataList)


Please note that the YAML files given below must not contain any TAB characters. Configure your editor to replace TABs with spaces (editors such as vi and Eclipse have an option for this).

# a11 > z11_logging.yaml
version         : 1
formatters      :
  simple        :
    #format      : "%(levelname)-7s || %(asctime)s || [ %(filename)s ] [ %(funcName)s() ] [ %(lineno)s ] || %(message)s"
    #format      : "%(levelname)-7s || %(asctime)s || %(filename)s > %(funcName)s() > %(lineno)s ] || %(message)s"
    format      : "%(message)s"
handlers        :
  console       :
    class       : logging.StreamHandler
    level       : DEBUG
    formatter   : simple
    stream      : ext://sys.stdout
    
  file_handler  :
    class       : logging.handlers.RotatingFileHandler
    level       : DEBUG
    formatter   : simple
    maxBytes    : 10485760
    backupCount : 2
    encoding    : utf8
    filename    : purge.log
    
root            :
    level       : INFO
    handlers    : [console, file_handler]           

# a11 > c15_input2.yaml
- meta:
  type            : simple
  # Can be minsec (or) secondscounter
  addtodata       : 
    - secondscounter
  # Can be singlepassmode (or) loopmode
  mode            : loopmode    
  
  # 'None' is used to provide delay without sending data to the
  # 'nc' server
- data  : lion
  delay : 0.9s