From 116fc336ffc7c487f653cd1f67b27765f69c4ae5 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 2 Feb 2016 18:43:26 +0000 Subject: [PATCH] fixes for monitorint errors --- src/python/dm/common/objects/daqInfo.py | 8 ++++---- src/python/dm/common/objects/uploadInfo.py | 10 +++++----- .../dm/common/processing/fileProcessingThread.py | 9 +++++++-- .../service/impl/experimentSessionControllerImpl.py | 5 +++-- .../daq_web_service/service/impl/fileSystemObserver.py | 2 ++ 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index a657e033..c355cd03 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -25,12 +25,12 @@ class DaqInfo(DmObject): nProcessedFiles = 0 nProcessingErrors = 0 processingErrors = {} - for (filePath,uploadFileInfo) in fileDict.items(): - if uploadFileInfo.get('processed'): + for (filePath,fileProcessingInfo) in fileDict.items(): + if fileProcessingInfo.get('processed'): nProcessedFiles += 1 - elif uploadFileInfo.get('processingError'): + elif fileProcessingInfo.get('processingError'): nProcessingErrors += 1 - processingErrors[filePath] = uploadFileInfo.get('processingError') + processingErrors[filePath] = fileProcessingInfo.get('processingError') if len(processingErrors): self['processingErrors'] = processingErrors diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index 1c54ca9c..e00a44a6 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -24,14 +24,14 @@ class UploadInfo(DmObject): nProcessingErrors = 0 processingErrors = {} endTime = 0 - for (filePath,uploadFileInfo) in fileDict.items(): - if uploadFileInfo.get('processed'): + for (filePath,fileProcessingInfo) in fileDict.items(): + if fileProcessingInfo.get('processed'): nProcessedFiles += 1 - elif uploadFileInfo.get('processingError'): + elif fileProcessingInfo.get('processingError'): nProcessingErrors += 1 - processingErrors[filePath] = uploadFileInfo.get('processingError') + processingErrors[filePath] = fileProcessingInfo.get('processingError') - endProcessingTime = uploadFileInfo.get('endProcessingTime') + endProcessingTime = fileProcessingInfo.get('endProcessingTime') if endProcessingTime is not None and endProcessingTime > endTime: endTime = endProcessingTime if len(processingErrors): diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index e546cd9e..9a50ae9e 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -38,6 +38,8 @@ class FileProcessingThread(threading.Thread): try: fileInfo['startProcessingTime'] = time.time() + processingInfo = fileInfo.get('processingInfo') + processingInfo['startProcessingTime'] = fileInfo['startProcessingTime'] processorNumber = 0 nProcessors = len(self.fileProcessorKeyList) for processorKey in self.fileProcessorKeyList: @@ -59,6 +61,8 @@ class FileProcessingThread(threading.Thread): if processorNumber == nProcessors: self.logger.debug('File %s processing is complete' % (filePath)) fileInfo['endProcessingTime'] = time.time() + processingInfo['endProcessingTime'] = fileInfo['endProcessingTime'] + processingInfo['processed'] = True except Exception, ex: self.logger.exception(ex) errorMsg = '%s processing error: %s' % (processorName, str(ex)) @@ -73,10 +77,11 @@ class FileProcessingThread(threading.Thread): self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft)) processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1 if nRetriesLeft <= 0: - self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) self.unprocessedFileDict[filePath] = fileInfo - fileInfo['processingError'] = errorMsg fileInfo['endProcessingTime'] = time.time() + processingInfo['endProcessingTime'] = fileInfo['endProcessingTime'] + processingInfo['processingError'] = errorMsg + self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index cc91ed87..0bf7ff67 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -114,8 +114,9 @@ class ExperimentSessionControllerImpl(DmObjectManager): fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId - fileUploadInfo = { 'processed' : False } - fileDict[filePath] = fileUploadInfo + fileProcessingInfo = { 'processed' : False } + fileDict[filePath] = fileProcessingInfo + fileInfo['processingInfo'] = fileProcessingInfo try: fileProcessingManager.processFile(fileInfo) except Exception, ex: diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 89c8df97..90b0e099 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -96,6 +96,8 @@ class FileSystemObserver(threading.Thread,Singleton): return observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)) + fileProcessingInfo = { 'processed' : False } + observedFile['processingInfo'] = fileProcessingInfo observedFile.setLastUpdateTimeToNow() if daqInfo: daqFileDict = daqInfo['fileDict'] -- GitLab