From c103f7ade13bd3ffa47026e99ab5c77a78ca9e8e Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 2 Feb 2016 05:21:16 +0000 Subject: [PATCH] more reporting functionality added --- src/python/dm/common/objects/daqInfo.py | 6 ++++-- src/python/dm/common/objects/uploadInfo.py | 19 +++++++++++++++++-- .../common/processing/fileProcessingThread.py | 9 +++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index cc7aaec0..0e3e2a9e 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -6,7 +6,8 @@ from dm.common.utility.dictUtility import DictUtility class DaqInfo(DmObject): - DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nFiles', 'startTimestamp', 'endTimestamp' ] + DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ] + def __init__(self, dict={}): DmObject.__init__(self, dict) @@ -50,6 +51,7 @@ class DaqInfo(DmObject): if self.get('endTime'): daqStatus = 'done' + self['runTime'] = self.get('endTime') - self.get('startTime') self['status'] = daqStatus def toDictWithOriginalKeys(self): @@ -61,7 +63,7 @@ class DaqInfo(DmObject): def scrub(self, includeFileDetails=False): # Remove redundant information - daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict') + daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys=['fileDict']) if not includeFileDetails: return DaqInfo(daqInfo2) fileDict = self.get('fileDict', {}) diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index a317a903..06469ab7 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -1,11 +1,13 @@ #!/usr/bin/env python +import time from dmObject import DmObject from dm.common.utility.dictUtility import DictUtility +from dm.common.utility.timeUtility import TimeUtility class UploadInfo(DmObject): - DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nFiles', 'percentageComplete', 'startTimestamp', 'endTimestamp' ] + DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nFiles', 'percentageComplete', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ] def __init__(self, dict={}): DmObject.__init__(self, dict) @@ -20,12 +22,17 @@ class UploadInfo(DmObject): nProcessedFiles = 0 nProcessingErrors = 0 processingErrors = {} + endTime = 0 for (filePath,uploadFileInfo) in fileDict.items(): if uploadFileInfo.get('processed'): nProcessedFiles += 1 elif uploadFileInfo.get('processingError'): nProcessingErrors += 1 processingErrors[filePath] = uploadFileInfo.get('processingError') + + endProcessingTime = uploadFileInfo.get('endProcessingTime') + if endProcessingTime is not None and endProcessingTime > endTime: + endTime = endProcessingTime if len(processingErrors): self['processingErrors'] = processingErrors @@ -33,6 +40,14 @@ class UploadInfo(DmObject): nCompletedFiles = nProcessedFiles+nProcessingErrors if nCompletedFiles == nFiles: uploadStatus = 'done' + if not endTime: + endTime = time.time() + self['endTime'] = endTime + self['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime) + startTime = self.get('startTime') + if startTime: + runTime = endTime - startTime + self['runTime'] = runTime self['status'] = uploadStatus self['nProcessedFiles'] = '%s' % (nProcessedFiles) self['nProcessingErrors'] = '%s' % (nProcessingErrors) @@ -51,7 +66,7 @@ class UploadInfo(DmObject): def scrub(self, includeFileDetails=False): # Remove redundant information - uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict') + uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys=['fileDict']) if not includeFileDetails: return UploadInfo(uploadInfo2) fileDict = self.get('fileDict', {}) diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 6f579cef..e546cd9e 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import threading +import time from dm.common.utility.loggingManager import LoggingManager @@ -36,7 +37,11 @@ class FileProcessingThread(threading.Thread): filePath = fileInfo.get('filePath') try: + fileInfo['startProcessingTime'] = time.time() + processorNumber = 0 + nProcessors = len(self.fileProcessorKeyList) for processorKey in self.fileProcessorKeyList: + processorNumber += 1 processor = self.fileProcessorDict.get(processorKey) processorName = processor.__class__.__name__ fileProcessedByDict = fileInfo.get('processedByDict', {}) @@ -51,6 +56,9 @@ class FileProcessingThread(threading.Thread): processor.processFile(fileInfo) fileProcessedByDict[processorName] = True self.logger.debug('%s processed file at path %s ' % (processorName, filePath)) + if processorNumber == nProcessors: + self.logger.debug('File %s processing is complete' % (filePath)) + fileInfo['endProcessingTime'] = time.time() except Exception, ex: self.logger.exception(ex) errorMsg = '%s processing error: %s' % (processorName, str(ex)) @@ -68,6 +76,7 @@ class FileProcessingThread(threading.Thread): self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) self.unprocessedFileDict[filePath] = fileInfo fileInfo['processingError'] = errorMsg + fileInfo['endProcessingTime'] = time.time() else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) -- GitLab