Newer
Older
import threading
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
self.lock = threading.RLock()
def fileProcessed(self, filePath, processingEndTime):
self.lock.acquire()
try:
self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
self['lastFileProcessed'] = filePath
self['lastFileProcessedTime'] = processingEndTime
finally:
self.lock.release()
def fileProcessingError(self, filePath, processingError, processingEndTime):
self.lock.acquire()
try:

sveseli
committed
self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
processingErrors = self.get('processingErrors', {})
processingErrors[filePath] = processingError
self['processingErrors'] = processingErrors
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
self['lastFileProcessingErrorTime'] = processingEndTime
finally:
self.lock.release()
def fileProcessingCancelled(self, filePath, processingEndTime):
self.lock.acquire()
try:
self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + 1
lastFileProcessingCancelledTime = self.get('lastFileProcessingCancelledTime', 0)
if processingEndTime is not None and processingEndTime > lastFileProcessingCancelledTime:
self['lastFileProcessingCancelledTime'] = processingEndTime
finally:
self.lock.release()
def uploadAborted(self, nCancelledFiles):
self.lock.acquire()
try:
self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + nCancelledFiles
finally:
self.lock.release()
def updateStatus(self):
uploadStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
if uploadStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
nFiles = self.get('nFiles', 0)
nProcessedFiles = self.get('nProcessedFiles', 0)
nCancelledFiles = self.get('nCancelledFiles', 0)
nProcessingErrors = self.get('nProcessingErrors', 0)
processingErrors = self.get('processingErrors', {})
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nCompletedFiles'] = nCompletedFiles
nWaitingFiles = nFiles-nCompletedFiles-nCancelledFiles
self['nWaitingFiles'] = nWaitingFiles
startTime = self.get('startTime', now)
runTime = now - startTime
endTime = None
if nCompletedFiles == nFiles and uploadStatus != dmProcessingStatus.DM_PROCESSING_STATUS_PENDING:
uploadStatus = 'done'
if nProcessingErrors:
uploadStatus = 'failed'
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
lastFileProcessedTime = self.get('lastFileProcessedTime')
endTime = lastFileProcessedTime
if not endTime or lastFileProcessingErrorTime > endTime:
endTime = lastFileProcessingErrorTime
if nCancelledFiles > 0 and nCancelledFiles+nCompletedFiles == nFiles:
uploadStatus = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
endTime = self.get('lastFileProcessingCancelledTime', now)
if endTime:
runTime = endTime - startTime
self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
self['runTime'] = runTime
self['status'] = uploadStatus
percentageComplete = 100.0
percentageProcessed = 100.0
percentageCancelled = 0.0
percentageProcessingErrors = 0.0
if nFiles > 0:
percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
percentageCancelled = float(nCancelledFiles)/float(nFiles)*100.0
percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
self['percentageComplete'] = '%.2f' % percentageComplete
self['percentageProcessed'] = '%.2f' % percentageProcessed
self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
if nCancelledFiles > 0:
self['percentageCancelled'] = '%.2f' % percentageCancelled