#!/usr/bin/env python

import copy
import time
import threading

from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility


class DaqInfo(DmObject):
    """Dictionary-style record of a DAQ's file-processing progress.

    The object is used as a mapping (``self[key]`` / ``self.get(key)``).
    The counter-updating methods (``fileAdded``, ``fileProcessed``,
    ``fileProcessingError``) serialize their read-modify-write cycles with
    a re-entrant lock; ``updateStatus`` derives the summary fields
    (completion percentages, run time, status) from those counters.
    """

    # Not referenced in this file; presumably consumed by display/CLI code
    # elsewhere -- verify against callers before changing.
    DEFAULT_KEY_LIST = [
        'id', 'experimentName', 'dataDirectory', 'status',
        'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles',
        'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime',
        'startTimestamp', 'endTimestamp'
    ]

    def __init__(self, dict=None):
        """Initialize the record from an optional mapping.

        :param dict: initial key/value content; ``None`` (the default)
            means "start empty".  The parameter name shadows the builtin
            but is kept so existing keyword callers continue to work.
        """
        # A fresh dict per call avoids sharing one mutable default object
        # between all instances created without arguments.
        if dict is None:
            dict = {}
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()
        # Snapshot the keys: on Python 3 keys() is a live view that would
        # track later changes to the caller's mapping.
        self.originalKeys = list(dict.keys())

    def fileAdded(self, filePath):
        """Count one more file as belonging to this DAQ.

        :param filePath: path of the added file (not used by the counter).
        """
        with self.lock:
            self['nFiles'] = self.get('nFiles', 0) + 1

    def fileProcessed(self, filePath, processingEndTime):
        """Record a successfully processed file.

        Increments ``nProcessedFiles`` and, when ``processingEndTime`` is
        later than the one currently stored, remembers this file as the
        last one processed.

        :param filePath: path of the processed file.
        :param processingEndTime: time processing finished, or ``None``
            if unknown (the "last processed" fields are then untouched).
        """
        with self.lock:
            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
            if (processingEndTime is not None
                    and processingEndTime > lastFileProcessedTime):
                self['lastFileProcessed'] = filePath
                self['lastFileProcessedTime'] = processingEndTime

    def fileProcessingError(self, filePath, processingError,
                            processingEndTime):
        """Record a processing failure for a file.

        :param filePath: path of the file that failed.
        :param processingError: error description stored under
            ``processingErrors[filePath]``.
        :param processingEndTime: time processing finished, or ``None``
            if unknown (the last-error time is then untouched).
        """
        with self.lock:
            # file can be processed multiple times, keep only the last error
            # NOTE(review): the counter is still incremented on every call,
            # so it can exceed len(processingErrors) when one file fails
            # repeatedly -- confirm this is intended.
            self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
            processingErrors = self.get('processingErrors', {})
            processingErrors[filePath] = processingError
            self['processingErrors'] = processingErrors
            lastFileProcessingErrorTime = self.get(
                'lastFileProcessingErrorTime', 0)
            if (processingEndTime is not None
                    and processingEndTime > lastFileProcessingErrorTime):
                self['lastFileProcessingErrorTime'] = processingEndTime

    def updateStatus(self):
        """Recompute the derived progress fields and the DAQ status.

        Does nothing when the current status is already one of
        ``DM_INACTIVE_PROCESSING_STATUS_LIST``.  Otherwise it stores
        ``nCompletedFiles``, ``nWaitingFiles``, the three ``percentage*``
        strings, ``runTime`` and ``status``.  Once an ``endTime`` has been
        set the status becomes FINALIZING, and when every file is accounted
        for it becomes DONE (or FAILED if any processing errors were
        counted); at that point ``endTime``/``endTimestamp`` are moved to
        the latest of the DAQ end time and the last processed/error times.

        NOTE(review): unlike the counter methods this one does not take
        ``self.lock``; left as in the original.
        """
        now = time.time()
        daqStatus = self.get(
            'status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
        if daqStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
            return

        nFiles = self.get('nFiles', 0)
        nProcessedFiles = self.get('nProcessedFiles', 0)
        nProcessingErrors = self.get('nProcessingErrors', 0)
        nCompletedFiles = nProcessedFiles + nProcessingErrors
        self['nCompletedFiles'] = nCompletedFiles
        self['nWaitingFiles'] = nFiles - nCompletedFiles

        startTime = self.get('startTime', now)
        runTime = now - startTime
        endTime = None

        # With no files at all the DAQ counts as fully processed.
        percentageComplete = 100.0
        percentageProcessed = 100.0
        percentageProcessingErrors = 0.0
        if nFiles > 0:
            total = float(nFiles)
            percentageComplete = float(nCompletedFiles) / total * 100.0
            percentageProcessed = float(nProcessedFiles) / total * 100.0
            percentageProcessingErrors = (
                float(nProcessingErrors) / total * 100.0)
        self['percentageComplete'] = '%.2f' % percentageComplete
        self['percentageProcessed'] = '%.2f' % percentageProcessed
        self['percentageProcessingErrors'] = (
            '%.2f' % percentageProcessingErrors)

        daqEndTime = self.get('endTime')
        if daqEndTime:
            daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FINALIZING
            if nCompletedFiles >= nFiles:
                daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
                if nProcessingErrors:
                    daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
                # Effective end time is the latest known timestamp.  Either
                # "last file" time may be missing, so drop None explicitly
                # instead of relying on Python 2 ordering None below numbers
                # (the same comparison raises TypeError on Python 3).
                candidates = [
                    daqEndTime,
                    self.get('lastFileProcessedTime'),
                    self.get('lastFileProcessingErrorTime'),
                ]
                endTime = max([t for t in candidates if t is not None])

        if endTime:
            runTime = endTime - startTime
            self['endTime'] = endTime
            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        self['runTime'] = runTime
        self['status'] = daqStatus

    def toDictWithOriginalKeys(self):
        """Return a plain dict restricted to the constructor-time keys.

        Keys that were present in the mapping passed to ``__init__`` but
        are no longer present in this object are skipped.
        """
        result = {}
        for key in self.originalKeys:
            # `in` instead of has_key(), which exists only on Python 2.
            if key in self:
                result[key] = self.get(key)
        return result