Skip to content
Snippets Groups Projects
Commit 09b8150a authored by sveseli's avatar sveseli
Browse files

reworked file processing manager and thread to shutdown cleanly and avoid busy wait

parent a8559e3f
No related branches found
No related tags found
No related merge requests found
...@@ -58,7 +58,7 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -58,7 +58,7 @@ class FileProcessingManager(threading.Thread,Singleton):
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value) (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
self.logger.debug('Creating file processor instance of class %s' % className) self.logger.debug('Creating file processor instance of class %s' % className)
fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor) fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.logger.debug('Configring file processor %s' % fileProcessor) self.logger.debug('Configuring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries) fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds) fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
fileProcessor.configure() fileProcessor.configure()
...@@ -67,8 +67,8 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -67,8 +67,8 @@ class FileProcessingManager(threading.Thread,Singleton):
self.fileProcessorKeyList.sort() self.fileProcessorKeyList.sort()
self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList) self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
def processObservedFile(self, observedFile): def processFile(self, fileInfo):
self.fileProcessingQueue.push(observedFile) self.fileProcessingQueue.push(fileInfo)
self.eventFlag.set() self.eventFlag.set()
def start(self): def start(self):
...@@ -77,24 +77,23 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -77,24 +77,23 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Starting file processing threads') self.logger.debug('Starting file processing threads')
for i in range(0, self.nProcessingThreads): for i in range(0, self.nProcessingThreads):
tName = 'FileProcessingThread-%s' % i tName = 'FileProcessingThread-%s' % i
t = FileProcessingThread(tName, self.eventFlag, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict) t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict)
t.start() t.start()
self.fileProcessingThreadDict[tName] = t self.fileProcessingThreadDict[tName] = t
finally: finally:
self.lock.release() self.lock.release()
def stop(self): def stop(self):
self.logger.debug('Stopping file processing threads')
for (tName, t) in self.fileProcessingThreadDict.items():
t.stop()
self.lock.acquire() self.lock.acquire()
try: try:
self.logger.debug('Stopping file processing threads')
for (tName, t) in self.fileProcessingThreadDict.items():
t.stop()
self.eventFlag.set() self.eventFlag.set()
for (tName, t) in self.fileProcessingThreadDict.items():
t.join()
finally: finally:
self.lock.release() self.lock.release()
for (tName, t) in self.fileProcessingThreadDict.items():
t.join()
def setEvent(self): def setEvent(self):
self.lock.acquire() self.lock.acquire()
...@@ -110,6 +109,9 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -110,6 +109,9 @@ class FileProcessingManager(threading.Thread,Singleton):
finally: finally:
self.lock.release() self.lock.release()
def waitOnEvent(self, timeoutInSeconds=None):
self.eventFlag.wait(timeoutInSeconds)
#################################################################### ####################################################################
# Testing # Testing
......
...@@ -6,12 +6,14 @@ from dm.common.utility.loggingManager import LoggingManager ...@@ -6,12 +6,14 @@ from dm.common.utility.loggingManager import LoggingManager
class FileProcessingThread(threading.Thread): class FileProcessingThread(threading.Thread):
def __init__ (self, name, eventFlag, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict): THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0
def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.setName(name) self.setName(name)
self.exitFlag = False self.exitFlag = False
self.eventFlag = eventFlag self.fileProcessingManager = fileProcessingManager
self.fileProcessorDict = fileProcessorDict self.fileProcessorDict = fileProcessorDict
self.fileProcessorKeyList = fileProcessorKeyList self.fileProcessorKeyList = fileProcessorKeyList
self.fileProcessingQueue = fileProcessingQueue self.fileProcessingQueue = fileProcessingQueue
...@@ -22,55 +24,58 @@ class FileProcessingThread(threading.Thread): ...@@ -22,55 +24,58 @@ class FileProcessingThread(threading.Thread):
def run(self): def run(self):
self.logger.debug('Starting thread: %s' % self.getName()) self.logger.debug('Starting thread: %s' % self.getName())
while True: while True:
self.fileProcessingManager.clearEvent()
if self.exitFlag: if self.exitFlag:
self.logger.debug('Exit flag set, %s done' % self.getName()) self.logger.debug('Exit flag is set')
break break
while True: while True:
file = self.fileProcessingQueue.pop() fileInfo = self.fileProcessingQueue.pop()
if file is None: if fileInfo is None:
break break
filePath = file.getFilePath() filePath = fileInfo.get('filePath')
daqPath = file.getDaqPath()
experiment = file.getExperiment()
try: try:
for processorKey in self.fileProcessorKeyList: for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey) processor = self.fileProcessorDict.get(processorKey)
processorName = processor.__class__.__name__ processorName = processor.__class__.__name__
fileProcessedByDict = file.get('processedByDict', {}) fileProcessedByDict = fileInfo.get('processedByDict', {})
fileInfo['processedByDict'] = fileProcessedByDict
if fileProcessedByDict.has_key(processorName): if fileProcessedByDict.has_key(processorName):
self.logger.debug('%s has already been processed by %s ' % (file, processorName)) self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
continue continue
self.logger.debug('%s is about to process file %s ' % (processorName, file)) self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo))
try: try:
processor.processFile(filePath, daqPath, experiment) processor.processFile(fileInfo)
fileProcessedByDict[processorName] = True fileProcessedByDict[processorName] = True
file['processedByDict'] = fileProcessedByDict self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
self.logger.debug('%s processed file %s ' % (processorName, file))
except Exception, ex: except Exception, ex:
self.logger.exception(ex) self.logger.exception(ex)
self.logger.debug('%s processing failed for file %s ' % (processorName, file)) self.logger.debug('%s processing failed for file at path %s ' % (processorName, filePath))
fileProcessingDict = file.get('processingDict', {}) fileProcessingDict = fileInfo.get('processingDict', {})
file['processingDict'] = fileProcessingDict fileInfo['processingDict'] = fileProcessingDict
processorDict = fileProcessingDict.get(processorName, {}) processorDict = fileProcessingDict.get(processorName, {})
processorDict['lastError'] = ex fileProcessingDict[processorName] = processorDict
processorDict['lastError'] = str(ex)
nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries()) nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, file, nRetriesLeft)) self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1 processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
if nRetriesLeft <= 0: if nRetriesLeft <= 0:
self.logger.debug('No more %s retries left for file %s' % (processorName, file)) self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
self.unprocessedFileDict[filePath] = file self.unprocessedFileDict[filePath] = fileInfo
else: else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, file, retryWaitPeriod)) self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
self.fileProcessingQueue.push(file, retryWaitPeriod) self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
except Exception, ex: except Exception, ex:
self.logger.exception(ex) self.logger.exception(ex)
self.eventFlag.wait() self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
self.logger.debug('%s is done' % self.getName())
def stop(self): def stop(self):
self.exitFlag = True self.exitFlag = True
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment