From 556be87d40cd0fb55f94bd6100ae4c7f55a53417 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Wed, 22 Feb 2017 17:23:08 +0000 Subject: [PATCH] adding new workflow related objects --- src/python/dm/common/objects/childProcess.py | 11 +++ src/python/dm/common/objects/processingJob.py | 71 +++++++++++++++++++ src/python/dm/common/objects/workflow.py | 11 +++ 3 files changed, 93 insertions(+) create mode 100755 src/python/dm/common/objects/childProcess.py create mode 100755 src/python/dm/common/objects/processingJob.py create mode 100755 src/python/dm/common/objects/workflow.py diff --git a/src/python/dm/common/objects/childProcess.py b/src/python/dm/common/objects/childProcess.py new file mode 100755 index 00000000..972f6e6b --- /dev/null +++ b/src/python/dm/common/objects/childProcess.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python + +from dmObject import DmObject + +class ChildProcess(DmObject): + + DEFAULT_KEY_LIST = [ 'command', 'exitStatus', 'stdErr', 'stdOut', 'workingDir', 'childProcessNumber', 'stageId', 'status' ] + + def __init__(self, dict): + DmObject.__init__(self, dict) + diff --git a/src/python/dm/common/objects/processingJob.py b/src/python/dm/common/objects/processingJob.py new file mode 100755 index 00000000..d3d11c5e --- /dev/null +++ b/src/python/dm/common/objects/processingJob.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python + +import threading +import time +from dm.common.constants import dmProcessingStatus +from dm.common.utility.threadingUtility import ThreadingUtility +from dmObject import DmObject + +class ProcessingJob(DmObject): + + DEFAULT_KEY_LIST = [ 'id', 'name', 'owner', 'experimentName', 'status', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ] + + def __init__(self, dict): + DmObject.__init__(self, dict) + self.lock = threading.RLock() + self.childProcessEvent = None + + @ThreadingUtility.synchronize + def childProcessQueued(self, childProcess): + childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING + childProcess['submitTime'] = time.time() + stageId = childProcess.get('stageId') + childProcessNumber = childProcess.get('childProcessNumber', 0) + workflowStage = self['workflow']['stages'][stageId] + workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) + 1 + workflowStage['childProcesses'][childProcessNumber] = childProcess + + @ThreadingUtility.synchronize + def childProcessStarted(self, childProcess): + childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING + childProcess['startTime'] = time.time() + stageId = childProcess.get('stageId') + workflowStage = self['workflow']['stages'][stageId] + workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) - 1 + workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) + 1 + + @ThreadingUtility.synchronize + def childProcessCompleted(self, childProcess, processInfo): + childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE + childProcess['endTime'] = time.time() + childProcess['runTime'] = childProcess['startTime'] - childProcess['endTime'] + for key in ['exitStatus', 'stdOut', 'stdErr']: + if processInfo.has_key(key): + childProcess[key] = processInfo.get(key) + stageId = childProcess.get('stageId') + workflowStage = self['workflow']['stages'][stageId] + workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1 + workflowStage['nCompletedChildProcesses'] = workflowStage.get('nCompletedChildProcesses', 0) + 1 + if self.childProcessEvent: + self.childProcessEvent.set() + + + @ThreadingUtility.synchronize + def childProcessFailed(self, childProcess, processInfo={}, errorMessage=None): + self['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED + childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED + childProcess['endTime'] = time.time() + childProcess['runTime'] = childProcess['startTime'] - childProcess['endTime'] + for key in ['exitStatus', 'stdOut', 'stdErr']: + if processInfo.has_key(key): + childProcess[key] = processInfo.get(key) + if errorMessage: + childProcess['errorMessage'] = errorMessage + + stageId = childProcess.get('stageId') + workflowStage = self['workflow']['stages'][stageId] + workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1 + workflowStage['nFailedChildProcesses'] = workflowStage.get('nFailedChildProcesses', 0) + 1 + if self.childProcessEvent: + self.childProcessEvent.set() + diff --git a/src/python/dm/common/objects/workflow.py b/src/python/dm/common/objects/workflow.py new file mode 100755 index 00000000..585135a0 --- /dev/null +++ b/src/python/dm/common/objects/workflow.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python + +from dmObject import DmObject + +class Workflow(DmObject): + + DEFAULT_KEY_LIST = [ 'id', 'name', 'owner' ] + + def __init__(self, dict): + DmObject.__init__(self, dict) + -- GitLab