Commit c103f7ad authored by sveseli

more reporting functionality added

parent eb925f1c
@@ -6,7 +6,8 @@ from dm.common.utility.dictUtility import DictUtility
 class DaqInfo(DmObject):
-    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nFiles', 'startTimestamp', 'endTimestamp' ]
+    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
     def __init__(self, dict={}):
         DmObject.__init__(self, dict)
@@ -50,6 +51,7 @@ class DaqInfo(DmObject):
         if self.get('endTime'):
             daqStatus = 'done'
+            self['runTime'] = self.get('endTime') - self.get('startTime')
         self['status'] = daqStatus
     def toDictWithOriginalKeys(self):
@@ -61,7 +63,7 @@ class DaqInfo(DmObject):
     def scrub(self, includeFileDetails=False):
         # Remove redundant information
-        daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict')
+        daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys=['fileDict'])
         if not includeFileDetails:
             return DaqInfo(daqInfo2)
         fileDict = self.get('fileDict', {})
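
The DaqInfo changes add two derived reporting fields, nProcessingErrors and runTime (endTime minus startTime once the acquisition is done), and scrub() now passes excludeKeys as a list rather than a bare string. Below is a minimal standalone sketch of that scrub/runTime behavior; the helper name scrub_daq_info, the plain-dict representation, and the guess about why a list is needed are illustrative assumptions, not the project's DmObject/DictUtility API.

import copy
import time

def scrub_daq_info(daq_info, exclude_keys=('fileDict',)):
    # Deep-copy the DAQ info while dropping bulky per-file details; iterating a
    # sequence of key names is presumably why excludeKeys is now a list rather
    # than a bare string (an assumption about DictUtility.deepCopy's behavior).
    return {k: copy.deepcopy(v) for k, v in daq_info.items() if k not in exclude_keys}

# Hypothetical values for illustration; times are epoch seconds, as in the commit.
daq_info = {
    'experimentName': 'exp1',
    'startTime': time.time() - 120.0,
    'endTime': time.time(),
    'fileDict': {'f1.dat': {'processed': True}},
}
daq_info['runTime'] = daq_info['endTime'] - daq_info['startTime']
print(sorted(scrub_daq_info(daq_info)))  # 'fileDict' is gone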
 #!/usr/bin/env python
+import time
 from dmObject import DmObject
 from dm.common.utility.dictUtility import DictUtility
+from dm.common.utility.timeUtility import TimeUtility
 class UploadInfo(DmObject):
-    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nFiles', 'percentageComplete', 'startTimestamp', 'endTimestamp' ]
+    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nFiles', 'percentageComplete', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
     def __init__(self, dict={}):
         DmObject.__init__(self, dict)
@@ -20,12 +22,17 @@ class UploadInfo(DmObject):
         nProcessedFiles = 0
         nProcessingErrors = 0
         processingErrors = {}
+        endTime = 0
         for (filePath,uploadFileInfo) in fileDict.items():
             if uploadFileInfo.get('processed'):
                 nProcessedFiles += 1
             elif uploadFileInfo.get('processingError'):
                 nProcessingErrors += 1
                 processingErrors[filePath] = uploadFileInfo.get('processingError')
+            endProcessingTime = uploadFileInfo.get('endProcessingTime')
+            if endProcessingTime is not None and endProcessingTime > endTime:
+                endTime = endProcessingTime
         if len(processingErrors):
             self['processingErrors'] = processingErrors
@@ -33,6 +40,14 @@
         nCompletedFiles = nProcessedFiles+nProcessingErrors
         if nCompletedFiles == nFiles:
             uploadStatus = 'done'
+            if not endTime:
+                endTime = time.time()
+            self['endTime'] = endTime
+            self['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime)
+            startTime = self.get('startTime')
+            if startTime:
+                runTime = endTime - startTime
+                self['runTime'] = runTime
         self['status'] = uploadStatus
         self['nProcessedFiles'] = '%s' % (nProcessedFiles)
         self['nProcessingErrors'] = '%s' % (nProcessingErrors)
@@ -51,7 +66,7 @@
     def scrub(self, includeFileDetails=False):
         # Remove redundant information
-        uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict')
+        uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys=['fileDict'])
         if not includeFileDetails:
             return UploadInfo(uploadInfo2)
         fileDict = self.get('fileDict', {})
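
In UploadInfo, the status update now derives an overall end time from the per-file endProcessingTime values (falling back to the current time when the upload completes without one), records a formatted endTimestamp via TimeUtility.formatLocalTimeStamp, and reports runTime as endTime minus startTime. The sketch below recomputes the same summary fields from a plain fileDict; the function and variable names are illustrative stand-ins, not the UploadInfo/TimeUtility API.

import time

def summarize_upload(file_dict, start_time):
    # Recompute the reporting fields added in this commit from a plain dict of
    # per-file info.
    n_processed = 0
    n_errors = 0
    end_time = 0
    for file_path, info in file_dict.items():
        if info.get('processed'):
            n_processed += 1
        elif info.get('processingError'):
            n_errors += 1
        # Track the latest per-file completion time observed so far.
        t = info.get('endProcessingTime')
        if t is not None and t > end_time:
            end_time = t
    summary = {'nProcessedFiles': n_processed, 'nProcessingErrors': n_errors, 'status': 'running'}
    if n_processed + n_errors == len(file_dict):
        summary['status'] = 'done'
        if not end_time:
            end_time = time.time()  # no file recorded an end time; fall back to "now"
        summary['endTime'] = end_time
        summary['runTime'] = end_time - start_time
    return summary

# Example: two files processed, one failed.
now = time.time()
files = {
    'a.dat': {'processed': True, 'endProcessingTime': now - 5},
    'b.dat': {'processed': True, 'endProcessingTime': now - 1},
    'c.dat': {'processingError': 'checksum mismatch', 'endProcessingTime': now},
}
print(summarize_upload(files, start_time=now - 60))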
 #!/usr/bin/env python
 import threading
+import time
 from dm.common.utility.loggingManager import LoggingManager
@@ -36,7 +37,11 @@ class FileProcessingThread(threading.Thread):
             filePath = fileInfo.get('filePath')
             try:
+                fileInfo['startProcessingTime'] = time.time()
+                processorNumber = 0
+                nProcessors = len(self.fileProcessorKeyList)
                 for processorKey in self.fileProcessorKeyList:
+                    processorNumber += 1
                     processor = self.fileProcessorDict.get(processorKey)
                     processorName = processor.__class__.__name__
                     fileProcessedByDict = fileInfo.get('processedByDict', {})
@@ -51,6 +56,9 @@ class FileProcessingThread(threading.Thread):
                         processor.processFile(fileInfo)
                         fileProcessedByDict[processorName] = True
                         self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
+                        if processorNumber == nProcessors:
+                            self.logger.debug('File %s processing is complete' % (filePath))
+                            fileInfo['endProcessingTime'] = time.time()
                     except Exception, ex:
                         self.logger.exception(ex)
                         errorMsg = '%s processing error: %s' % (processorName, str(ex))
@@ -68,6 +76,7 @@ class FileProcessingThread(threading.Thread):
                             self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
                             self.unprocessedFileDict[filePath] = fileInfo
                             fileInfo['processingError'] = errorMsg
+                            fileInfo['endProcessingTime'] = time.time()
                         else:
                             retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
                             self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
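
FileProcessingThread now stamps startProcessingTime before the processor chain runs and endProcessingTime either after the last configured processor succeeds or when retries are exhausted and the file is marked with a processingError; these per-file timestamps are what UploadInfo aggregates above. Below is a simplified sketch of that timing logic, with retries and error handling omitted and plain callables standing in for the configured processors; it is an illustration, not the thread's actual implementation.

import time

def process_file(file_info, processors):
    # Simplified timing logic from FileProcessingThread: stamp the start time,
    # run every processor, and stamp the end time after the last one succeeds.
    file_info['startProcessingTime'] = time.time()
    n_processors = len(processors)
    for processor_number, processor in enumerate(processors, start=1):
        processor(file_info)
        if processor_number == n_processors:
            file_info['endProcessingTime'] = time.time()
    return file_info

# Example with two trivial processors.
info = process_file({'filePath': '/tmp/a.dat'},
                    [lambda fi: fi.setdefault('checksummed', True),
                     lambda fi: fi.setdefault('transferred', True)])
print(info['endProcessingTime'] - info['startProcessingTime'])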