Skip to content
Snippets Groups Projects
Commit b6405e0b authored by sveseli's avatar sveseli
Browse files

monitoring fixes from trunk

parent 04d808b7
No related branches found
No related tags found
No related merge requests found
...@@ -36,6 +36,7 @@ class FileProcessingThread(threading.Thread): ...@@ -36,6 +36,7 @@ class FileProcessingThread(threading.Thread):
self.logger.debug('File %s processing is cancelled' % (filePath)) self.logger.debug('File %s processing is cancelled' % (filePath))
endProcessingTime = time.time() endProcessingTime = time.time()
statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
statusMonitor.updateStatus()
return return
self.logger.debug('Starting to process file %s' % filePath) self.logger.debug('Starting to process file %s' % filePath)
startProcessingTime = fileInfo.get('startProcessingTime', time.time()) startProcessingTime = fileInfo.get('startProcessingTime', time.time())
...@@ -67,6 +68,7 @@ class FileProcessingThread(threading.Thread): ...@@ -67,6 +68,7 @@ class FileProcessingThread(threading.Thread):
endProcessingTime = time.time() endProcessingTime = time.time()
if statusMonitor: if statusMonitor:
statusMonitor.fileProcessed(filePath, endProcessingTime) statusMonitor.fileProcessed(filePath, endProcessingTime)
statusMonitor.updateStatus()
except Exception, ex: except Exception, ex:
self.logger.exception(ex) self.logger.exception(ex)
processingError = '%s processing error: %s' % (processorName, str(ex)) processingError = '%s processing error: %s' % (processorName, str(ex))
...@@ -84,6 +86,7 @@ class FileProcessingThread(threading.Thread): ...@@ -84,6 +86,7 @@ class FileProcessingThread(threading.Thread):
endProcessingTime = time.time() endProcessingTime = time.time()
if statusMonitor: if statusMonitor:
statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime) statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
statusMonitor.updateStatus()
self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath)) self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
return return
else: else:
......
...@@ -108,9 +108,11 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -108,9 +108,11 @@ class FileSystemObserver(threading.Thread,Singleton):
observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
observedFile['statusMonitor'] = daqInfo observedFile['statusMonitor'] = daqInfo
self.observedFileMap[filePath] = observedFile
self.logger.debug('New observed file: %s', filePath)
else:
self.logger.debug('Observed file updated: %s', filePath)
observedFile.setLastUpdateTimeToNow() observedFile.setLastUpdateTimeToNow()
self.observedFileMap[filePath] = observedFile
self.logger.debug('Observed file updated: %s', filePath)
@ThreadingUtility.synchronize @ThreadingUtility.synchronize
def checkObservedFilesForProcessing(self): def checkObservedFilesForProcessing(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment