From 54889c100222e18b30fe4f01e900594e53c6c0bf Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Wed, 26 Oct 2016 03:33:42 +0000 Subject: [PATCH] fix accounting problem with daqs where multiple processing of a single file can mess up daq info --- src/python/dm/common/objects/daqInfo.py | 5 +++-- src/python/dm/common/objects/uploadInfo.py | 2 +- .../daq_web_service/service/impl/fileSystemObserver.py | 9 +++++---- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index b45d6cc5..660568de 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -40,10 +40,11 @@ class DaqInfo(DmObject): def fileProcessingError(self, filePath, processingError, processingEndTime): self.lock.acquire() try: + # file can be processed multiple times, keep only the last error + self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1 processingErrors = self.get('processingErrors', {}) - processingErrors[filePath] = processingError + processingErrors[filePath] = processingError self['processingErrors'] = processingErrors - self['nProcessingErrors'] = len(processingErrors) lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0) if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime: self['lastFileProcessingErrorTime'] = processingEndTime diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index 5fd41280..fc94a208 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -30,10 +30,10 @@ class UploadInfo(DmObject): def fileProcessingError(self, filePath, processingError, processingEndTime): self.lock.acquire() try: + self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1 processingErrors = self.get('processingErrors', {}) processingErrors[filePath] = processingError self['processingErrors'] = processingErrors - self['nProcessingErrors'] = len(processingErrors) lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0) if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime: self['lastFileProcessingErrorTime'] = processingEndTime diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 74ef566b..6555ada2 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -92,7 +92,7 @@ class FileSystemObserver(threading.Thread,Singleton): # No daq info, ignore if not daqInfo: - self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectoy, experimentName, experimentfilePath)) + self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectory, experimentName, experimentfilePath)) return # Do not process hidden files unless requested @@ -102,16 +102,17 @@ class FileSystemObserver(threading.Thread,Singleton): self.logger.debug('File path %s is hidden file, will not process it' % filePath) return + daqId = daqInfo['id'] observedFile = self.observedFileMap.get(filePath) if not observedFile: - daqInfo.fileAdded(filePath) observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() observedFile['statusMonitor'] = daqInfo self.observedFileMap[filePath] = observedFile - self.logger.debug('New observed file: %s', filePath) + self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId)) + daqInfo.fileAdded(filePath) else: - self.logger.debug('Observed file updated: %s', filePath) + self.logger.debug('Observed file updated: %s (daq id: %s)' % (filePath, daqId)) observedFile.setLastUpdateTimeToNow() @ThreadingUtility.synchronize -- GitLab