diff --git a/src/python/dm/proc_web_service/__init__.py b/src/python/dm/proc_web_service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/proc_web_service/api/__init__.py b/src/python/dm/proc_web_service/api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/proc_web_service/api/procRestApi.py b/src/python/dm/proc_web_service/api/procRestApi.py new file mode 100755 index 0000000000000000000000000000000000000000..5504a69ed617b2d518ac36f95854395075badb12 --- /dev/null +++ b/src/python/dm/proc_web_service/api/procRestApi.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python + +from dm.common.api.dmRestApi import DmRestApi +from dm.common.utility.configurationManager import ConfigurationManager + +class ProcRestApi(DmRestApi): + """ Base CAT DM REST api class. """ + + def __init__(self, username=None, password=None, host=None, port=None, protocol=None): + if host == None: + host = ConfigurationManager.getInstance().getProcWebServiceHost() + if port == None: + port = ConfigurationManager.getInstance().getProcWebServicePort() + DmRestApi.__init__(self, username, password, host, port, protocol) + +####################################################################### +# Testing. + +if __name__ == '__main__': + pass + + diff --git a/src/python/dm/proc_web_service/api/workflowRestApi.py b/src/python/dm/proc_web_service/api/workflowRestApi.py new file mode 100755 index 0000000000000000000000000000000000000000..bc3e1a34118e244b60a29966da669fb3e16e24fb --- /dev/null +++ b/src/python/dm/proc_web_service/api/workflowRestApi.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python + +import os +import urllib +import json + +from dm.common.utility.encoder import Encoder +from dm.common.exceptions.dmException import DmException +from dm.common.exceptions.invalidRequest import InvalidRequest +from dm.common.objects.workflow import Workflow +from procRestApi import ProcRestApi + +class WorkflowRestApi(ProcRestApi): + + def __init__(self, username=None, password=None, host=None, port=None, protocol=None): + ProcRestApi.__init__(self, username, password, host, port, protocol) + + @ProcRestApi.execute + def addWorkflow(self, fileInfo): + owner = fileInfo.get('owner') + if not owner: + raise InvalidRequest('Workflow metadata must contain owner key.') + experimentWorkflowPath = fileInfo.get('experimentWorkflowPath') + if not experimentWorkflowPath: + raise InvalidRequest('Workflow metadata must contain experimentWorkflowPath key.') + url = '%s/filesByExperimentAndPath/%s/%s' % (self.getContextRoot(), owner, Encoder.encode(experimentWorkflowPath)) + url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo))) + responseData = self.sendSessionRequest(url=url, method='POST') + return Workflow(responseData) + + @ProcRestApi.execute + def updateWorkflow(self, fileInfo): + owner = fileInfo.get('owner') + if not owner: + raise InvalidRequest('Workflow metadata must contain owner key.') + experimentWorkflowPath = fileInfo.get('experimentWorkflowPath') + if not experimentWorkflowPath: + raise InvalidRequest('Workflow metadata must contain experimentWorkflowPath key.') + url = '%s/filesByExperimentAndPath/%s/%s' % (self.getContextRoot(), owner, Encoder.encode(experimentWorkflowPath)) + url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo))) + responseData = self.sendSessionRequest(url=url, method='PUT') + return Workflow(responseData) + + @ProcRestApi.execute + def updateWorkflowById(self, fileInfo): + owner = fileInfo.get('owner') + if not owner: + raise InvalidRequest('Workflow metadata must contain owner key.') + id = fileInfo.get('id') + if not id: + raise InvalidRequest('Workflow metadata must contain id key.') + url = '%s/filesByExperimentAndId/%s/%s' % (self.getContextRoot(), owner, id) + url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo))) + responseData = self.sendSessionRequest(url=url, method='PUT') + return Workflow(responseData) + + @ProcRestApi.execute + def getWorkflows(self, owner, queryDict={}): + if not owner: + raise InvalidRequest('Invalid experiment name provided.') + url = '%s/filesByExperiment/%s' % (self.getContextRoot(), owner) + url += '?queryDict=%s' % (Encoder.encode(json.dumps(queryDict))) + responseData = self.sendSessionRequest(url=url, method='GET') + return self.toDmObjectList(responseData, Workflow) + + @ProcRestApi.execute + def getWorkflowById(self, owner, id): + if not owner: + raise InvalidRequest('Invalid experiment name provided.') + if not id: + raise InvalidRequest('Invalid file id provided.') + url = '%s/filesByExperimentAndId/%s/%s' % (self.getContextRoot(), owner, id) + responseData = self.sendSessionRequest(url=url, method='GET') + return Workflow(responseData) + + @ProcRestApi.execute + def getWorkflowByName(self, owner, name): + if not owner: + raise InvalidRequest('Invalid owner name provided.') + if not name: + raise InvalidRequest('Invalid workflow name provided.') + url = '%s/workflowsByOwner/%s/%s' % (self.getContextRoot(), owner, Encoder.encode(name)) + responseData = self.sendSessionRequest(url=url, method='GET') + return Workflow(responseData) + +####################################################################### +# Testing. + +if __name__ == '__main__': + api = WorkflowRestApi() + + diff --git a/src/python/dm/proc_web_service/cli/__init__.py b/src/python/dm/proc_web_service/cli/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/proc_web_service/cli/getWorkflowCli.py b/src/python/dm/proc_web_service/cli/getWorkflowCli.py new file mode 100755 index 0000000000000000000000000000000000000000..16f5510bc6cd5ec88da6cecea08148fb7a74a6cc --- /dev/null +++ b/src/python/dm/proc_web_service/cli/getWorkflowCli.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +from dm.proc_web_service.api.workflowRestApi import WorkflowRestApi +from dm.common.exceptions.invalidRequest import InvalidRequest +from procWebServiceSessionCli import ProcWebServiceSessionCli + +class GetWorkflowCli(ProcWebServiceSessionCli): + def __init__(self): + ProcWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) + self.addOption('', '--owner', dest='owner', help='Owner name.') + self.addOption('', '--workflow', dest='workflow', help='Workflow name.') + + def checkArgs(self): + if self.options.owner is None: + raise InvalidRequest('Owner name must be provided.') + if self.options.workflow is None: + raise InvalidRequest('Workflow name must be provided.') + + def getOwner(self): + return self.options.owner + + def getWorkflow(self): + return self.options.workflow + + def runCommand(self): + self.parseArgs(usage=""" + dm-get-workflow --workflow=WORKFLOW --owner=OWNER + +Description: + Get workflow details. + """) + self.checkArgs() + api = WorkflowRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) + workflow = api.getWorkflowByName(self.getOwner(), self.getWorkflow()) + print workflow.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) + +####################################################################### +# Run command. +if __name__ == '__main__': + cli = GetWorkflowCli() + cli.run() + diff --git a/src/python/dm/proc_web_service/cli/procWebServiceCli.py b/src/python/dm/proc_web_service/cli/procWebServiceCli.py new file mode 100755 index 0000000000000000000000000000000000000000..01cef2e5bbd347f8e543a4efdcd4e70877fadfac --- /dev/null +++ b/src/python/dm/proc_web_service/cli/procWebServiceCli.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +from dm.common.cli.dmRestCli import DmRestCli +from dm.common.utility.configurationManager import ConfigurationManager + +class ProcWebServiceCli(DmRestCli): + """ DM PROC web service cli class. """ + + def __init__(self, validArgCount=0): + DmRestCli.__init__(self, validArgCount) + + def getDefaultServiceHost(self): + return ConfigurationManager.getInstance().getProcWebServiceHost() + + def getDefaultServicePort(self): + return ConfigurationManager.getInstance().getProcWebServicePort() + diff --git a/src/python/dm/proc_web_service/cli/procWebServiceSessionCli.py b/src/python/dm/proc_web_service/cli/procWebServiceSessionCli.py new file mode 100755 index 0000000000000000000000000000000000000000..836a493acff3086b28c51d15c327843458abca10 --- /dev/null +++ b/src/python/dm/proc_web_service/cli/procWebServiceSessionCli.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +from dm.common.cli.dmRestSessionCli import DmRestSessionCli +from dm.common.utility.osUtility import OsUtility +from dm.common.utility.configurationManager import ConfigurationManager + +class ProcWebServiceSessionCli(DmRestSessionCli): + """ DM PROC web service session cli class. """ + + DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.proc.session.cache' + + def __init__(self, validArgCount=0): + DmRestSessionCli.__init__(self, validArgCount) + ConfigurationManager.getInstance().setSessionCacheFile(ProcWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE) + + def getDefaultServiceHost(self): + return ConfigurationManager.getInstance().getProcWebServiceHost() + + def getDefaultServicePort(self): + return ConfigurationManager.getInstance().getProcWebServicePort() + diff --git a/src/python/dm/proc_web_service/service/__init__.py b/src/python/dm/proc_web_service/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/proc_web_service/service/impl/__init__.py b/src/python/dm/proc_web_service/service/impl/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/proc_web_service/service/impl/executionManager.py b/src/python/dm/proc_web_service/service/impl/executionManager.py new file mode 100755 index 0000000000000000000000000000000000000000..401e4a26e415700b4ad60e5060dd5479f3e23d66 --- /dev/null +++ b/src/python/dm/proc_web_service/service/impl/executionManager.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python + +import threading +import time +import os +import uuid +import glob +import copy + +from dm.common.constants import dmProcessingStatus +from dm.common.exceptions.invalidRequest import InvalidRequest +from dm.common.objects.childProcess import ChildProcess +from dm.common.objects.processingJob import ProcessingJob +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.configurationManager import ConfigurationManager +from dm.common.utility.objectUtility import ObjectUtility +from dm.common.utility.valueUtility import ValueUtility +from dm.common.utility.timeUtility import TimeUtility +from dm.common.utility.threadingUtility import ThreadingUtility +from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue +from dm.common.utility.singleton import Singleton +from executionThread import ExecutionThread +from processingJobTracker import ProcessingJobTracker + +class ExecutionManager(Singleton): + + CONFIG_SECTION_NAME = 'ExecutionManager' + NUMBER_OF_EXECUTION_THREADS_KEY = 'numberofexecutionthreads' + DEFAULT_NUMBER_OF_EXECUTION_THREADS = 3 + + FILE_PATH_LIST_KEY = 'filePathList' + FILE_PATH_PATTERN_KEY = 'filePathPattern' + FILE_QUERY_DICT_KEY = 'fileQueryDict' + + PROCESSING_JOB_START_DELAY_IN_SECONDS = 10 + MAX_NUMBER_OF_CHILD_PROCESSES = 10 + CHILD_PROCESSES_EVENT_TIMEOUT_IN_SECONDS = 10 + + # Singleton. + __instanceLock = threading.RLock() + __instance = None + + def __init__(self): + ExecutionManager.__instanceLock.acquire() + try: + if ExecutionManager.__instance: + return + ExecutionManager.__instance = self + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + self.logger.debug('Initializing') + self.lock = threading.RLock() + self.executionThreadDict = {} + self.eventFlag = threading.Event() + self.executionQueue = TimeBasedProcessingQueue() + self.__configure() + self.logger.debug('Initialization complete') + finally: + ExecutionManager.__instanceLock.release() + + def __configure(self): + cm = ConfigurationManager.getInstance() + configItems = cm.getConfigItems(ExecutionManager.CONFIG_SECTION_NAME) + self.logger.debug('Got config items: %s' % configItems) + self.nExecutionThreads = int(cm.getConfigOption(ExecutionManager.CONFIG_SECTION_NAME, ExecutionManager.NUMBER_OF_EXECUTION_THREADS_KEY, ExecutionManager.DEFAULT_NUMBER_OF_EXECUTION_THREADS)) + + def resolveFileSpec(self, processingJob): + if processingJob.has_key('filePathList'): + return self.resolveFilePathList(processingJob) + elif processingJob.has_key('filePathPattern'): + return self.resolveFilePathPattern(processingJob) + elif processingJob.has_key('fileQueryDict'): + return self.resolveFileQueryDict(processingJob) + else: + raise InvalidRequest('Invalid input file specification.') + + def resolveFilePathList(self, processingJob): + return processingJob.get('filePathList') + + def resolveFilePathPattern(self, processingJob): + if not processingJob.has_key('filePathList'): + globPattern = processingJob.get('filePathPattern') + dataDir = processingJob.get('dataDir') + currentDir = os.getcwd() + if dataDir: + os.chdir(dataDir) + processingJob['filePathList'] = glob.glob(globPattern) + os.chdir(currentDir) + return processingJob.get('filePathList', []) + + def resolveFileQueryDict(self, processingJob): + return [] + + def startProcessingJob(self, processingJob): + workflow = processingJob.get('workflow') + self.logger.debug('Submitted processing job for workflow %s' % workflow.get('name')) + jobId = str(uuid.uuid4()) + processingJob['id'] = jobId + startTime = time.time() + processingJob['startTime'] = startTime + processingJob['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime) + + processingJob['nFiles'] = 0 + processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING + ProcessingJobTracker.getInstance().put(jobId, processingJob) + self.logger.debug('Starting processing job %s timer' % jobId) + + timer = threading.Timer(self.PROCESSING_JOB_START_DELAY_IN_SECONDS, self.runProcessingJob, args=[processingJob]) + timer.start() + return processingJob + + def runProcessingJob(self, processingJob): + try: + childProcessEvent = threading.Event() + processingJob.childProcessEvent = childProcessEvent + jobId = processingJob['id'] + self.logger.debug('Timer started for processing job %s.' % jobId) + + # Resolve files + filePathList = self.resolveFileSpec(processingJob) + if not filePathList: + raise InvalidRequest('There are no input files available for processing.') + self.logger.debug('There are %s files for processing job %s.' % (len(filePathList), jobId)) + processingJob['nFiles'] = len(filePathList) + workflowStages = processingJob.get('workflow').get('stages') + workflowStageKeys = workflowStages.keys() + workflowStageKeys.sort() + processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING + childProcessNumber = 0 + workingDir = processingJob.get('workingDir') + for stageKey in workflowStageKeys: + if processingJob['status'] != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING: + self.logger.debug('Processing job %s status is %s, exiting workflow at stage %s' % (jobId, processingJob['status'], stageKey)) + break + + self.logger.debug('Starting stage %s for processing job %s.' % (stageKey,jobId)) + workflowStage = workflowStages.get(stageKey) + workflowStage['childProcesses'] = {} + workflowStage['nRunningChildProcesses'] = 0 + workflowStage['nQueuedChildProcesses'] = 0 + command = workflowStage.get('command') + processingJob['activeStage'] = stageKey + variableNames = [word for word in command.split() if word.startswith('$')] + self.logger.debug('Variables found for stage %s: ' % variableNames) + iterateOverFiles = False + for v in variableNames: + variableName = v[1:] # remove '$' + variableValue = processingJob.get(variableName) + if variableName == 'filePathList': + variableValue = ' '.join(filePathList) + if variableValue is not None: + command = command.replace(v, variableValue) + if variableName == 'filePath': + iterateOverFiles = True + + if iterateOverFiles: + # We iterate over all files + for f in filePathList: + childProcessEvent.clear() + if processingJob['status'] != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING: + self.logger.debug('Processing job %s status is %s, exiting workflow at stage %s' % (jobId, processingJob['status'], stageKey)) + break + + # Wait until we can queue child process + while True: + nQueuedChildProcesses = workflowStage.get('nQueuedChildProcesses') + nRunningChildProcesses = workflowStage.get('nRunningChildProcesses') + nChildProcesses = nQueuedChildProcesses + nRunningChildProcesses + if nChildProcesses < self.MAX_NUMBER_OF_CHILD_PROCESSES: + break + else: + self.logger.debug('There are %s child processes for stage %s (processing job %s), cannot create new one' % (nChildProcesses, stageKey, jobId)) + childProcessEvent.wait(self.CHILD_PROCESSES_EVENT_TIMEOUT_IN_SECONDS) + + + childCommand = command.replace('$filePath', f) + self.logger.debug('Execution command for stage %s, child process %s: %s' % (stageKey, childProcessNumber, childCommand)) + childProcess = self.createChildProcess(stageKey, childProcessNumber, childCommand, workingDir) + self.pushToExecutionQueue(processingJob, childProcess) + childProcessNumber += 1 + + else: + childCommand = command + self.logger.debug('Execution command for stage %s, child process %s: %s' % (stageKey, childProcessNumber, childCommand)) + childProcess = self.createChildProcess(stageKey, childProcessNumber, childCommand, workingDir) + self.pushToExecutionQueue(processingJob, childProcess) + childProcessNumber += 1 + + # Child processed queued, wait until they finish + while True: + childProcessEvent.clear() + nQueuedChildProcesses = workflowStage.get('nQueuedChildProcesses') + nRunningChildProcesses = workflowStage.get('nRunningChildProcesses') + nChildProcesses = nQueuedChildProcesses + nRunningChildProcesses + if nChildProcesses == 0: + self.logger.debug('No more queued/running child processes for stage %s (processing job %s)' % (stageKey, jobId)) + break + else: + self.logger.debug('There are %s queued/running child processes for stage %s (processing job %s)' % (nChildProcesses, stageKey, jobId)) + childProcessEvent.wait(self.CHILD_PROCESSES_EVENT_TIMEOUT_IN_SECONDS) + + + except Exception, ex: + processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED + processingJob['errorMessage'] = str(ex) + self.logger.error('Processing job %s failed: %s' % (jobId,str(ex))) + if processingJob['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING: + processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE + self.logger.debug('Processing job %s is done.' % jobId) + self.logger.debug('Processing job: %s' % processingJob) + + def pushToExecutionQueue(self, processingJob, childProcess): + self.executionQueue.push((processingJob, childProcess)) + processingJob.childProcessQueued(childProcess) + self.setEvent() + + def createChildProcess(self, stageId, childProcessNumber, command, workingDir): + childProcess = ChildProcess({ + 'stageId' : stageId, + 'childProcessNumber' : childProcessNumber, + 'command' : command, + 'workingDir' : workingDir, + }) + return childProcess + + @ThreadingUtility.synchronize + def start(self): + self.logger.debug('Starting execution threads') + for i in range(0, self.nExecutionThreads): + tName = 'ExecutionThread-%s' % i + t = ExecutionThread(tName, self, self.executionQueue) + t.start() + self.executionThreadDict[tName] = t + + def stop(self): + self.logger.debug('Stopping execution threads') + for (tName, t) in self.executionThreadDict.items(): + t.stop() + self.setEvent() + for (tName, t) in self.executionThreadDict.items(): + t.join() + + @ThreadingUtility.synchronize + def setEvent(self): + self.eventFlag.set() + + @ThreadingUtility.synchronize + def clearEvent(self): + self.eventFlag.clear() + + def waitOnEvent(self, timeoutInSeconds=None): + self.eventFlag.wait(timeoutInSeconds) + +#################################################################### +# Testing + +if __name__ == '__main__': + LoggingManager.getInstance().configure() + LoggingManager.getInstance().setConsoleLogLevel('DEBUG') + em = ExecutionManager.getInstance() + print em + em.start() + # Stage Keys + # command (required) + # execOutFormat (optional, to be parsed for variables like $jobId ) + # workingDir (optional, default .) + # dataDir (optional, default $workingDir) + # monitorCmd (optional, if exists, will execute monitoring in a timer) + # parallelExec (optional, default False) + # filePathList, filePathPattern, or fileQueryDict (one must be present) + + # If $filePathList or $filePath is in the argument list, and + # job spec contains filePathPattern or fileQueryDict, files will be + # resolved into filePathList + # If $filePath is in the argument list, filePathList is iterated over + # for a particular stage + + # Variables that get filled automatically + # $username : DM username + # $execHost : execution host + # $id : job id + # $startTime : procesing start time + # $endTime : procesing end time + + workflow = {u'owner': u'sveseli', u'stages': { + '1' : {u'command': u'/bin/date', 'stopOnFailure' : False}, + '2' : {u'command': u'/bin/ls -l $filePath', 'parallelExec' : True}, + '3' : {u'command': u'/bin/ls -c1 $filePathList'}, + '4' : {u'command': u'/bin/echo Energy is $energyArg current is $CURRENT_ARG'}, + }, u'description': u'My first workflow', u'name': u'workflow-01', u'id': u'589396e0e20bf2465e3a6bf0'} + + processingJob = ProcessingJob({ + 'workflow' : workflow, + 'experimentName' : 'exp-01', + 'energyArg' : '2TeV', + 'CURRENT_ARG' : '2mA', + 'filePathList' : [ 'testfile01', 'testfile02', 'testfile03' ], + 'dataDir' : '/tmp/data', + 'workingDir' : '/tmp/data', + }) + em.startProcessingJob(processingJob) + + time.sleep(180) + em.stop() + + diff --git a/src/python/dm/proc_web_service/service/impl/executionThread.py b/src/python/dm/proc_web_service/service/impl/executionThread.py new file mode 100755 index 0000000000000000000000000000000000000000..ecc7bc7236b1e0b7f737b4c8df329113b392cc19 --- /dev/null +++ b/src/python/dm/proc_web_service/service/impl/executionThread.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python + +import threading +import time + +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.dmSubprocess import DmSubprocess + +class ExecutionThread(threading.Thread): + + THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0 + + def __init__ (self, name, executionManager, executionQueue): + + threading.Thread.__init__(self) + self.setName(name) + self.exitFlag = False + self.executionManager = executionManager + self.executionQueue = executionQueue + self.logger = LoggingManager.getInstance().getLogger(name) + + def processFile(self, fileInfo): + + try: + pass + except Exception, ex: + self.logger.exception(ex) + + def run(self): + self.logger.debug('Starting thread: %s' % self.getName()) + while True: + self.executionManager.clearEvent() + if self.exitFlag: + self.logger.debug('Exit flag is set') + break + + while True: + try: + queuedTuple = self.executionQueue.pop() + if queuedTuple is None: + break + processingJob, childProcess = queuedTuple + self.logger.debug('Execution queue depth after pop: %s' % self.executionQueue.getLength()) + self.logger.debug('Starting child process: %s' % childProcess) + command = childProcess.get('command') + workingDir = childProcess.get('workingDir') + processingJob.childProcessStarted(childProcess) + p = DmSubprocess.executeCommandAndIgnoreFailure(command, workingDir) + processInfo = p.toDict() + if p.getExitStatus() == 0: + processingJob.childProcessCompleted(childProcess, processInfo) + else: + processingJob.childProcessFailed(childProcess, processInfo) + except Exception, ex: + self.logger.exception(ex) + processingJob.childProcessFailed(childProcess, errorMessage=str(ex)) + + self.executionManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS) + self.logger.debug('%s is done' % self.getName()) + + def stop(self): + self.exitFlag = True + +#################################################################### +# Testing + +if __name__ == '__main__': + pass + + diff --git a/src/python/dm/proc_web_service/service/impl/processingJobTracker.py b/src/python/dm/proc_web_service/service/impl/processingJobTracker.py new file mode 100755 index 0000000000000000000000000000000000000000..5c358903017f35c7b569d99b8587088953a13419 --- /dev/null +++ b/src/python/dm/proc_web_service/service/impl/processingJobTracker.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +import os +import uuid +import time + +from dm.common.objects.processingJob import ProcessingJob +from dm.common.utility.objectTracker import ObjectTracker + +class ProcessingJobTracker(ObjectTracker): + + # Cache configuration + objectClass = ProcessingJob + cacheSize = 100 + + def __init__(self, *args, **kwargs): + ObjectTracker.__init__(self, args, kwargs) + + +#################################################################### +# Testing + +if __name__ == '__main__': + tracker = ProcessingJobTracker.getInstance() + print tracker + + + diff --git a/src/python/dm/proc_web_service/service/impl/workflowSessionControllerImpl.py b/src/python/dm/proc_web_service/service/impl/workflowSessionControllerImpl.py new file mode 100755 index 0000000000000000000000000000000000000000..768ff34a360e4ce4319668d9301ee4b8eedbda6f --- /dev/null +++ b/src/python/dm/proc_web_service/service/impl/workflowSessionControllerImpl.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +# +# Implementation for workflow controller. +# + +from dm.common.objects.dmObject import DmObject +from dm.common.objects.dmObjectManager import DmObjectManager +from dm.common.mongodb.api.workflowMongoDbApi import WorkflowMongoDbApi + +class WorkflowSessionControllerImpl(DmObjectManager): + """ Workflow controller implementation class. """ + + def __init__(self): + DmObjectManager.__init__(self) + self.workflowMongoDbApi = WorkflowMongoDbApi() + + def getWorkflowByName(self, owner, name): + return self.workflowMongoDbApi.getWorkflowByName(owner, name) + diff --git a/src/python/dm/proc_web_service/service/procWebService.py b/src/python/dm/proc_web_service/service/procWebService.py new file mode 100755 index 0000000000000000000000000000000000000000..bd0830234eb009df46f06ada2f10389c560f9bdf --- /dev/null +++ b/src/python/dm/proc_web_service/service/procWebService.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +# +# DM PROC Web Service +# + +from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase +from dm.common.utility.dmModuleManager import DmModuleManager +from dm.common.utility.configurationManager import ConfigurationManager + +from procWebServiceRouteMapper import ProcWebServiceRouteMapper + +class ProcWebService(DmRestWebServiceBase): + + def __init__(self): + DmRestWebServiceBase.__init__(self, ProcWebServiceRouteMapper) + + def initDmModules(self): + self.logger.debug('Initializing dm modules') + + # Add modules that will be started. + moduleManager = DmModuleManager.getInstance() + + self.logger.debug('Initialized dm modules') + + def getDefaultServerHost(self): + return ConfigurationManager.getInstance().getServiceHost() + + def getDefaultServerPort(self): + return ConfigurationManager.getInstance().getServicePort() + +#################################################################### +# Run service + +if __name__ == '__main__': + ConfigurationManager.getInstance().setServiceName('proc-web-service') + service = ProcWebService(); + service.run() diff --git a/src/python/dm/proc_web_service/service/procWebServiceRouteMapper.py b/src/python/dm/proc_web_service/service/procWebServiceRouteMapper.py new file mode 100755 index 0000000000000000000000000000000000000000..4a21b6c3bc0e1ccaffc884f2a30c224d039767de --- /dev/null +++ b/src/python/dm/proc_web_service/service/procWebServiceRouteMapper.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +# +# Route mapper for DM DAQ web service. +# + +import sys +import os + +import cherrypy +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.configurationManager import ConfigurationManager +from dm.common.service.loginRouteDescriptor import LoginRouteDescriptor +from workflowRouteDescriptor import WorkflowRouteDescriptor + +class ProcWebServiceRouteMapper: + + @classmethod + def setupRoutes(cls): + """ Setup RESTFul routes. """ + logger = LoggingManager.getInstance().getLogger(cls.__name__) + contextRoot = ConfigurationManager.getInstance().getContextRoot() + logger.debug('Using context root: %s' % contextRoot) + + # Get routes. + routes = LoginRouteDescriptor.getRoutes() + routes += WorkflowRouteDescriptor.getRoutes() + + # Add routes to dispatcher. + d = cherrypy.dispatch.RoutesDispatcher() + for route in routes: + logger.debug('Connecting route: %s' % route) + d.connect(route['name'], route['path'], action=route['action'], controller=route['controller'], conditions=dict(method=route['method'])) + return d + + diff --git a/src/python/dm/proc_web_service/service/workflowRouteDescriptor.py b/src/python/dm/proc_web_service/service/workflowRouteDescriptor.py new file mode 100755 index 0000000000000000000000000000000000000000..f15eae71196027153cc76c74722364d06b488eee --- /dev/null +++ b/src/python/dm/proc_web_service/service/workflowRouteDescriptor.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python + +# +# User route descriptor. +# + +from dm.common.utility.configurationManager import ConfigurationManager +from workflowSessionController import WorkflowSessionController + +class WorkflowRouteDescriptor: + + @classmethod + def getRoutes(cls): + contextRoot = ConfigurationManager.getInstance().getContextRoot() + + # Static instances shared between different routes + workflowSessionController = WorkflowSessionController() + + # Define routes. + routes = [ + + # Get workflow + { + 'name' : 'getWorkflowByName', + 'path' : '%s/workflowsByOwner/:(owner)/:(encodedName)' % contextRoot, + 'controller' : workflowSessionController, + 'action' : 'getWorkflowByName', + 'method' : ['GET'] + }, + + ] + + return routes + + diff --git a/src/python/dm/proc_web_service/service/workflowSessionController.py b/src/python/dm/proc_web_service/service/workflowSessionController.py new file mode 100755 index 0000000000000000000000000000000000000000..24774fb0a10b4597cc602bfce4e1ec5d6852f5f4 --- /dev/null +++ b/src/python/dm/proc_web_service/service/workflowSessionController.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +import cherrypy +import json +from dm.common.utility.encoder import Encoder +from dm.common.service.dmSessionController import DmSessionController +from dm.proc_web_service.service.impl.workflowSessionControllerImpl import WorkflowSessionControllerImpl + +class WorkflowSessionController(DmSessionController): + + def __init__(self): + DmSessionController.__init__(self) + self.workflowSessionControllerImpl = WorkflowSessionControllerImpl() + + @cherrypy.expose + @DmSessionController.require(DmSessionController.canManageStation()) + @DmSessionController.execute + def getWorkflowByName(self, owner, encodedName, **kwargs): + if not owner: + raise InvalidRequest('Invalid owner provided.') + name = Encoder.decode(encodedName) + if not name: + raise InvalidRequest('Invalid workflow name provided.') + response = self.workflowSessionControllerImpl.getWorkflowByName(owner, name).getFullJsonRep() + return response +