#!/usr/bin/env python import threading from dm.common.utility.loggingManager import LoggingManager class FileProcessingThread(threading.Thread): def __init__ (self, name, eventFlag, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict): threading.Thread.__init__(self) self.setName(name) self.exitFlag = False self.eventFlag = eventFlag self.fileProcessorDict = fileProcessorDict self.fileProcessorKeyList = fileProcessorKeyList self.fileProcessingQueue = fileProcessingQueue self.processedFileDict = processedFileDict self.unprocessedFileDict = unprocessedFileDict self.logger = LoggingManager.getInstance().getLogger(name) def run(self): self.logger.debug('Starting thread: %s' % self.getName()) while True: if self.exitFlag: self.logger.debug('Exit flag set, %s done' % self.getName()) break while True: file = self.fileProcessingQueue.pop() if file is None: break filePath = file.getFilePath() daqPath = file.getDaqPath() experiment = file.getExperiment() try: for processorKey in self.fileProcessorKeyList: processor = self.fileProcessorDict.get(processorKey) processorName = processor.__class__.__name__ self.logger.debug('%s is about to process file %s ' % (processorName, file)) try: processor.processFile(filePath, daqPath, experiment) self.logger.debug('%s processed file %s ' % (processorName, file)) except Exception, ex: self.logger.exception(ex) self.logger.debug('%s processing failed for file %s ' % (processorName, file)) file[processorName] = {'error' : ex} self.unprocessedFileDict[filePath] = file except Exception, ex: self.logger.exception(ex) self.eventFlag.wait() def stop(self): self.exitFlag = True #################################################################### # Testing if __name__ == '__main__': pass