#!/usr/bin/env python import threading import time from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.configurationManager import ConfigurationManager from dm.common.utility.objectUtility import ObjectUtility from dm.common.utility.threadSafeQueue import ThreadSafeQueue from dm.common.utility.singleton import Singleton from dm.daq_web_service.service.impl.fileProcessingThread import FileProcessingThread class FileProcessingManager(threading.Thread,Singleton): CONFIG_SECTION_NAME = 'FileProcessingManager' N_PROCESSING_THREADS_KEY = 'nprocessingthreads' FILE_PROCESSOR_KEY = 'fileprocessor' # Singleton. __instanceLock = threading.RLock() __instance = None def __init__(self): FileProcessingManager.__instanceLock.acquire() try: if FileProcessingManager.__instance: return FileProcessingManager.__instance = self self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) self.logger.debug('Initializing') self.lock = threading.RLock() self.fileProcessingThreadDict = {} self.eventFlag = threading.Event() self.fileProcessorDict = {} self.fileProcessorKeyList = [] self.fileProcessingQueue = ThreadSafeQueue() self.processedFileDict = {} self.unprocessedFileDict = {} self.__configure() self.logger.debug('Initialization complete') finally: FileProcessingManager.__instanceLock.release() def __configure(self): cm = ConfigurationManager.getInstance() configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME) self.logger.debug('Got config items: %s' % configItems) self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.N_PROCESSING_THREADS_KEY)) # Create processors for (key,value) in configItems: if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY): (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value) self.logger.debug('Creating file processor instance of class %s' % className) fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor) self.fileProcessorDict[key] = fileProcessor self.fileProcessorKeyList = self.fileProcessorDict.keys() self.fileProcessorKeyList.sort() self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList) def processObservedFile(self, observedFile): self.fileProcessingQueue.push(observedFile) self.eventFlag.set() def start(self): self.lock.acquire() try: self.logger.debug('Starting file processing threads') for i in range(0, self.nProcessingThreads): tName = 'FileProcessingThread-%s' % i t = FileProcessingThread(tName, self.eventFlag, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict) t.start() self.fileProcessingThreadDict[tName] = t finally: self.lock.release() def stop(self): self.lock.acquire() try: self.logger.debug('Stopping file processing threads') for (tName, t) in self.fileProcessingThreadDict.items(): t.stop() self.eventFlag.set() for (tName, t) in self.fileProcessingThreadDict.items(): t.join() finally: self.lock.release() def setEvent(self): self.lock.acquire() try: self.eventFlag.set() finally: self.lock.release() def clearEvent(self): self.lock.acquire() try: self.eventFlag.clear() finally: self.lock.release() #################################################################### # Testing if __name__ == '__main__': fp = FileProcessingManager.getInstance() print fp #fp.start() #time.sleep(30) #fp.stop()