Skip to content
Snippets Groups Projects
Commit 556be87d authored by sveseli's avatar sveseli
Browse files

adding new workflow related objects

parent 7ea9478a
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
from dmObject import DmObject
class ChildProcess(DmObject):
DEFAULT_KEY_LIST = [ 'command', 'exitStatus', 'stdErr', 'stdOut', 'workingDir', 'childProcessNumber', 'stageId', 'status' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
#!/usr/bin/env python
import threading
import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.threadingUtility import ThreadingUtility
from dmObject import DmObject
class ProcessingJob(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'name', 'owner', 'experimentName', 'status', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
self.lock = threading.RLock()
self.childProcessEvent = None
@ThreadingUtility.synchronize
def childProcessQueued(self, childProcess):
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
childProcess['submitTime'] = time.time()
stageId = childProcess.get('stageId')
childProcessNumber = childProcess.get('childProcessNumber', 0)
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) + 1
workflowStage['childProcesses'][childProcessNumber] = childProcess
@ThreadingUtility.synchronize
def childProcessStarted(self, childProcess):
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
childProcess['startTime'] = time.time()
stageId = childProcess.get('stageId')
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) - 1
workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) + 1
@ThreadingUtility.synchronize
def childProcessCompleted(self, childProcess, processInfo):
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
childProcess['endTime'] = time.time()
childProcess['runTime'] = childProcess['startTime'] - childProcess['endTime']
for key in ['exitStatus', 'stdOut', 'stdErr']:
if processInfo.has_key(key):
childProcess[key] = processInfo.get(key)
stageId = childProcess.get('stageId')
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
workflowStage['nCompletedChildProcesses'] = workflowStage.get('nCompletedChildProcesses', 0) + 1
if self.childProcessEvent:
self.childProcessEvent.set()
@ThreadingUtility.synchronize
def childProcessFailed(self, childProcess, processInfo={}, errorMessage=None):
self['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
childProcess['endTime'] = time.time()
childProcess['runTime'] = childProcess['startTime'] - childProcess['endTime']
for key in ['exitStatus', 'stdOut', 'stdErr']:
if processInfo.has_key(key):
childProcess[key] = processInfo.get(key)
if errorMessage:
childProcess['errorMessage'] = errorMessage
stageId = childProcess.get('stageId')
workflowStage = self['workflow']['stages'][stageId]
workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
workflowStage['nFailedChildProcesses'] = workflowStage.get('nFailedChildProcesses', 0) + 1
if self.childProcessEvent:
self.childProcessEvent.set()
#!/usr/bin/env python
from dmObject import DmObject
class Workflow(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'name', 'owner' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment