# fileProcessingThread.py
import threading
import time

from dm.common.utility.loggingManager import LoggingManager

class FileProcessingThread(threading.Thread):
    """Worker thread that runs queued files through an ordered processor chain.

    Each file-info dict popped from ``fileProcessingQueue`` is handed, in
    order, to the processors named by ``fileProcessorKeyList`` (looked up in
    ``fileProcessorDict``).  Progress is recorded inside the file-info dict
    itself, so a file that is re-queued for a retry skips the processors that
    already succeeded and resumes with the one that failed.
    """

    # Timeout passed to fileProcessingManager.waitOnEvent() between queue
    # drains (presumably seconds, going by the name).
    # NOTE(review): this constant was referenced but not defined in the
    # visible source; 10.0 is a reconstructed value -- confirm.
    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__(self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue):
        """Create the (not yet started) worker thread.

        :param name: thread name; also used as the logger name.
        :param fileProcessingManager: object providing ``clearEvent()`` and
            ``waitOnEvent(timeout)``, used by ``run()`` to sleep between drains.
        :param fileProcessorDict: maps processor keys to processor objects
            exposing ``processFile(fileInfo)``, ``getNumberOfRetries()`` and
            ``getRetryWaitPeriodInSeconds()``.
        :param fileProcessorKeyList: ordered keys into ``fileProcessorDict``;
            processors run in this order.
        :param fileProcessingQueue: queue providing ``pop()``, ``push(fileInfo,
            delay)`` and ``getLength()``; ``pop()`` returning ``None`` is
            treated as "queue drained".
        """
        threading.Thread.__init__(self, name=name)
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.logger = LoggingManager.getInstance().getLogger(name)

    def processFile(self, fileInfo):
        """Run one file through every configured processor, in order.

        :param fileInfo: mutable dict describing the file.  Reads
            ``'filePath'`` and the optional ``'statusMonitor'``; writes
            ``'startProcessingTime'``, ``'processedByDict'`` and, on processor
            failures, ``'processingDict'``.

        Processors already recorded in ``processedByDict`` are skipped.  When a
        processor fails and still has retries left, the file is pushed back
        onto the queue and processing stops here.  Errors are logged and never
        propagate to the caller.
        """
        if not self.fileProcessorKeyList:
            return

        # NOTE(review): the line extracting the path was missing from the
        # source; the 'filePath' key is reconstructed -- confirm.
        filePath = fileInfo.get('filePath')
        if not filePath:
            self.logger.warning('Refusing to process empty file path')
            return

        try:
            statusMonitor = fileInfo.get('statusMonitor')
            if statusMonitor and statusMonitor.get('status') == 'aborting':
                self.logger.debug('File %s processing is cancelled', filePath)
                statusMonitor.fileProcessingCancelled(filePath, time.time())
                return

            self.logger.debug('Starting processing file %s', filePath)
            fileInfo['startProcessingTime'] = time.time()
            nProcessors = len(self.fileProcessorKeyList)
            # Reuse the record from an earlier (retried) pass if present.
            processedByDict = fileInfo.setdefault('processedByDict', {})

            for processorNumber, processorKey in enumerate(self.fileProcessorKeyList, 1):
                processor = self.fileProcessorDict.get(processorKey)
                # The position suffix keeps two instances of the same
                # processor class distinct.
                processorName = '%s-%s' % (processor.__class__.__name__, processorNumber)

                if processorName in processedByDict:
                    self.logger.debug('%s has already been processed by %s ', filePath, processorName)
                    continue

                self.logger.debug('%s is about to process file %s ', processorName, filePath)
                try:
                    processor.processFile(fileInfo)
                except Exception as ex:
                    if self._handleProcessorError(fileInfo, filePath, processor, processorName, statusMonitor, ex):
                        # A retry was scheduled: do not process this file
                        # further until this plugin is done.
                        break
                    # NOTE(review): with retries exhausted the chain moves on
                    # to the next processor; a later success can then still
                    # report the file as processed -- confirm this is intended.
                    continue

                # Bookkeeping and status reporting are deliberately outside the
                # try above, so their failures are not mistaken for processor
                # errors (which would consume a retry and re-queue the file).
                processedByDict[processorName] = True
                self.logger.debug('%s processed file at path %s ', processorName, filePath)
                if processorNumber == nProcessors:
                    self.logger.debug('File %s processing is complete', filePath)
                    if statusMonitor:
                        statusMonitor.fileProcessed(filePath, time.time())

        except Exception as ex:
            self.logger.exception(ex)

    def _handleProcessorError(self, fileInfo, filePath, processor, processorName, statusMonitor, ex):
        """Record a processor failure and re-queue the file if retries remain.

        Must be called from inside the ``except`` clause handling ``ex`` so
        that ``logger.exception`` can capture the traceback.

        :returns: True if the file was pushed back onto the queue for a retry,
            False if the processor's retries are exhausted.
        """
        self.logger.exception(ex)
        processingError = '%s processing error: %s' % (processorName, str(ex))
        self.logger.debug(processingError)
        processingDict = fileInfo.setdefault('processingDict', {})
        processorDict = processingDict.setdefault(processorName, {})

        processorDict['lastError'] = str(ex)
        # The retry budget is seeded from the processor on the first failure
        # and decremented on every failure after that.
        nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
        self.logger.debug('Number of %s retries left for file %s: %s', processorName, filePath, nRetriesLeft)
        processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
        if nRetriesLeft <= 0:
            if statusMonitor:
                statusMonitor.fileProcessingError(filePath, processingError, time.time())
            self.logger.debug('No more %s retries left for file %s', processorName, filePath)
            return False

        retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
        self.logger.debug('%s will retry processing file %s in %s seconds', processorName, filePath, retryWaitPeriod)
        self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
        return True

    def run(self):
        """Thread body: drain the queue, then wait on the manager's event.

        The loop repeats until ``stop()`` has set ``exitFlag``.  The flag is
        checked once per cycle, so shutdown may wait for the current drain and
        for ``waitOnEvent`` to return.
        """
        self.logger.debug('Starting thread: %s', self.name)
        while True:
            # Cleared before draining -- presumably so a notification that
            # arrives mid-drain is not lost (event semantics not visible here).
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                break
            while True:
                try:
                    fileInfo = self.fileProcessingQueue.pop()
                    if fileInfo is None:
                        break
                    self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength())
                    self.processFile(fileInfo)
                except Exception as ex:
                    self.logger.exception(ex)

            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done', self.name)

    def stop(self):
        """Ask the thread to exit; ``run()`` notices on its next cycle."""
        self.exitFlag = True

####################################################################
# Testing

if '__main__' == __name__:
    # Running this module directly is a no-op; no self-test is implemented.
    pass