#!/usr/bin/env python

import threading
import time

from dm.common.constants import dmProcessingStatus
from dm.common.utility.loggingManager import LoggingManager


class FileProcessingThread(threading.Thread):
    """Worker thread that drains a file processing queue.

    Each ``fileInfo`` popped from the queue is handed, in the order given by
    ``fileProcessorKeyList``, to the processors (plugins) in
    ``fileProcessorDict``. Per-file progress (which processors succeeded,
    retry counters, last errors) is stored inside ``fileInfo`` itself. A file
    that is pushed back onto the queue for a retry therefore resumes with the
    processors that have not yet succeeded.
    """

    # Timeout passed to fileProcessingManager.waitOnEvent() after each
    # queue drain, before the exit flag is checked again.
    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__(self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue):
        """Create the worker.

        :param name: thread name; also used as the logger name.
        :param fileProcessingManager: object providing clearEvent() and
            waitOnEvent(timeout), used by run() to wait for new work.
        :param fileProcessorDict: maps processor keys to processor objects
            exposing ``name``, ``processFile(fileInfo)``,
            ``getNumberOfRetries()`` and ``getRetryWaitPeriodInSeconds()``.
        :param fileProcessorKeyList: processor keys, in execution order.
        :param fileProcessingQueue: queue providing pop() (None when there is
            nothing to process), push(fileInfo, waitPeriod) and getLength().
        """
        threading.Thread.__init__(self, name=name)
        # Set by stop(); polled by run() between queue drains.
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.logger = LoggingManager.getInstance().getLogger(name)

    def processFile(self, fileInfo):
        """Run the configured processors over a single file.

        :param fileInfo: mutable dict-like description of the file. Keys read:
            'filePath' (required), 'statusMonitor', 'startProcessingTime',
            'processedByDict', 'skipPlugins', 'processingDict'. The keys
            'startProcessingTime', 'processedByDict' and 'processingDict' are
            (re)written as bookkeeping for retries.

        Processors listed in 'skipPlugins' or already recorded in
        'processedByDict' are skipped. The file counts as processed once the
        last processor in the list has run or been skipped via 'skipPlugins'.
        On a processor failure the file is either re-queued or marked as
        failed (see _handleProcessorError). In both cases the remaining
        processors do not run in this pass. Unexpected exceptions are logged
        and swallowed so the worker thread keeps running.
        """
        if not self.fileProcessorKeyList:
            return
        filePath = fileInfo.get('filePath')
        if not filePath:
            self.logger.warn('Refusing to process empty file path')
            return

        try:
            statusMonitor = fileInfo.get('statusMonitor')
            statusMonitorId = ''
            if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                self.logger.debug('File %s processing is cancelled' % (filePath))
                endProcessingTime = time.time()
                statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
                statusMonitor.updateStatus()
                return
            if statusMonitor:
                statusMonitorId = statusMonitor.get('id', '')
            self.logger.debug('Starting to process file %s (upload or DAQ id: %s)' % (filePath, statusMonitorId))

            # Keep the original start time when the file comes back for a retry.
            startProcessingTime = fileInfo.get('startProcessingTime', time.time())
            fileInfo['startProcessingTime'] = startProcessingTime
            processedByDict = fileInfo.get('processedByDict', {})
            fileInfo['processedByDict'] = processedByDict
            skipPlugins = fileInfo.get('skipPlugins', [])
            nProcessors = len(self.fileProcessorKeyList)

            for processorNumber, processorKey in enumerate(self.fileProcessorKeyList, 1):
                isLastProcessor = (processorNumber == nProcessors)
                processor = self.fileProcessorDict.get(processorKey)
                processorName = processor.name

                if processorName in skipPlugins:
                    self.logger.debug('%s will be skipped by %s ' % (filePath, processorName))
                    if isLastProcessor:
                        # Completion is normally reported when the last
                        # processor succeeds. When it is skipped, report
                        # here instead, or the file is never marked processed.
                        self._reportFileProcessed(filePath, statusMonitor, statusMonitorId)
                    continue
                if processorName in processedByDict:
                    # Succeeded in an earlier pass (the file was re-queued
                    # because a later processor failed).
                    self.logger.debug('%s has already been processed by %s ' % (filePath, processorName))
                    continue

                self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
                # Only the plugin call is guarded here, so that failures in
                # status reporting are not charged to the plugin as retries.
                try:
                    processor.processFile(fileInfo)
                except Exception as ex:
                    self.logger.exception(ex)
                    self._handleProcessorError(fileInfo, filePath, statusMonitor, processor, processorName, ex)
                    # The file was either re-queued or failed permanently;
                    # later processors must not run in this pass.
                    return

                processedByDict[processorName] = True
                self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
                if isLastProcessor:
                    self._reportFileProcessed(filePath, statusMonitor, statusMonitorId)
        except Exception as ex:
            self.logger.exception(ex)

    def _reportFileProcessed(self, filePath, statusMonitor, statusMonitorId):
        """Log that all processors are done with a file and update its status monitor, if any."""
        endProcessingTime = time.time()
        self.logger.debug('File %s processing is complete (upload or DAQ id: %s)' % (filePath, statusMonitorId))
        if statusMonitor:
            statusMonitor.fileProcessed(filePath, endProcessingTime)
            statusMonitor.updateStatus()
            nProcessedFiles = statusMonitor.get('nProcessedFiles', 0)
            self.logger.debug('Upload or DAQ id %s has processed %s files so far' % (statusMonitorId, nProcessedFiles))

    def _handleProcessorError(self, fileInfo, filePath, statusMonitor, processor, processorName, ex):
        """Record a processor failure, then re-queue the file or mark it as failed.

        Bookkeeping is kept in fileInfo['processingDict'][processorName] under
        'lastError' and 'numberOfRetriesLeft'. The retry counter starts at
        processor.getNumberOfRetries() on the first failure, so a processor
        reporting N retries gets N re-queues before the file is marked failed.
        Re-queued files are pushed with processor.getRetryWaitPeriodInSeconds()
        as the wait period.
        """
        processingError = '%s processing error: %s' % (processorName, str(ex))
        self.logger.debug(processingError)
        processingDict = fileInfo.get('processingDict', {})
        fileInfo['processingDict'] = processingDict
        processorDict = processingDict.get(processorName, {})
        processingDict[processorName] = processorDict
        processorDict['lastError'] = str(ex)
        nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
        self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
        processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
        if nRetriesLeft <= 0:
            endProcessingTime = time.time()
            if statusMonitor:
                statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
                statusMonitor.updateStatus()
            self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
        else:
            retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
            self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod))
            self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)

    def run(self):
        """Thread body: drain the queue, then wait for more work, until stop() is called.

        The exit flag is checked only between drains. A stop request
        therefore takes effect after the queue has been emptied (pop()
        returned None) and the following waitOnEvent() call has returned.
        """
        self.logger.debug('Starting thread: %s' % self.name)
        while True:
            # Cleared before draining, presumably so that a notification
            # arriving during the drain still wakes waitOnEvent() below.
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                self.logger.debug('Exit flag is set')
                break
            while True:
                try:
                    fileInfo = self.fileProcessingQueue.pop()
                    if fileInfo is None:
                        break
                    self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength())
                    self.processFile(fileInfo)
                except Exception as ex:
                    self.logger.exception(ex)
            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.name)

    def stop(self):
        """Ask the thread to exit; run() honors this the next time it checks exitFlag."""
        self.exitFlag = True

####################################################################
# Testing

if __name__ == '__main__':
    pass