Newer
Older
import threading
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
# Keys, in order, that make up the default listing of a DAQ info object.
DEFAULT_KEY_LIST = [
    'id',
    'experimentName',
    'dataDirectory',
    'status',
    'nProcessedFiles',
    'nProcessingErrors',
    'nCompletedFiles',
    'nWaitingFiles',
    'nFiles',
    'startTime',
    'endTime',
    'runTime',
    'startTimestamp',
    'endTimestamp',
]
def __init__(self, dict=None):
    """Set up the lock and remember which keys the object was created with.

    Args:
        dict: optional mapping with the initial values; when omitted an
            empty mapping is assumed.
    """
    # A None sentinel avoids sharing one mutable default dict across calls.
    if dict is None:
        dict = {}
    # NOTE(review): the enclosing class is not visible in this view; if a base
    # class initializer has to receive `dict`, confirm it is invoked.
    self.lock = threading.RLock()
    # Snapshot the keys: on Python 3 keys() is a live view that would follow
    # later changes made to the caller's dictionary.
    self.originalKeys = list(dict.keys())
def fileAdded(self, filePath):
    """Count one more file as belonging to this DAQ.

    Args:
        filePath: path of the added file (not used by the counter itself).
    """
    with self.lock:
        fileCount = self.get('nFiles', 0)
        self['nFiles'] = fileCount + 1
def fileProcessed(self, filePath, processingEndTime):
    """Record a successfully processed file.

    The processed-file counter is always incremented. The
    'lastFileProcessed' and 'lastFileProcessedTime' entries are only
    replaced when processingEndTime is given and is later than the time
    recorded so far.

    Args:
        filePath: path of the processed file.
        processingEndTime: time at which processing ended, or None.
    """
    with self.lock:
        self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
        previousEndTime = self.get('lastFileProcessedTime', 0)
        isMostRecent = (processingEndTime is not None
                        and processingEndTime > previousEndTime)
        if isMostRecent:
            self['lastFileProcessed'] = filePath
            self['lastFileProcessedTime'] = processingEndTime
def fileProcessingError(self, filePath, processingError, processingEndTime):
    """Record a processing failure for a file.

    The error counter is incremented on every call. The error itself is
    stored per file path, so a later failure of the same file replaces the
    earlier one. 'lastFileProcessingErrorTime' only moves forward.

    Args:
        filePath: path of the file whose processing failed.
        processingError: error to store for that file.
        processingEndTime: time at which processing ended, or None.
    """
    with self.lock:
        # file can be processed multiple times, keep only the last error
        self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
        processingErrors = self.get('processingErrors', {})
        processingErrors[filePath] = processingError
        # Store the mapping back so a freshly created one is not lost.
        self['processingErrors'] = processingErrors
        lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
        if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
            self['lastFileProcessingErrorTime'] = processingEndTime
def updateStatus(self):
# Recompute the derived counters, completion percentages, run time and status
# from the values currently stored on this object, and store the results back.
# NOTE(review): this view of the method has lost its indentation and appears to
# be missing statements (for example `now` is read below but never assigned
# here), so the nesting of the branches cannot be confirmed -- check against
# the complete file before relying on these notes.
# Current status; falls back to RUNNING when no 'status' entry is stored.
daqStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
# Test for a status from the inactive-status list.
# NOTE(review): what this branch does is not visible in this view.
if daqStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
# Raw counters; each defaults to 0 (or an empty dict) when absent.
nFiles = self.get('nFiles', 0)
nProcessedFiles = self.get('nProcessedFiles', 0)
nProcessingErrors = self.get('nProcessingErrors', 0)
# processingErrors is read here but not used in the visible remainder.
processingErrors = self.get('processingErrors', {})
# Completed = successfully processed + failed; waiting = everything else.
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nCompletedFiles'] = nCompletedFiles
nWaitingFiles = nFiles-nCompletedFiles
self['nWaitingFiles'] = nWaitingFiles
# Run time is measured from 'startTime' (defaulting to `now`) up to `now`.
startTime = self.get('startTime', now)
runTime = now - startTime
endTime = None
# Starting values for the percentages: 100/100/0.
percentageComplete = 100.0
percentageProcessed = 100.0
percentageProcessingErrors = 0.0
# With a positive file count the percentages are computed relative to nFiles.
if nFiles > 0:
percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
# Percentages are stored as strings formatted with two decimals.
self['percentageComplete'] = '%.2f' % percentageComplete
self['percentageProcessed'] = '%.2f' % percentageProcessed
self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
# A stored, truthy 'endTime' leads to the FINALIZING / DONE / FAILED statuses.
# NOTE(review): the conditions separating FINALIZING from DONE are not
# recoverable from this view; FAILED is tied to a non-zero error count.
if self.get('endTime'):
daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FINALIZING
daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
if nProcessingErrors:
daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
# Candidate end times: last processed file, last failed file, stored 'endTime';
# the comparisons below appear to select the latest of them -- TODO confirm.
# NOTE(review): both lookups have no default, so they presumably yield None
# when the entry is missing; ordering comparisons against None would raise
# TypeError on Python 3 -- verify.
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
lastFileProcessedTime = self.get('lastFileProcessedTime')
endTime = lastFileProcessedTime
if not endTime or lastFileProcessingErrorTime > endTime:
endTime = lastFileProcessingErrorTime
if self.get('endTime') > endTime:
endTime = self.get('endTime')
# With an end time known, run time is end - start and the end time is stored
# both as a raw value and as a formatted local timestamp.
if endTime:
runTime = endTime - startTime
self['endTime'] = endTime
self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
# Store the resulting run time and status.
self['runTime'] = runTime
self['status'] = daqStatus
def toDictWithOriginalKeys(self):
    """Return a plain dict limited to the keys recorded in self.originalKeys.

    Keys listed in self.originalKeys that are no longer present on the
    object are skipped.

    Returns:
        A new dict mapping each still-present original key to its current
        value.
    """
    # The `in` operator replaces has_key(), which Python 3 mappings lack.
    return {key: self.get(key) for key in self.originalKeys if key in self}