Forked from
DM / dm-docs
261 commits behind, 833 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
processingJob.py 3.42 KiB
#!/usr/bin/env python
import threading
import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.threadingUtility import ThreadingUtility
from dmObject import DmObject
class ProcessingJob(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'name', 'owner', 'experimentName', 'status', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
self.lock = threading.RLock()
self.childProcessEvent = None
@ThreadingUtility.synchronize
def childProcessQueued(self, childProcess):
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
childProcess['submitTime'] = time.time()
stageId = childProcess.get('stageId')
childProcessNumber = childProcess.get('childProcessNumber', 0)
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) + 1
workflowStage['childProcesses'][childProcessNumber] = childProcess
@ThreadingUtility.synchronize
def childProcessStarted(self, childProcess):
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
childProcess['startTime'] = time.time()
stageId = childProcess.get('stageId')
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) - 1
workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) + 1
@ThreadingUtility.synchronize
def childProcessCompleted(self, childProcess, processInfo):
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
childProcess['endTime'] = time.time()
childProcess['runTime'] = childProcess['startTime'] - childProcess['endTime']
for key in ['exitStatus', 'stdOut', 'stdErr']:
if processInfo.has_key(key):
childProcess[key] = processInfo.get(key)
stageId = childProcess.get('stageId')
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
workflowStage['nCompletedChildProcesses'] = workflowStage.get('nCompletedChildProcesses', 0) + 1
if self.childProcessEvent:
self.childProcessEvent.set()
@ThreadingUtility.synchronize
def childProcessFailed(self, childProcess, processInfo={}, errorMessage=None):
self['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
childProcess['endTime'] = time.time()
childProcess['runTime'] = childProcess['startTime'] - childProcess['endTime']
for key in ['exitStatus', 'stdOut', 'stdErr']:
if processInfo.has_key(key):
childProcess[key] = processInfo.get(key)
if errorMessage:
childProcess['errorMessage'] = errorMessage
stageId = childProcess.get('stageId')
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
workflowStage['nFailedChildProcesses'] = workflowStage.get('nFailedChildProcesses', 0) + 1
if self.childProcessEvent:
self.childProcessEvent.set()