Skip to content
Snippets Groups Projects
Commit 54889c10 authored by sveseli's avatar sveseli
Browse files

fix accounting problem with daqs where multiple processing of a single file can mess up daq info

parent 3ffbc522
No related branches found
No related tags found
No related merge requests found
...@@ -40,10 +40,11 @@ class DaqInfo(DmObject): ...@@ -40,10 +40,11 @@ class DaqInfo(DmObject):
def fileProcessingError(self, filePath, processingError, processingEndTime): def fileProcessingError(self, filePath, processingError, processingEndTime):
self.lock.acquire() self.lock.acquire()
try: try:
# file can be processed multiple times, keep only the last error
self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
processingErrors = self.get('processingErrors', {}) processingErrors = self.get('processingErrors', {})
processingErrors[filePath] = processingError processingErrors[filePath] = processingError
self['processingErrors'] = processingErrors self['processingErrors'] = processingErrors
self['nProcessingErrors'] = len(processingErrors)
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0) lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime: if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
self['lastFileProcessingErrorTime'] = processingEndTime self['lastFileProcessingErrorTime'] = processingEndTime
......
...@@ -30,10 +30,10 @@ class UploadInfo(DmObject): ...@@ -30,10 +30,10 @@ class UploadInfo(DmObject):
def fileProcessingError(self, filePath, processingError, processingEndTime): def fileProcessingError(self, filePath, processingError, processingEndTime):
self.lock.acquire() self.lock.acquire()
try: try:
self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
processingErrors = self.get('processingErrors', {}) processingErrors = self.get('processingErrors', {})
processingErrors[filePath] = processingError processingErrors[filePath] = processingError
self['processingErrors'] = processingErrors self['processingErrors'] = processingErrors
self['nProcessingErrors'] = len(processingErrors)
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0) lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime: if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
self['lastFileProcessingErrorTime'] = processingEndTime self['lastFileProcessingErrorTime'] = processingEndTime
......
...@@ -92,7 +92,7 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -92,7 +92,7 @@ class FileSystemObserver(threading.Thread,Singleton):
# No daq info, ignore # No daq info, ignore
if not daqInfo: if not daqInfo:
self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectoy, experimentName, experimentfilePath)) self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectory, experimentName, experimentfilePath))
return return
# Do not process hidden files unless requested # Do not process hidden files unless requested
...@@ -102,16 +102,17 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -102,16 +102,17 @@ class FileSystemObserver(threading.Thread,Singleton):
self.logger.debug('File path %s is hidden file, will not process it' % filePath) self.logger.debug('File path %s is hidden file, will not process it' % filePath)
return return
daqId = daqInfo['id']
observedFile = self.observedFileMap.get(filePath) observedFile = self.observedFileMap.get(filePath)
if not observedFile: if not observedFile:
daqInfo.fileAdded(filePath)
observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
observedFile['statusMonitor'] = daqInfo observedFile['statusMonitor'] = daqInfo
self.observedFileMap[filePath] = observedFile self.observedFileMap[filePath] = observedFile
self.logger.debug('New observed file: %s', filePath) self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId))
daqInfo.fileAdded(filePath)
else: else:
self.logger.debug('Observed file updated: %s', filePath) self.logger.debug('Observed file updated: %s (daq id: %s)' % (filePath, daqId))
observedFile.setLastUpdateTimeToNow() observedFile.setLastUpdateTimeToNow()
@ThreadingUtility.synchronize @ThreadingUtility.synchronize
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment