Forked from
DM / dm-docs
261 commits behind, 854 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
daqInfo.py 5.52 KiB
#!/usr/bin/env python
import copy
import time
import threading
from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
class DaqInfo(DmObject):
    """Bookkeeping record for a single data acquisition (DAQ).

    The object is used like a dictionary (item assignment and ``get``),
    storing file counters, timestamps, per-plugin statistics and the
    overall processing status. The ``file*`` methods update their counters
    while holding a re-entrant lock.
    """

    DEFAULT_KEY_LIST = [
        'id', 'experimentName', 'dataDirectory', 'status',
        'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles',
        'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime',
        'startTimestamp', 'endTimestamp', 'errorMessage',
    ]

    def __init__(self, dict=None):
        """Create the record from an optional dictionary of initial values.

        The parameter keeps its historical name ``dict`` (shadowing the
        builtin) so that keyword callers continue to work.
        """
        # Avoid a mutable default argument shared between instances.
        if dict is None:
            dict = {}
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()
        # Snapshot the keys so later changes to the caller's dictionary
        # cannot alter what toDictWithOriginalKeys() reports.
        self.originalKeys = list(dict.keys())

    def fileAdded(self, filePath):
        """Count one more file for this DAQ; ``filePath`` is not used."""
        with self.lock:
            self['nFiles'] = self.get('nFiles', 0) + 1

    def fileProcessed(self, filePath, processingEndTime):
        """Count a processed file and remember the most recent one.

        ``lastFileProcessed``/``lastFileProcessedTime`` are updated only
        when ``processingEndTime`` is not None and is later than the
        currently stored time (0 when nothing is stored yet).
        """
        with self.lock:
            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                self['lastFileProcessed'] = filePath
                self['lastFileProcessedTime'] = processingEndTime

    def fileProcessingError(self, filePath, processingError, processingEndTime):
        """Count a processing error and store it under ``filePath``.

        A file can be processed multiple times; ``processingErrors`` keeps
        only the last error per path, while ``nProcessingErrors`` is
        incremented on every call.
        """
        with self.lock:
            self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
            processingErrors = self.get('processingErrors', {})
            processingErrors[filePath] = processingError
            self['processingErrors'] = processingErrors
            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
                self['lastFileProcessingErrorTime'] = processingEndTime

    def fileProcessingSkipped(self, processorName, filePath, processingError, processingEndTime):
        """Record that the plugin ``processorName`` skipped ``filePath``.

        Statistics are kept per plugin under ``pluginStats[processorName]``:
        the skipped-file counter, the last error per path (only when
        ``processingError`` is truthy) and the latest skip time.
        """
        with self.lock:
            # Fetch-or-create both dictionary levels and store them back so
            # that newly created dictionaries become part of the record.
            pluginStatsDict = self.get('pluginStats', {})
            self['pluginStats'] = pluginStatsDict
            pluginStats = pluginStatsDict.get(processorName, {})
            pluginStatsDict[processorName] = pluginStats

            pluginStats['nSkippedFiles'] = pluginStats.get('nSkippedFiles', 0) + 1
            if processingError:
                processingErrors = pluginStats.get('processingErrors', {})
                processingErrors[filePath] = processingError
                pluginStats['processingErrors'] = processingErrors
            lastFileSkippedTime = pluginStats.get('lastFileSkippedTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileSkippedTime:
                pluginStats['lastFileSkippedTime'] = processingEndTime

    def updateStatus(self):
        """Recompute the derived counters, percentages, run time and status.

        Nothing is changed when the current status is one of
        ``DM_INACTIVE_PROCESSING_STATUS_LIST``. Unlike the ``file*``
        methods, this method does not take ``self.lock``.
        """
        now = time.time()
        daqStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
        if daqStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
            return

        nFiles = self.get('nFiles', 0)
        nProcessedFiles = self.get('nProcessedFiles', 0)
        nProcessingErrors = self.get('nProcessingErrors', 0)
        nCompletedFiles = nProcessedFiles + nProcessingErrors
        self['nCompletedFiles'] = nCompletedFiles
        self['nWaitingFiles'] = nFiles - nCompletedFiles

        startTime = self.get('startTime', now)
        runTime = now - startTime
        endTime = None

        # With no files at all the DAQ is reported as 100% complete.
        percentageComplete = 100.0
        percentageProcessed = 100.0
        percentageProcessingErrors = 0.0
        if nFiles > 0:
            percentageComplete = float(nCompletedFiles) / float(nFiles) * 100.0
            percentageProcessed = float(nProcessedFiles) / float(nFiles) * 100.0
            percentageProcessingErrors = float(nProcessingErrors) / float(nFiles) * 100.0
        self['percentageComplete'] = '%.2f' % percentageComplete
        self['percentageProcessed'] = '%.2f' % percentageProcessed
        self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors

        # A stored end time means the DAQ was asked to stop; it is only
        # done (or failed) once every known file has been accounted for.
        # NOTE(review): the final end time is computed inside the
        # "all files completed" branch -- confirm this nesting matches the
        # intended behavior while the DAQ is still finalizing.
        requestedEndTime = self.get('endTime')
        if requestedEndTime:
            daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FINALIZING
            if nCompletedFiles >= nFiles:
                daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
                if nProcessingErrors:
                    daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
                # The final end time is the latest of the last processed
                # file, the last processing error and the requested end
                # time. Missing values are filtered out explicitly rather
                # than compared against None, which fails on Python 3.
                candidateTimes = [
                    t for t in (
                        self.get('lastFileProcessedTime'),
                        self.get('lastFileProcessingErrorTime'),
                        requestedEndTime,
                    )
                    if t is not None
                ]
                endTime = max(candidateTimes)

        if endTime:
            runTime = endTime - startTime
            self['endTime'] = endTime
            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        self['runTime'] = runTime
        self['status'] = daqStatus

    def toDictWithOriginalKeys(self):
        """Return a plain dict limited to the keys given at construction.

        Keys that are no longer present in the record are omitted.
        """
        result = {}
        for key in self.originalKeys:
            if key in self:
                result[key] = self.get(key)
        return result