#!/usr/bin/env python

import threading
import time

from dm.common.constants import dmProcessingStatus
from dm.common.utility.threadingUtility import ThreadingUtility
from dmObject import DmObject


class ProcessingJob(DmObject):
    """Processing job record that tracks the state of its child processes.

    The job is accessed like a dictionary (``self['workflow']``,
    ``self['status']``).  Each ``childProcess*`` method updates the given
    child process dictionary in place and adjusts the per-stage counters
    stored under ``self['workflow']['stages'][stageId]``.

    The state-changing methods are wrapped with
    ``ThreadingUtility.synchronize``.
    NOTE(review): presumably that decorator serializes calls on
    ``self.lock`` -- confirm against ThreadingUtility.
    """

    DEFAULT_KEY_LIST = [
        'id', 'name', 'owner', 'experimentName', 'status',
        'startTime', 'endTime', 'runTime',
        'startTimestamp', 'endTimestamp', 'errorMessage'
    ]

    # Keys copied from the process info into the child process record when
    # a child process ends.
    _PROCESS_INFO_KEY_LIST = ('exitStatus', 'stdOut', 'stdErr')

    # The parameter name shadows the builtin ``dict``; it is kept unchanged
    # so that the constructor signature stays compatible with callers.
    def __init__(self, dict):
        """Initialize the job from a dictionary of job attributes.

        :param dict: initial job content, passed to ``DmObject.__init__``
        """
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()
        # Set by the job owner when it wants to be notified (via ``set()``)
        # each time a child process completes or fails; None disables it.
        self.childProcessEvent = None

    def _getWorkflowStage(self, childProcess):
        """Return the workflow stage dictionary the child process belongs to."""
        stageId = childProcess.get('stageId')
        return self['workflow']['stages'][stageId]

    def _recordChildProcessEnd(self, childProcess, status, processInfo):
        """Store final status, end/run time and process output on the child."""
        endTime = time.time()
        childProcess['status'] = status
        childProcess['endTime'] = endTime
        # Run time is the elapsed wall-clock time, i.e. end minus start.
        childProcess['runTime'] = endTime - childProcess['startTime']
        for key in self._PROCESS_INFO_KEY_LIST:
            # ``in`` instead of has_key(): works on Python 2 and Python 3.
            if key in processInfo:
                childProcess[key] = processInfo.get(key)

    def _moveRunningChildProcessTo(self, childProcess, counterKey):
        """Decrement the stage running counter, bump counterKey, and notify."""
        workflowStage = self._getWorkflowStage(childProcess)
        workflowStage['nRunningChildProcesses'] = (
            workflowStage.get('nRunningChildProcesses', 0) - 1)
        workflowStage[counterKey] = workflowStage.get(counterKey, 0) + 1
        if self.childProcessEvent:
            self.childProcessEvent.set()

    @ThreadingUtility.synchronize
    def childProcessQueued(self, childProcess):
        """Mark a child process as pending and register it with its stage.

        Sets the child's status and ``submitTime``, increments the stage's
        ``nQueuedChildProcesses`` counter and stores the child under
        ``childProcesses[childProcessNumber]`` (number defaults to 0).

        :param childProcess: child process dictionary; its ``stageId``
            selects the workflow stage
        :raises KeyError: if the job has no such workflow stage or the
            stage has no ``childProcesses`` container
        """
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        childProcess['submitTime'] = time.time()
        childProcessNumber = childProcess.get('childProcessNumber', 0)
        workflowStage = self._getWorkflowStage(childProcess)
        workflowStage['nQueuedChildProcesses'] = (
            workflowStage.get('nQueuedChildProcesses', 0) + 1)
        workflowStage['childProcesses'][childProcessNumber] = childProcess

    @ThreadingUtility.synchronize
    def childProcessStarted(self, childProcess):
        """Mark a child process as running and record its start time.

        Moves one unit from the stage's ``nQueuedChildProcesses`` counter
        to its ``nRunningChildProcesses`` counter.

        :param childProcess: child process dictionary; its ``stageId``
            selects the workflow stage
        :raises KeyError: if the job has no such workflow stage
        """
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        childProcess['startTime'] = time.time()
        workflowStage = self._getWorkflowStage(childProcess)
        workflowStage['nQueuedChildProcesses'] = (
            workflowStage.get('nQueuedChildProcesses', 0) - 1)
        workflowStage['nRunningChildProcesses'] = (
            workflowStage.get('nRunningChildProcesses', 0) + 1)

    @ThreadingUtility.synchronize
    def childProcessCompleted(self, childProcess, processInfo):
        """Mark a child process as done and update the stage counters.

        Records ``endTime`` and ``runTime`` (end minus start), copies
        ``exitStatus``/``stdOut``/``stdErr`` from ``processInfo`` when
        present, moves one unit from ``nRunningChildProcesses`` to
        ``nCompletedChildProcesses`` and sets ``childProcessEvent`` if one
        is attached.

        :param childProcess: child process dictionary; must contain
            ``startTime``
        :param processInfo: mapping with the outcome of the process
        :raises KeyError: if ``startTime`` is missing or the job has no
            such workflow stage
        """
        self._recordChildProcessEnd(
            childProcess,
            dmProcessingStatus.DM_PROCESSING_STATUS_DONE,
            processInfo)
        self._moveRunningChildProcessTo(
            childProcess, 'nCompletedChildProcesses')

    @ThreadingUtility.synchronize
    def childProcessFailed(self, childProcess, processInfo=None, errorMessage=None):
        """Mark a child process, and the whole job, as failed.

        Records ``endTime`` and ``runTime`` (end minus start), copies
        ``exitStatus``/``stdOut``/``stdErr`` from ``processInfo`` when
        present, stores ``errorMessage`` on the child if given, moves one
        unit from ``nRunningChildProcesses`` to ``nFailedChildProcesses``
        and sets ``childProcessEvent`` if one is attached.

        :param childProcess: child process dictionary; must contain
            ``startTime``
        :param processInfo: optional mapping with the outcome of the
            process; None is treated as an empty mapping
        :param errorMessage: optional error description
        :raises KeyError: if ``startTime`` is missing or the job has no
            such workflow stage
        """
        # A None default avoids sharing one mutable dict between calls.
        if processInfo is None:
            processInfo = {}
        self['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
        self._recordChildProcessEnd(
            childProcess,
            dmProcessingStatus.DM_PROCESSING_STATUS_FAILED,
            processInfo)
        if errorMessage:
            childProcess['errorMessage'] = errorMessage
        self._moveRunningChildProcessTo(
            childProcess, 'nFailedChildProcesses')