Skip to content
Snippets Groups Projects
Commit 1613da73 authored by sveseli's avatar sveseli
Browse files

added handling and reporting for processing errors

parent b8cdfa88
No related branches found
No related tags found
No related merge requests found
...@@ -20,13 +20,34 @@ class DaqInfo(DmObject): ...@@ -20,13 +20,34 @@ class DaqInfo(DmObject):
fileDict = self.get('fileDict') fileDict = self.get('fileDict')
nFiles = len(fileDict) nFiles = len(fileDict)
nProcessedFiles = 0 nProcessedFiles = 0
nProcessingErrors = 0
processingErrors = {}
for (filePath,uploadFileInfo) in fileDict.items(): for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'): if uploadFileInfo.get('processed'):
nProcessedFiles += 1 nProcessedFiles += 1
elif uploadFileInfo.get('processingError'):
nProcessingErrors += 1
processingErrors[filePath] = uploadFileInfo.get('processingError')
if len(processingErrors):
self['processingErrors'] = processingErrors
# need to handle 'failed' uploads
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nProcessedFiles'] = '%s' % (nProcessedFiles) self['nProcessedFiles'] = '%s' % (nProcessedFiles)
self['nProcessingErrors'] = '%s' % (nProcessingErrors)
self['nFiles'] = '%s' % (nFiles) self['nFiles'] = '%s' % (nFiles)
# need to handle 'failed' uploads percentageComplete = 100.0
percentageProcessed = 100.0
percentageProcessingErrors = 0.0
if nFiles > 0:
percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
self['percentageComplete'] = '%.2f' % percentageComplete
self['percentageProcessed'] = '%.2f' % percentageProcessed
self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
if self.get('endTime'): if self.get('endTime'):
daqStatus = 'done' daqStatus = 'done'
self['status'] = daqStatus self['status'] = daqStatus
...@@ -38,14 +59,16 @@ class DaqInfo(DmObject): ...@@ -38,14 +59,16 @@ class DaqInfo(DmObject):
del dict[key] del dict[key]
return dict return dict
def scrub(self): def scrub(self, includeFileDetails=False):
# Remove redundant information # Remove redundant information
daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict') daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict')
if not includeFileDetails:
return DaqInfo(daqInfo2)
fileDict = self.get('fileDict', {}) fileDict = self.get('fileDict', {})
fileDict2 = {} fileDict2 = {}
for (filePath,fileInfo) in fileDict.items(): for (filePath,fileInfo) in fileDict.items():
fileInfo2 = {} fileInfo2 = {}
for key in ['processed', 'lastUpdateTime']: for key in ['processed', 'lastUpdateTime', 'processingError']:
if fileInfo.has_key(key): if fileInfo.has_key(key):
fileInfo2[key] = fileInfo[key] fileInfo2[key] = fileInfo[key]
fileDict2[filePath] = fileInfo2 fileDict2[filePath] = fileInfo2
......
...@@ -18,30 +18,47 @@ class UploadInfo(DmObject): ...@@ -18,30 +18,47 @@ class UploadInfo(DmObject):
fileDict = self.get('fileDict') fileDict = self.get('fileDict')
nFiles = len(fileDict) nFiles = len(fileDict)
nProcessedFiles = 0 nProcessedFiles = 0
nProcessingErrors = 0
processingErrors = {}
for (filePath,uploadFileInfo) in fileDict.items(): for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'): if uploadFileInfo.get('processed'):
nProcessedFiles += 1 nProcessedFiles += 1
elif uploadFileInfo.get('processingError'):
nProcessingErrors += 1
processingErrors[filePath] = uploadFileInfo.get('processingError')
if len(processingErrors):
self['processingErrors'] = processingErrors
# need to handle 'failed' uploads # need to handle 'failed' uploads
if nProcessedFiles == nFiles: nCompletedFiles = nProcessedFiles+nProcessingErrors
if nCompletedFiles == nFiles:
uploadStatus = 'done' uploadStatus = 'done'
self['status'] = uploadStatus self['status'] = uploadStatus
self['nProcessedFiles'] = '%s' % (nProcessedFiles) self['nProcessedFiles'] = '%s' % (nProcessedFiles)
self['nProcessingErrors'] = '%s' % (nProcessingErrors)
self['nFiles'] = '%s' % (nFiles) self['nFiles'] = '%s' % (nFiles)
percentageComplete = 100.0 percentageComplete = 100.0
percentageProcessed = 100.0
percentageProcessingErrors = 0.0
if nFiles > 0: if nFiles > 0:
percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0 percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
self['percentageComplete'] = '%.2f' % percentageComplete self['percentageComplete'] = '%.2f' % percentageComplete
self['percentageProcessed'] = '%.2f' % percentageProcessed
self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
def scrub(self): def scrub(self, includeFileDetails=False):
# Remove redundant information # Remove redundant information
uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict') uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict')
if not includeFileDetails:
return UploadInfo(uploadInfo2)
fileDict = self.get('fileDict', {}) fileDict = self.get('fileDict', {})
fileDict2 = {} fileDict2 = {}
for (filePath,fileInfo) in fileDict.items(): for (filePath,fileInfo) in fileDict.items():
fileInfo2 = {} fileInfo2 = {}
for key in ['processed', 'lastUpdateTime']: for key in ['processed', 'lastUpdateTime', 'processingError']:
if fileInfo.has_key(key): if fileInfo.has_key(key):
fileInfo2[key] = fileInfo[key] fileInfo2[key] = fileInfo[key]
fileDict2[filePath] = fileInfo2 fileDict2[filePath] = fileInfo2
......
...@@ -53,7 +53,8 @@ class FileProcessingThread(threading.Thread): ...@@ -53,7 +53,8 @@ class FileProcessingThread(threading.Thread):
self.logger.debug('%s processed file at path %s ' % (processorName, filePath)) self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
except Exception, ex: except Exception, ex:
self.logger.exception(ex) self.logger.exception(ex)
self.logger.debug('%s processing failed for file at path %s ' % (processorName, filePath)) errorMsg = '%s processing failed for file %s: %s' % (processorName, filePath, str(ex)))
self.logger.debug(errorMsg)
fileProcessingDict = fileInfo.get('processingDict', {}) fileProcessingDict = fileInfo.get('processingDict', {})
fileInfo['processingDict'] = fileProcessingDict fileInfo['processingDict'] = fileProcessingDict
processorDict = fileProcessingDict.get(processorName, {}) processorDict = fileProcessingDict.get(processorName, {})
...@@ -66,6 +67,7 @@ class FileProcessingThread(threading.Thread): ...@@ -66,6 +67,7 @@ class FileProcessingThread(threading.Thread):
if nRetriesLeft <= 0: if nRetriesLeft <= 0:
self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
self.unprocessedFileDict[filePath] = fileInfo self.unprocessedFileDict[filePath] = fileInfo
fileInfo['processingError'] = errorMsg
else: else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment