Newer
Older
#!/usr/bin/env python
import threading
import time

from dm.common.utility.loggingManager import LoggingManager
class FileProcessingThread(threading.Thread):
    """Worker thread that drains a file processing queue.

    Each file-info dict popped from ``fileProcessingQueue`` is handed, in the
    order given by ``fileProcessorKeyList``, to the processors stored in
    ``fileProcessorDict``. Progress is recorded inside the file-info dict:

    * ``processedByDict`` - processor class names that already succeeded
      (those processors are skipped when the file is seen again);
    * ``processingDict`` - per-processor ``lastError`` and
      ``numberOfRetriesLeft`` after a failure;
    * ``processingInfo`` - start/end times, ``processed`` flag and
      ``processingError``.

    After draining the queue the thread calls the manager's
    ``waitOnEvent()`` with a timeout, then checks ``exitFlag`` again.
    """

    # Timeout passed to fileProcessingManager.waitOnEvent() between queue drains.
    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
        """Create the worker.

        :param name: thread name; also used as the logger name.
        :param fileProcessingManager: object providing clearEvent() and
            waitOnEvent(timeout).
        :param fileProcessorDict: processor key -> processor object.
        :param fileProcessorKeyList: processor keys in processing order.
        :param fileProcessingQueue: object providing pop() (None when there
            is nothing to process) and push(fileInfo, delayInSeconds).
        :param processedFileDict: stored for callers; not used by this class.
        :param unprocessedFileDict: filePath -> fileInfo for files whose
            processor ran out of retries.
        """
        threading.Thread.__init__(self, name=name)
        # Set externally to make run() return after the current drain/wait.
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.processedFileDict = processedFileDict
        self.unprocessedFileDict = unprocessedFileDict
        self.logger = LoggingManager.getInstance().getLogger(name)

    def run(self):
        """Process queued files until ``exitFlag`` is set."""
        self.logger.debug('Starting thread: %s' % self.name)
        while True:
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                self.logger.debug('Exit flag is set')
                break

            # Drain the queue; a None result from pop() ends this pass.
            while True:
                fileInfo = self.fileProcessingQueue.pop()
                if fileInfo is None:
                    break
                try:
                    self._processFile(fileInfo)
                except Exception as ex:
                    # Boundary: one bad file must not terminate the thread.
                    self.logger.exception(ex)

            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.name)

    def _processFile(self, fileInfo):
        """Run every configured processor that has not yet handled fileInfo."""
        filePath = fileInfo.get('filePath')
        fileInfo['startProcessingTime'] = time.time()
        processingInfo = fileInfo.get('processingInfo')
        if processingInfo is None:
            processingInfo = {}
            fileInfo['processingInfo'] = processingInfo
        processingInfo['startProcessingTime'] = fileInfo['startProcessingTime']
        fileProcessedByDict = fileInfo.setdefault('processedByDict', {})

        nProcessors = len(self.fileProcessorKeyList)
        for processorNumber, processorKey in enumerate(self.fileProcessorKeyList, 1):
            processor = self.fileProcessorDict.get(processorKey)
            processorName = processor.__class__.__name__
            if processorName in fileProcessedByDict:
                self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
                continue

            self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo))
            try:
                processor.processFile(fileInfo)
            except Exception as ex:
                self.logger.exception(ex)
                if self._handleProcessingError(fileInfo, processingInfo, processor, processorName, ex):
                    # File was re-queued: do not process it further until
                    # this processor succeeds.
                    break
                # NOTE(review): with retries exhausted, the remaining
                # processors still run (as before); confirm this is intended.
                continue

            fileProcessedByDict[processorName] = True
            self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
            if processorNumber == nProcessors:
                self.logger.debug('File %s processing is complete' % (filePath))
                fileInfo['endProcessingTime'] = time.time()
                processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
                processingInfo['processed'] = True

    def _handleProcessingError(self, fileInfo, processingInfo, processor, processorName, ex):
        """Record a processor failure, then re-queue the file or give up on it.

        Returns True if the file was pushed back onto the queue for a retry,
        False if no retries were left and it was stored in
        ``unprocessedFileDict``.
        """
        filePath = fileInfo.get('filePath')
        errorMsg = '%s processing error: %s' % (processorName, str(ex))
        fileProcessingDict = fileInfo.setdefault('processingDict', {})
        processorDict = fileProcessingDict.setdefault(processorName, {})
        processorDict['lastError'] = str(ex)
        # The first failure seeds the counter from the processor's configured retries.
        nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
        self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
        processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
        if nRetriesLeft <= 0:
            self.unprocessedFileDict[filePath] = fileInfo
            # endProcessingTime is not set on the failure path; set it here
            # before copying it into processingInfo.
            fileInfo['endProcessingTime'] = time.time()
            processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
            processingInfo['processingError'] = errorMsg
            self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
            return False

        retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
        self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
        self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
        return True