Skip to content
Snippets Groups Projects
Forked from DM / dm-docs
261 commits behind, 467 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
fileProcessingManager.py 4.99 KiB
#!/usr/bin/env python

import threading
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread

class FileProcessingManager(threading.Thread,Singleton):
    """
    Singleton that owns the file processing queue, the configured file
    processors and the FileProcessingThread workers.

    Although the class derives from threading.Thread, start() is overridden
    to launch the worker threads and does not call Thread.start(), so the
    manager object itself is never run as a thread.
    """

    CONFIG_SECTION_NAME = 'FileProcessingManager'
    NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
    DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
    FILE_PROCESSOR_KEY = 'fileprocessor'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """
        Initialize the manager state and load the configuration.

        Only the first successful call does any work; once an instance has
        been registered, subsequent calls return immediately.
        """
        with FileProcessingManager.__instanceLock:
            if FileProcessingManager.__instance:
                return

            # A Thread subclass that overrides the constructor must invoke
            # the base constructor; inherited members such as __repr__()
            # rely on the state it sets up.
            threading.Thread.__init__(self)

            # Register early so that a re-entrant call made while
            # configuring (the lock is an RLock) hits the guard above.
            FileProcessingManager.__instance = self
            try:
                self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

                self.logger.debug('Initializing')
                self.lock = threading.RLock()
                self.fileProcessingThreadDict = {}
                self.eventFlag = threading.Event()
                self.fileProcessorDict = {}
                self.fileProcessorKeyList = []
                self.fileProcessingQueue = TimeBasedProcessingQueue()
                self.processedFileDict = {}
                self.unprocessedFileDict = {}
                self.__configure()
                self.logger.debug('Initialization complete')
            except BaseException:
                # Do not leave a half-initialized object registered as the
                # singleton: later calls would silently return early and
                # fail much later on missing attributes.
                FileProcessingManager.__instance = None
                raise

    def __configure(self):
        """
        Read the FileProcessingManager configuration section.

        Sets the number of processing threads and the default retry
        settings, then creates and configures one file processor for every
        config item whose key starts with FILE_PROCESSOR_KEY. The processor
        keys are stored in sorted order in fileProcessorKeyList.
        """
        cm = ConfigurationManager.getInstance()
        sectionName = FileProcessingManager.CONFIG_SECTION_NAME
        configItems = cm.getConfigItems(sectionName)
        # Wrap in a 1-tuple so that formatting works for any container type.
        self.logger.debug('Got config items: %s' % (configItems,))
        self.nProcessingThreads = int(cm.getConfigOption(sectionName, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
        self.defaultNumberOfRetries = int(cm.getConfigOption(sectionName, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
        self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(sectionName, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))

        # Create processors
        for (key,value) in configItems:
            if not key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
                continue
            (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
            self.logger.debug('Creating file processor instance of class %s' % className)
            fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
            self.logger.debug('Configuring file processor %s' % fileProcessor)
            fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
            fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
            fileProcessor.configure()
            self.fileProcessorDict[key] = fileProcessor

        # sorted() yields a real list regardless of what keys() returns.
        self.fileProcessorKeyList = sorted(self.fileProcessorDict)
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

    def processFile(self, fileInfo):
        """
        Push fileInfo onto the processing queue and set the event flag.
        """
        self.fileProcessingQueue.push(fileInfo)
        self.eventFlag.set()

    def start(self):
        """
        Create and start the configured number of FileProcessingThread
        workers, recording each one in fileProcessingThreadDict by name.

        This overrides Thread.start() and does not call it.
        NOTE(review): worker names are deterministic, so calling start()
        again replaces the existing dictionary entries with new workers.
        """
        with self.lock:
            self.logger.debug('Starting file processing threads')
            for i in range(self.nProcessingThreads):
                tName = 'FileProcessingThread-%s' % i
                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict)
                t.start()
                self.fileProcessingThreadDict[tName] = t

    def stop(self):
        """
        Ask every worker thread to stop, set the event flag, and then join
        each worker.
        """
        self.logger.debug('Stopping file processing threads')
        for t in self.fileProcessingThreadDict.values():
            t.stop()
        # The flag is set after all stop requests and before joining.
        with self.lock:
            self.eventFlag.set()
        for t in self.fileProcessingThreadDict.values():
            t.join()

    def setEvent(self):
        """
        Set the event flag while holding the manager lock.
        """
        with self.lock:
            self.eventFlag.set()

    def clearEvent(self):
        """
        Clear the event flag while holding the manager lock.
        """
        with self.lock:
            self.eventFlag.clear()

    def waitOnEvent(self, timeoutInSeconds=None):
        """
        Block until the event flag is set or timeoutInSeconds elapses.

        With the default of None the wait has no timeout. The manager lock
        is not held while waiting.
        """
        self.eventFlag.wait(timeoutInSeconds)

####################################################################
# Testing

if __name__ == '__main__':
    # Manual smoke test: obtain the singleton and print it.
    fp = FileProcessingManager.getInstance()
    # Parenthesized form prints the same on Python 2 and parses on Python 3.
    print(fp)
    # Uncomment to run the worker threads for 30 seconds and shut them down.
    #fp.start()
    #time.sleep(30)
    #fp.stop()