Skip to content
Snippets Groups Projects
fileProcessingThread.py 4.43 KiB
Newer Older
#!/usr/bin/env python

import threading

from dm.common.utility.loggingManager import LoggingManager

class FileProcessingThread(threading.Thread):
    """Worker thread that runs queued files through an ordered set of processors.

    Each item popped from ``fileProcessingQueue`` is a file-info dict. It is
    handed to every processor named in ``fileProcessorKeyList``, in list
    order. Per-processor progress is recorded inside the dict itself:

    * ``'processedByDict'`` maps processor class names that succeeded to True.
    * ``'processingDict'`` holds retry bookkeeping per processor class name.

    As a result, a re-queued file skips processors that already succeeded.
    """

    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
        """Create the worker; call start() to begin processing.

        :param name: thread name, also used as the logger name.
        :param fileProcessingManager: object providing clearEvent() and
            waitOnEvent(timeoutInSeconds), used to sleep between queue passes.
        :param fileProcessorDict: maps each key in fileProcessorKeyList to a
            processor object.
        :param fileProcessorKeyList: processor keys, in the order they are
            applied to every file.
        :param fileProcessingQueue: object whose pop() returns the next
            file-info dict (None when nothing is available) and whose
            push(fileInfo, retryWaitPeriod) re-queues a file for retry.
        :param processedFileDict: stored on the instance; not read or written
            by this class.
        :param unprocessedFileDict: receives files (keyed by file path) whose
            retries are exhausted.
        """
        threading.Thread.__init__(self, name=name)
        # Set by stop(); checked by run() once per pass over the queue.
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.processedFileDict = processedFileDict
        self.unprocessedFileDict = unprocessedFileDict
        self.logger = LoggingManager.getInstance().getLogger(name)

    def run(self):
        """Process queued files until stop() is called.

        Each pass does the following:

        1. Clear the manager's event.
        2. Exit if stop() was requested.
        3. Drain the queue.
        4. Wait on the event for up to THREAD_EVENT_TIMEOUT_IN_SECONDS.
        """
        self.logger.debug('Starting thread: %s' % self.name)
        while True:
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                break
            self._processQueuedFiles()
            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.name)

    def _processQueuedFiles(self):
        """Pop and process files until the queue's pop() returns None."""
        while True:
            fileInfo = self.fileProcessingQueue.pop()
            if fileInfo is None:
                return
            try:
                self._processFile(fileInfo)
            except Exception as ex:
                # An unexpected failure (e.g. inside the retry bookkeeping)
                # must not kill the worker: log it and move on to the next file.
                self.logger.exception(ex)

    def _processFile(self, fileInfo):
        """Run fileInfo through each configured processor, in order.

        Processors whose class name is already in fileInfo['processedByDict']
        are skipped. Processing stops at the first processor that raises.
        That failure is passed to _handleProcessingError(), which either
        re-queues the file or records it as unprocessed.
        """
        # NOTE(review): the line defining filePath was missing from the
        # source; 'filePath' is assumed to be the key holding the path -- confirm.
        filePath = fileInfo.get('filePath')
        for processorKey in self.fileProcessorKeyList:
            processor = self.fileProcessorDict.get(processorKey)
            processorName = processor.__class__.__name__
            fileProcessedByDict = fileInfo.setdefault('processedByDict', {})

            if processorName in fileProcessedByDict:
                self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
                continue

            self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo))
            try:
                # NOTE(review): the processor invocation was missing from the
                # source; processFile(fileInfo) is assumed -- confirm against
                # the processor interface.
                processor.processFile(fileInfo)
            except Exception as ex:
                self.logger.exception(ex)
                self._handleProcessingError(fileInfo, filePath, processor, processorName, ex)
                # Do not process this file further until
                # this plugin is done
                return
            fileProcessedByDict[processorName] = True
            self.logger.debug('%s processed file at path %s ' % (processorName, filePath))

    def _handleProcessingError(self, fileInfo, filePath, processor, processorName, ex):
        """Record a processor failure on fileInfo, then re-queue it or give up.

        Bookkeeping is kept in fileInfo['processingDict'][processorName].

        * The first failure seeds the counter from
          processor.getNumberOfRetries().
        * Every failure decrements the counter.
        * Once a failure is seen with no retries left, the file is stored in
          unprocessedFileDict[filePath].
        * Otherwise the file is pushed back onto the queue with
          processor.getRetryWaitPeriodInSeconds().
        """
        errorMsg = '%s processing failed for file %s: %s' % (processorName, filePath, str(ex))
        self.logger.debug(errorMsg)
        fileProcessingDict = fileInfo.setdefault('processingDict', {})
        processorDict = fileProcessingDict.setdefault(processorName, {})

        processorDict['lastError'] = str(ex)
        nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
        self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
        processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
        if nRetriesLeft <= 0:
            self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
            self.unprocessedFileDict[filePath] = fileInfo
            fileInfo['processingError'] = errorMsg
        else:
            retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
            self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
            self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)

    def stop(self):
        """Ask the thread to exit.

        run() checks the flag at the start of each pass, so the thread stops
        after the current drain/wait cycle completes.
        """
        self.exitFlag = True

####################################################################
# Testing

# Running this module directly currently performs no action; there is no
# self-test implemented here.
if __name__ == '__main__':
    pass