Skip to content
Snippets Groups Projects
fileProcessingThread.py 6.97 KiB
Newer Older
import time
sveseli's avatar
sveseli committed
from dm.common.constants import dmProcessingStatus

from dm.common.utility.loggingManager import LoggingManager

class FileProcessingThread(threading.Thread):

    def __init__(self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue):
        """Initialise the thread and keep references to its collaborators.

        ``name`` is used both as the thread name and as the name of the
        logger requested from ``LoggingManager``. The manager, the processor
        dictionary, the processor key list and the queue are stored on the
        instance exactly as passed in (no copies are made).
        """
        threading.Thread.__init__(self)
        self.setName(name)

        # Cleared at construction time; stop() sets it to True.
        self.exitFlag = False

        # Objects supplied by the caller.
        self.fileProcessingQueue = fileProcessingQueue
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessingManager = fileProcessingManager

        # Logger looked up under the thread's own name.
        loggingManager = LoggingManager.getInstance()
        self.logger = loggingManager.getLogger(name)

        # NOTE(review): no `def` line is present above this body. run() calls
        # self.processFile(fileInfo), so this is presumably the body of
        # processFile(self, fileInfo) -- confirm against the original file.
        # `filePath` is also used below without any visible assignment.

        # Nothing to do when no processor keys are configured.
        if not len(self.fileProcessorKeyList): 
            return 

        # An empty/missing path is logged and the file is dropped.
        if not filePath:
            self.logger.warn('Refusing to process empty file path')
            return

        # NOTE(review): everything from here down to the final `except` is
        # indented one level deeper, but no matching `try:` line is present;
        # it appears to be missing from this copy.
            statusMonitor = fileInfo.get('statusMonitor')
        # NOTE(review): the next two lines (and the identical pairs further
        # down) are not Python; they look like repository-viewer residue.
sveseli's avatar
sveseli committed
            statusMonitorId = ''
            # If the status monitor reports the "aborting" status, record the
            # cancellation (with the current time) and refresh the monitor.
            # NOTE(review): no `return` follows in the visible code, so control
            # falls through to normal processing -- confirm whether a line is
            # missing here.
            if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                self.logger.debug('File %s processing is cancelled' % (filePath))
                endProcessingTime = time.time() 
                statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) 
                statusMonitor.updateStatus()
sveseli's avatar
sveseli committed
            if statusMonitor:
                statusMonitorId = statusMonitor.get('id', '')
            self.logger.debug('Starting to process file %s (upload or DAQ id: %s)' % (filePath, statusMonitorId))
            # Reuse a start time already stored in fileInfo (the same fileInfo
            # is pushed back on the queue for retries below); otherwise use now.
            startProcessingTime = fileInfo.get('startProcessingTime', time.time())
            fileInfo['startProcessingTime'] = startProcessingTime
            nProcessors = len(self.fileProcessorKeyList)
            # Processor names that have already handled this file; kept on
            # fileInfo so the record travels with the file when it is re-queued.
            processedByDict = fileInfo.get('processedByDict', {})
            fileInfo['processedByDict'] = processedByDict
            # Processor names that must not be applied to this file.
            skipPlugins = fileInfo.get('skipPlugins', [])
            # 1-based position in the key list. It is incremented before the
            # skip checks, so the "processing is complete" branch below is only
            # reached when the last processor in the list itself runs
            # successfully in this pass.
            processorNumber = 0
            for processorKey in self.fileProcessorKeyList: 
                processorNumber += 1
                processor = self.fileProcessorDict.get(processorKey)
                processorName = processor.name
                if processorName in skipPlugins:
                    self.logger.debug('%s will be skipped by %s ' % (filePath, processorName))
                    continue

                # dict.has_key() exists only in Python 2.
                if processedByDict.has_key(processorName):
                    self.logger.debug('%s has already been processed by %s ' % (filePath, processorName))
                    continue

                self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
                try:
                    processor.processFile(fileInfo)
                    # Mark success so a later pass does not run this processor again.
                    processedByDict[processorName] = True
                    self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
                    # Last processor in the list: report the file as processed.
                    if processorNumber == nProcessors:
                        endProcessingTime = time.time() 
sveseli's avatar
sveseli committed
                        self.logger.debug('File %s processing is complete (upload or DAQ id: %s)' % (filePath, statusMonitorId))
                        if statusMonitor:
                            statusMonitor.fileProcessed(filePath, endProcessingTime) 
                            statusMonitor.updateStatus()
sveseli's avatar
sveseli committed
                            nProcessedFiles = statusMonitor.get('nProcessedFiles', 0)
                            self.logger.debug('Upload or DAQ id %s has processed %s files so far)' % (statusMonitorId, nProcessedFiles))
                # `except X, name` is Python 2 syntax.
                except Exception, ex:
                    self.logger.exception(ex)
                    processingError = '%s processing error: %s' % (processorName, str(ex))
                    self.logger.debug(processingError)
                    # Per-processor failure bookkeeping lives in
                    # fileInfo['processingDict'][processorName].
                    processingDict = fileInfo.get('processingDict', {})
                    fileInfo['processingDict'] = processingDict
                    processorDict = processingDict.get(processorName, {}) 
                    processingDict[processorName] = processorDict

                    processorDict['lastError'] = str(ex)
                    # On the first failure the retry budget comes from the
                    # processor; afterwards the stored counter is used. The
                    # decremented value is stored, while the test below uses
                    # the value from before the decrement.
                    nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries()) 
                    self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
                    processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
                    if nRetriesLeft <= 0:
                        # Out of retries: report either an error or a skip,
                        # depending on the processor's skip-on-failure setting.
                        endProcessingTime = time.time() 
                        if not processor.getSkipOnFailure():
                            if statusMonitor:
                                statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime) 
                                statusMonitor.updateStatus()
                            # NOTE(review): the message says the remaining
                            # plugins will not process the file, but no `break`
                            # is visible here, so the loop moves on to the next
                            # processor -- confirm whether a line is missing.
                            self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
                        else:
                            if statusMonitor:
                                statusMonitor.fileProcessingSkipped(processorName, filePath, processingError, endProcessingTime) 
                                statusMonitor.updateStatus()
                            self.logger.debug('No more %s retries left for file %s, skipping it' % (processorName, filePath))
                    else:
                        # Retries remain: push the same fileInfo back on the
                        # queue with the processor's wait period.
                        retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
sveseli's avatar
sveseli committed
                        self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod))
                        self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
                        # Do not process this file further until 
                        # this plugin is done
                        break

        # Any other exception raised above is logged and not re-raised.
        except Exception, ex:
            self.logger.exception(ex)

    # Thread entry point. Logs the start, then loops: clears the manager's
    # event, pops file infos from the processing queue and hands each one to
    # processFile(), and finally waits on the manager's event before the next
    # pass. A final message is logged once the outer loop ends.
    #
    # NOTE(review): this copy of the method is not valid Python as written.
    # The statements after clearEvent() are indented deeper than any visible
    # enclosing statement and the `except` has no visible `try:`; lines
    # (presumably an inner loop and a `try:`) appear to be missing.
    # NOTE(review): nothing visible in this method reads self.exitFlag, and
    # THREAD_EVENT_TIMEOUT_IN_SECONDS is not defined in the visible part of the
    # class -- confirm both against the original file.
    def run(self):
        self.logger.debug('Starting thread: %s' % self.getName())
        while True:
            self.fileProcessingManager.clearEvent()
                    fileInfo = self.fileProcessingQueue.pop()
                    # A None result from pop() ends the enclosing loop.
                    if fileInfo is None:
                        break
                    self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength())
                    self.processFile(fileInfo)
                # Exceptions are logged and not re-raised (Python 2 syntax).
                except Exception, ex:
                    self.logger.exception(ex)

            # Wait on the manager's event, passing the class timeout value.
            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.getName())

    def stop(self):
        """Raise the exit flag on this thread object.

        Only sets ``exitFlag`` to True; it does not join or interrupt the
        thread. NOTE(review): the run() loop as it appears in this file
        never reads the flag -- confirm where it is checked.
        """
        self.exitFlag = True

####################################################################
# Testing

# No self-test is implemented: running this module directly does nothing.
if __name__ == '__main__':
    pass