#!/usr/bin/env python

import threading

from dm.common.utility.loggingManager import LoggingManager


class FileProcessingThread(threading.Thread):
    """Worker thread that runs queued files through a chain of processors.

    Each file popped from the processing queue is handed, in the order given
    by ``fileProcessorKeyList``, to the processors in ``fileProcessorDict``.
    Per-file bookkeeping is stored on the file info object itself:

    * ``processedByDict`` maps a processor class name to ``True`` once that
      processor has handled the file successfully.
    * ``processingDict`` maps a processor class name to a dict holding
      ``lastError`` and ``numberOfRetriesLeft``.
    * ``processingError`` is set once a processor has run out of retries.
    """

    # Timeout passed to the manager's waitOnEvent() between queue sweeps.
    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__(self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
        """Store collaborators and create a logger named after the thread.

        :param name: thread name; also used as the logger name
        :param fileProcessingManager: object providing clearEvent() and
            waitOnEvent(timeout)
        :param fileProcessorDict: maps processor key -> processor object
        :param fileProcessorKeyList: processor keys in execution order
        :param fileProcessingQueue: queue providing pop() and
            push(fileInfo, waitPeriod)
        :param processedFileDict: stored on the instance; not modified by
            the code in this class
        :param unprocessedFileDict: maps file path -> file info for files
            whose processing failed with no retries left
        """
        threading.Thread.__init__(self)
        # The name property is the non-deprecated equivalent of setName().
        self.name = name
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.processedFileDict = processedFileDict
        self.unprocessedFileDict = unprocessedFileDict
        self.logger = LoggingManager.getInstance().getLogger(name)

    def run(self):
        """Drain the processing queue repeatedly until the exit flag is set.

        Every outer iteration clears the manager event, checks the exit
        flag, pops files until the queue returns None, and then waits on the
        manager event for up to THREAD_EVENT_TIMEOUT_IN_SECONDS.
        """
        self.logger.debug('Starting thread: %s' % self.name)
        while True:
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                self.logger.debug('Exit flag is set')
                break

            while True:
                fileInfo = self.fileProcessingQueue.pop()
                if fileInfo is None:
                    # Nothing left to process in this sweep.
                    break
                filePath = fileInfo.get('filePath')
                try:
                    self._processFile(fileInfo, filePath)
                except Exception as ex:
                    # Unexpected failures (including errors raised by the
                    # retry bookkeeping) are logged so the thread survives.
                    self.logger.exception(ex)

            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.name)

    def _processFile(self, fileInfo, filePath):
        """Run the file through each configured processor in order.

        Processors already recorded in ``processedByDict`` are skipped. The
        chain stops at the first processor that raises.
        """
        for processorKey in self.fileProcessorKeyList:
            processor = self.fileProcessorDict.get(processorKey)
            processorName = processor.__class__.__name__
            fileProcessedByDict = fileInfo.get('processedByDict', {})
            fileInfo['processedByDict'] = fileProcessedByDict

            if processorName in fileProcessedByDict:
                self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
                continue

            self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo))
            try:
                processor.processFile(fileInfo)
                fileProcessedByDict[processorName] = True
                self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
            except Exception as ex:
                self._handleProcessingError(processor, processorName, fileInfo, filePath, ex)
                # Do not process this file further until
                # this plugin is done
                break

    def _handleProcessingError(self, processor, processorName, fileInfo, filePath, ex):
        """Record a processor failure and either re-queue or give up.

        Must be called while the exception is being handled so that
        logger.exception() can pick up the active traceback.
        """
        self.logger.exception(ex)
        errorText = str(ex)
        errorMsg = '%s processing error: %s' % (processorName, errorText)
        self.logger.debug(errorMsg)

        fileProcessingDict = fileInfo.get('processingDict', {})
        fileInfo['processingDict'] = fileProcessingDict
        processorDict = fileProcessingDict.get(processorName, {})
        fileProcessingDict[processorName] = processorDict
        processorDict['lastError'] = errorText

        # The first failure seeds the counter from the processor's setting.
        nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
        self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
        processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1

        if nRetriesLeft <= 0:
            self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
            self.unprocessedFileDict[filePath] = fileInfo
            fileInfo['processingError'] = errorMsg
        else:
            retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
            self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
            self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)

    def stop(self):
        """Ask the thread to exit; run() checks the flag once per outer loop."""
        self.exitFlag = True


####################################################################
# Testing

if __name__ == '__main__':
    pass