diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index 173b8e5ca8f354ba7c1dab3be2c981218e9c526c..ddd77402095316f03b065c37b78c6a56cd36ff24 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -58,7 +58,7 @@ class FileProcessingManager(threading.Thread,Singleton): (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value) self.logger.debug('Creating file processor instance of class %s' % className) fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor) - self.logger.debug('Configring file processor %s' % fileProcessor) + self.logger.debug('Configuring file processor %s' % fileProcessor) fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries) fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds) fileProcessor.configure() @@ -67,8 +67,8 @@ class FileProcessingManager(threading.Thread,Singleton): self.fileProcessorKeyList.sort() self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList) - def processObservedFile(self, observedFile): - self.fileProcessingQueue.push(observedFile) + def processFile(self, fileInfo): + self.fileProcessingQueue.push(fileInfo) self.eventFlag.set() def start(self): @@ -77,24 +77,23 @@ class FileProcessingManager(threading.Thread,Singleton): self.logger.debug('Starting file processing threads') for i in range(0, self.nProcessingThreads): tName = 'FileProcessingThread-%s' % i - t = FileProcessingThread(tName, self.eventFlag, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict) + t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict) t.start() self.fileProcessingThreadDict[tName] = t finally: self.lock.release() def stop(self): + self.logger.debug('Stopping file processing threads') + for (tName, t) in self.fileProcessingThreadDict.items(): + t.stop() self.lock.acquire() try: - self.logger.debug('Stopping file processing threads') - for (tName, t) in self.fileProcessingThreadDict.items(): - t.stop() self.eventFlag.set() - for (tName, t) in self.fileProcessingThreadDict.items(): - t.join() finally: self.lock.release() - + for (tName, t) in self.fileProcessingThreadDict.items(): + t.join() def setEvent(self): self.lock.acquire() @@ -110,6 +109,9 @@ class FileProcessingManager(threading.Thread,Singleton): finally: self.lock.release() + def waitOnEvent(self, timeoutInSeconds=None): + self.eventFlag.wait(timeoutInSeconds) + #################################################################### # Testing diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 0f548ac835295e1073030b0e867480070abdb4ae..61e5c2f122a852f7af1bbd8852c218bdf56504dd 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -6,12 +6,14 @@ from dm.common.utility.loggingManager import LoggingManager class FileProcessingThread(threading.Thread): - def __init__ (self, name, eventFlag, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict): + THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0 + + def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict): threading.Thread.__init__(self) self.setName(name) self.exitFlag = False - self.eventFlag = eventFlag + self.fileProcessingManager = fileProcessingManager self.fileProcessorDict = fileProcessorDict self.fileProcessorKeyList = fileProcessorKeyList self.fileProcessingQueue = fileProcessingQueue @@ -22,55 +24,58 @@ class FileProcessingThread(threading.Thread): def run(self): self.logger.debug('Starting thread: %s' % self.getName()) while True: + self.fileProcessingManager.clearEvent() if self.exitFlag: - self.logger.debug('Exit flag set, %s done' % self.getName()) + self.logger.debug('Exit flag is set') break while True: - file = self.fileProcessingQueue.pop() - if file is None: + fileInfo = self.fileProcessingQueue.pop() + if fileInfo is None: break - filePath = file.getFilePath() - daqPath = file.getDaqPath() - experiment = file.getExperiment() + filePath = fileInfo.get('filePath') try: for processorKey in self.fileProcessorKeyList: processor = self.fileProcessorDict.get(processorKey) processorName = processor.__class__.__name__ - fileProcessedByDict = file.get('processedByDict', {}) + fileProcessedByDict = fileInfo.get('processedByDict', {}) + fileInfo['processedByDict'] = fileProcessedByDict + if fileProcessedByDict.has_key(processorName): - self.logger.debug('%s has already been processed by %s ' % (file, processorName)) + self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName)) continue - self.logger.debug('%s is about to process file %s ' % (processorName, file)) + self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo)) try: - processor.processFile(filePath, daqPath, experiment) + processor.processFile(fileInfo) fileProcessedByDict[processorName] = True - file['processedByDict'] = fileProcessedByDict - self.logger.debug('%s processed file %s ' % (processorName, file)) + self.logger.debug('%s processed file at path %s ' % (processorName, filePath)) except Exception, ex: self.logger.exception(ex) - self.logger.debug('%s processing failed for file %s ' % (processorName, file)) - fileProcessingDict = file.get('processingDict', {}) - file['processingDict'] = fileProcessingDict + self.logger.debug('%s processing failed for file at path %s ' % (processorName, filePath)) + fileProcessingDict = fileInfo.get('processingDict', {}) + fileInfo['processingDict'] = fileProcessingDict processorDict = fileProcessingDict.get(processorName, {}) - processorDict['lastError'] = ex + fileProcessingDict[processorName] = processorDict + + processorDict['lastError'] = str(ex) nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries()) - self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, file, nRetriesLeft)) + self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft)) processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1 if nRetriesLeft <= 0: - self.logger.debug('No more %s retries left for file %s' % (processorName, file)) - self.unprocessedFileDict[filePath] = file + self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) + self.unprocessedFileDict[filePath] = fileInfo else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() - self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, file, retryWaitPeriod)) - self.fileProcessingQueue.push(file, retryWaitPeriod) + self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) + self.fileProcessingQueue.push(fileInfo, retryWaitPeriod) except Exception, ex: self.logger.exception(ex) - self.eventFlag.wait() + self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS) + self.logger.debug('%s is done' % self.getName()) def stop(self): self.exitFlag = True