#!/usr/bin/env python
import threading
import time

from dm.common.utility.loggingManager import LoggingManager
class FileProcessingThread(threading.Thread):
    """Worker thread that drains a file processing queue.

    run() repeatedly pops entries from fileProcessingQueue, hands each one to
    processFile(), and then waits on the manager's event until either new
    work is signalled or the timeout expires. The loop ends once exitFlag
    has been set to True.
    """

    # Timeout (seconds) passed to fileProcessingManager.waitOnEvent() by run().
    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue):
        """Store collaborators and configure the thread.

        name -- used both as the thread name and as the logger name.
        fileProcessingManager -- object providing clearEvent() and
            waitOnEvent(timeout).
        fileProcessorDict -- mapping of processor key to processor object.
        fileProcessorKeyList -- processor keys, in the order in which the
            processors must be applied to a file.
        fileProcessingQueue -- object providing pop(), getLength() and
            push(fileInfo, waitPeriod).
        """
        threading.Thread.__init__(self)
        self.setName(name)
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.logger = LoggingManager.getInstance().getLogger(name)

    def processFile(self, fileInfo):
        """Run one file through the configured processors, in key-list order.

        fileInfo is a dictionary; this method reads 'filePath',
        'statusMonitor', 'startProcessingTime', 'processedByDict' and
        'skipPlugins', and writes 'startProcessingTime', 'processedByDict'
        and (on failure) 'processingDict' back into it.

        Processors already recorded in 'processedByDict' are not run again,
        which is what lets a re-queued file resume at the processor that
        failed. Nothing is returned and no exception escapes: any error is
        logged.
        """
        if not self.fileProcessorKeyList:
            return
        filePath = fileInfo.get('filePath')
        if not filePath:
            self.logger.warn('Refusing to process empty file path')
            return

        try:
            statusMonitor = fileInfo.get('statusMonitor')
            statusMonitorId = ''
            # NOTE(review): dmProcessingStatus is not imported in the visible
            # part of this module -- confirm that the module-level import
            # exists, otherwise this comparison raises NameError whenever a
            # status monitor is attached.
            if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                self.logger.debug('File %s processing is cancelled' % (filePath))
                endProcessingTime = time.time()
                statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
                return
            if statusMonitor:
                statusMonitorId = statusMonitor.get('id', '')
            self.logger.debug('Starting to process file %s (upload or DAQ id: %s)' % (filePath, statusMonitorId))

            # Keep the first start time across retries of the same file.
            startProcessingTime = fileInfo.get('startProcessingTime', time.time())
            fileInfo['startProcessingTime'] = startProcessingTime

            nProcessors = len(self.fileProcessorKeyList)
            processedByDict = fileInfo.get('processedByDict', {})
            fileInfo['processedByDict'] = processedByDict

            # NOTE(review): assumes the optional list of plugin names to skip
            # travels in fileInfo under 'skipPlugins' -- confirm against the
            # code that builds fileInfo.
            skipPlugins = fileInfo.get('skipPlugins', [])

            for processorNumber, processorKey in enumerate(self.fileProcessorKeyList, 1):
                processor = self.fileProcessorDict.get(processorKey)
                # NOTE(review): the processor class name is used as its
                # display/bookkeeping name; confirm this matches the names
                # used elsewhere for 'processedByDict' and 'skipPlugins'.
                processorName = processor.__class__.__name__
                if processorName in skipPlugins:
                    self.logger.debug('%s will be skipped by %s ' % (filePath, processorName))
                    continue
                if processorName in processedByDict:
                    self.logger.debug('%s has already been processed by %s ' % (filePath, processorName))
                    continue

                self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
                try:
                    processor.processFile(fileInfo)
                    processedByDict[processorName] = True
                    self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
                    # Completion is reported only when the last processor in
                    # the list has just run successfully.
                    if processorNumber == nProcessors:
                        endProcessingTime = time.time()
                        self.logger.debug('File %s processing is complete (upload or DAQ id: %s)' % (filePath, statusMonitorId))
                        if statusMonitor:
                            statusMonitor.fileProcessed(filePath, endProcessingTime)
                            nProcessedFiles = statusMonitor.get('nProcessedFiles', 0)
                            self.logger.debug('Upload or DAQ id %s has processed %s files so far)' % (statusMonitorId, nProcessedFiles))
                except Exception as ex:
                    self.logger.exception(ex)
                    if self._handleProcessingError(processor, processorName, fileInfo, filePath, statusMonitor, ex):
                        break
        except Exception as ex:
            self.logger.exception(ex)

    def _handleProcessingError(self, processor, processorName, fileInfo, filePath, statusMonitor, ex):
        """Record a processor failure and decide whether to stop the chain.

        Updates fileInfo['processingDict'][processorName] with 'lastError'
        and 'numberOfRetriesLeft'. Returns True when the remaining processors
        must not run in this pass (the file was re-queued for a retry, or it
        failed for good), and False when the failure is skipped and the next
        processor may proceed.
        """
        processingError = '%s processing error: %s' % (processorName, str(ex))
        self.logger.debug(processingError)

        processingDict = fileInfo.get('processingDict', {})
        fileInfo['processingDict'] = processingDict
        processorDict = processingDict.get(processorName, {})
        processingDict[processorName] = processorDict
        processorDict['lastError'] = str(ex)

        # First failure starts from the processor's configured retry count;
        # later failures continue from the stored value.
        nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
        self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
        processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1

        if nRetriesLeft > 0:
            retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
            self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod))
            self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
            # Do not process this file further until this plugin is done.
            return True

        endProcessingTime = time.time()
        if processor.getSkipOnFailure():
            if statusMonitor:
                statusMonitor.fileProcessingSkipped(processorName, filePath, processingError, endProcessingTime)
                statusMonitor.updateStatus()
            self.logger.debug('No more %s retries left for file %s, skipping it' % (processorName, filePath))
            return False

        if statusMonitor:
            statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
            statusMonitor.updateStatus()
        self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
        # Honour the message above: a hard failure ends the chain.
        return True

    def run(self):
        """Thread main loop: drain the queue, then wait for the next event.

        Each outer iteration clears the manager's event, checks exitFlag,
        processes queue entries until pop() returns None, and then waits on
        the event for at most THREAD_EVENT_TIMEOUT_IN_SECONDS.
        """
        self.logger.debug('Starting thread: %s' % self.getName())
        while True:
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                self.logger.debug('Exit flag is set')
                break

            while True:
                try:
                    fileInfo = self.fileProcessingQueue.pop()
                    if fileInfo is None:
                        # Queue is empty for now.
                        break
                    self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength())
                    self.processFile(fileInfo)
                except Exception as ex:
                    self.logger.exception(ex)

            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.getName())