#!/usr/bin/env python

import threading
import time
import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread


class FileProcessingManager(threading.Thread, Singleton):
    """Singleton owning the configured file processors and their worker threads.

    Processors are created from every option of the ``FileProcessingManager``
    configuration section whose key starts with ``fileprocessor``. They are
    consulted in sorted key order. Files submitted via processFile() are
    pushed onto a shared TimeBasedProcessingQueue that is handed to each
    FileProcessingThread started by start().
    """

    CONFIG_SECTION_NAME = 'FileProcessingManager'
    NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
    DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
    FILE_PROCESSOR_KEY = 'fileprocessor'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the first instance; later constructions return immediately.

        Only the first object constructed gets logger, lock, queue and
        processor attributes. Instances created afterwards are left
        uninitialized, so callers are expected to go through getInstance().
        """
        with FileProcessingManager.__instanceLock:
            if FileProcessingManager.__instance is not None:
                return
            FileProcessingManager.__instance = self
            # This class subclasses threading.Thread, so the base initializer
            # must run; otherwise Thread methods such as __repr__ (used by the
            # print in the __main__ test below) fail on Python 2.
            threading.Thread.__init__(self)
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.fileProcessingThreadDict = {}
            self.eventFlag = threading.Event()
            self.fileProcessorDict = {}
            self.fileProcessorKeyList = []
            self.fileProcessingQueue = TimeBasedProcessingQueue()
            self.__configure()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """Read settings from the config section and build the file processors.

        Sets nProcessingThreads, defaultNumberOfRetries and
        defaultRetryWaitPeriodInSeconds (each converted with int(), so a
        non-numeric value raises). It also fills fileProcessorDict and the
        sorted fileProcessorKeyList, names each processor, and rewrites each
        processor's dependsOn from config keys to processor names.
        """
        cm = ConfigurationManager.getInstance()
        sectionName = FileProcessingManager.CONFIG_SECTION_NAME
        configItems = cm.getConfigItems(sectionName)
        self.logger.debug('Got config items: %s' % configItems)
        self.nProcessingThreads = int(cm.getConfigOption(sectionName, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
        self.defaultNumberOfRetries = int(cm.getConfigOption(sectionName, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
        self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(sectionName, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))

        # Create processors
        for (key, value) in configItems:
            if not key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
                continue
            (moduleName, className, constructor) = cm.getModuleClassConstructorTuple(value)
            self.logger.debug('Creating file processor instance of class %s' % className)
            fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
            self.logger.debug('Configuring file processor %s' % fileProcessor)
            fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
            fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
            fileProcessor.configure()
            self.fileProcessorDict[key] = fileProcessor

        # sorted() yields a new list on both Python 2 and 3 (dict.keys() is a
        # non-sortable view on Python 3).
        self.fileProcessorKeyList = sorted(self.fileProcessorDict)
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

        # Assign processor names: <ClassName>-<1-based position in key order>.
        for (processorNumber, processorKey) in enumerate(self.fileProcessorKeyList, 1):
            processor = self.fileProcessorDict.get(processorKey)
            processor.name = '%s-%s' % (processor.__class__.__name__, processorNumber)

        # Correct processor dependencies: translate each dependency config key
        # into the corresponding processor name. Keys are looked up lower-cased
        # (presumably because the config parser lower-cases option names).
        # Dependencies that match no configured processor are dropped.
        for processorKey in self.fileProcessorKeyList:
            self.logger.debug('Determining dependencies for processor %s' % (processorKey))
            processor = self.fileProcessorDict.get(processorKey)
            dependsOn = []
            for depProcessorKey in processor.dependsOn:
                depProcessor = self.fileProcessorDict.get(depProcessorKey.lower())
                if depProcessor:
                    dependsOn.append(depProcessor.name)
            processor.dependsOn = dependsOn
            self.logger.debug('Processor %s depends on: %s' % (processor.name, processor.dependsOn))

    # Remove hidden files from dictionary of files to be processed
    def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo):
        """Drop hidden files (base name starting with '.') from filePathsDict.

        If uploadInfo['processHiddenFiles'] evaluates true, that flag is
        deleted from uploadInfo and filePathsDict is returned untouched.
        Otherwise matching entries are deleted from filePathsDict in place,
        and the same dict is returned.
        """
        if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
            del uploadInfo['processHiddenFiles']
            return filePathsDict
        self.logger.debug('Checking for hidden files')
        nRemoved = 0
        # Iterate over a snapshot of the keys so entries can be deleted safely.
        for filePath in list(filePathsDict):
            fileName = os.path.basename(filePath)
            if fileName.startswith('.'):
                self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                del filePathsDict[filePath]
                nRemoved += 1
        self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
        return filePathsDict

    # Each plugin calculates list of files that need to be processed
    # Final result is union of all plugins
    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Return the union of the files each processor says it must process.

        If uploadInfo['reprocessFiles'] evaluates true, that flag is deleted
        from uploadInfo and filePathsDict is returned as-is. Otherwise each
        processor (in key order) is asked via its own
        checkUploadFilesForProcessing(). The non-empty results are merged into
        a new dict, with later processors overwriting earlier entries for the
        same path.
        """
        if ValueUtility.toBoolean(uploadInfo.get('reprocessFiles')):
            del uploadInfo['reprocessFiles']
            return filePathsDict
        self.logger.debug('Checking files with processor plugins')
        checkedFilePathsDict = {}
        for processorKey in self.fileProcessorKeyList:
            processor = self.fileProcessorDict.get(processorKey)
            # Processor will return list of files it must process; an empty
            # or None result contributes nothing.
            pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
            if pluginFilePathsDict:
                checkedFilePathsDict.update(pluginFilePathsDict)
        # Report the size of the result, not of the input.
        self.logger.debug('There are %s processing candidates remaining' % (len(checkedFilePathsDict)))
        return checkedFilePathsDict

    def processFile(self, fileInfo):
        """Queue fileInfo for the worker threads and signal the event flag."""
        self.fileProcessingQueue.push(fileInfo)
        self.eventFlag.set()

    def appendFileProcessor(self, fileProcessor):
        """Register fileProcessor under its class name, replacing any previous one.

        The key is appended to fileProcessorKeyList only if it is not already
        present. Otherwise re-adding a processor class would make
        checkUploadFilesForProcessing consult it twice. The list is mutated in
        place because the same list object is handed to worker threads in
        start().
        """
        key = fileProcessor.__class__.__name__
        self.logger.debug('Adding file processor: %s' % key)
        self.fileProcessorDict[key] = fileProcessor
        if key not in self.fileProcessorKeyList:
            self.fileProcessorKeyList.append(key)
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

    def start(self):
        """Start nProcessingThreads FileProcessingThread workers.

        This overrides threading.Thread.start(): the manager itself does not
        run as a thread. Each worker shares the processor dict, the key list
        and the processing queue, and is recorded in fileProcessingThreadDict
        by name.
        """
        with self.lock:
            self.logger.debug('Starting file processing threads')
            for i in range(self.nProcessingThreads):
                tName = 'FileProcessingThread-%s' % i
                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue)
                t.start()
                self.fileProcessingThreadDict[tName] = t

    def stop(self):
        """Ask every worker to stop, wake any waiters, then join all workers."""
        self.logger.debug('Stopping file processing threads')
        # Snapshot the workers so iteration is unaffected by concurrent changes.
        threadList = list(self.fileProcessingThreadDict.values())
        for t in threadList:
            t.stop()
        # Wake workers possibly blocked in waitOnEvent() so they notice stop.
        self.setEvent()
        for t in threadList:
            t.join()

    def setEvent(self):
        """Set the shared event flag (under the manager lock)."""
        with self.lock:
            self.eventFlag.set()

    def clearEvent(self):
        """Clear the shared event flag (under the manager lock)."""
        with self.lock:
            self.eventFlag.clear()

    def waitOnEvent(self, timeoutInSeconds=None):
        """Block until the event flag is set or timeoutInSeconds elapses."""
        self.eventFlag.wait(timeoutInSeconds)

####################################################################
# Testing

if __name__ == '__main__':
    fp = FileProcessingManager.getInstance()
    print(fp)
    #fp.start()
    #time.sleep(30)
    #fp.stop()