From 1613da738b0fcbf5a731a6651d94aab9f70951c2 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 2 Feb 2016 03:31:41 +0000 Subject: [PATCH] added handling and reporting for processing errors --- src/python/dm/common/objects/daqInfo.py | 29 +++++++++++++++++-- src/python/dm/common/objects/uploadInfo.py | 25 +++++++++++++--- .../common/processing/fileProcessingThread.py | 4 ++- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index c5d32214..cc7aaec0 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -20,13 +20,34 @@ class DaqInfo(DmObject): fileDict = self.get('fileDict') nFiles = len(fileDict) nProcessedFiles = 0 + nProcessingErrors = 0 + processingErrors = {} for (filePath,uploadFileInfo) in fileDict.items(): if uploadFileInfo.get('processed'): nProcessedFiles += 1 + elif uploadFileInfo.get('processingError'): + nProcessingErrors += 1 + processingErrors[filePath] = uploadFileInfo.get('processingError') + if len(processingErrors): + self['processingErrors'] = processingErrors + + # need to handle 'failed' uploads + nCompletedFiles = nProcessedFiles+nProcessingErrors self['nProcessedFiles'] = '%s' % (nProcessedFiles) + self['nProcessingErrors'] = '%s' % (nProcessingErrors) self['nFiles'] = '%s' % (nFiles) - # need to handle 'failed' uploads + percentageComplete = 100.0 + percentageProcessed = 100.0 + percentageProcessingErrors = 0.0 + if nFiles > 0: + percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0 + percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0 + percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0 + self['percentageComplete'] = '%.2f' % percentageComplete + self['percentageProcessed'] = '%.2f' % percentageProcessed + self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors + if self.get('endTime'): daqStatus = 'done' self['status'] = daqStatus @@ -38,14 +59,16 @@ class DaqInfo(DmObject): del dict[key] return dict - def scrub(self): + def scrub(self, includeFileDetails=False): # Remove redundant information daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict') + if not includeFileDetails: + return DaqInfo(daqInfo2) fileDict = self.get('fileDict', {}) fileDict2 = {} for (filePath,fileInfo) in fileDict.items(): fileInfo2 = {} - for key in ['processed', 'lastUpdateTime']: + for key in ['processed', 'lastUpdateTime', 'processingError']: if fileInfo.has_key(key): fileInfo2[key] = fileInfo[key] fileDict2[filePath] = fileInfo2 diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index d992e3c7..a317a903 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -18,30 +18,47 @@ class UploadInfo(DmObject): fileDict = self.get('fileDict') nFiles = len(fileDict) nProcessedFiles = 0 + nProcessingErrors = 0 + processingErrors = {} for (filePath,uploadFileInfo) in fileDict.items(): if uploadFileInfo.get('processed'): nProcessedFiles += 1 + elif uploadFileInfo.get('processingError'): + nProcessingErrors += 1 + processingErrors[filePath] = uploadFileInfo.get('processingError') + if len(processingErrors): + self['processingErrors'] = processingErrors # need to handle 'failed' uploads - if nProcessedFiles == nFiles: + nCompletedFiles = nProcessedFiles+nProcessingErrors + if nCompletedFiles == nFiles: uploadStatus = 'done' self['status'] = uploadStatus self['nProcessedFiles'] = '%s' % (nProcessedFiles) + self['nProcessingErrors'] = '%s' % (nProcessingErrors) self['nFiles'] = '%s' % (nFiles) percentageComplete = 100.0 + percentageProcessed = 100.0 + percentageProcessingErrors = 0.0 if nFiles > 0: - percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0 + percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0 + percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0 + percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0 self['percentageComplete'] = '%.2f' % percentageComplete + self['percentageProcessed'] = '%.2f' % percentageProcessed + self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors - def scrub(self): + def scrub(self, includeFileDetails=False): # Remove redundant information uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict') + if not includeFileDetails: + return UploadInfo(uploadInfo2) fileDict = self.get('fileDict', {}) fileDict2 = {} for (filePath,fileInfo) in fileDict.items(): fileInfo2 = {} - for key in ['processed', 'lastUpdateTime']: + for key in ['processed', 'lastUpdateTime', 'processingError']: if fileInfo.has_key(key): fileInfo2[key] = fileInfo[key] fileDict2[filePath] = fileInfo2 diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 8a85fe6e..9db72007 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -53,7 +53,8 @@ class FileProcessingThread(threading.Thread): self.logger.debug('%s processed file at path %s ' % (processorName, filePath)) except Exception, ex: self.logger.exception(ex) - self.logger.debug('%s processing failed for file at path %s ' % (processorName, filePath)) + errorMsg = '%s processing failed for file %s: %s' % (processorName, filePath, str(ex))) + self.logger.debug(errorMsg) fileProcessingDict = fileInfo.get('processingDict', {}) fileInfo['processingDict'] = fileProcessingDict processorDict = fileProcessingDict.get(processorName, {}) @@ -66,6 +67,7 @@ class FileProcessingThread(threading.Thread): if nRetriesLeft <= 0: self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) self.unprocessedFileDict[filePath] = fileInfo + fileInfo['processingError'] = errorMsg else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) -- GitLab