#!/usr/bin/env python

import threading
import time
import os
import uuid
import glob
import copy

from dm.common.constants import dmProcessingStatus
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.childProcess import ChildProcess
from dm.common.objects.processingJob import ProcessingJob
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton

from executionThread import ExecutionThread
from processingJobTracker import ProcessingJobTracker


class ExecutionManager(Singleton):

    CONFIG_SECTION_NAME = 'ExecutionManager'
    NUMBER_OF_EXECUTION_THREADS_KEY = 'numberofexecutionthreads'
    DEFAULT_NUMBER_OF_EXECUTION_THREADS = 3

    FILE_PATH_LIST_KEY = 'filePathList'
    FILE_PATH_PATTERN_KEY = 'filePathPattern'
    FILE_QUERY_DICT_KEY = 'fileQueryDict'

    PROCESSING_JOB_START_DELAY_IN_SECONDS = 10
    MAX_NUMBER_OF_CHILD_PROCESSES = 10
    CHILD_PROCESSES_EVENT_TIMEOUT_IN_SECONDS = 10

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        ExecutionManager.__instanceLock.acquire()
        try:
            if ExecutionManager.__instance:
                return
            ExecutionManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.executionThreadDict = {}
            self.eventFlag = threading.Event()
            self.executionQueue = TimeBasedProcessingQueue()
            self.__configure()
            self.logger.debug('Initialization complete')
        finally:
            ExecutionManager.__instanceLock.release()

    def __configure(self):
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(ExecutionManager.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.nExecutionThreads = int(cm.getConfigOption(
            ExecutionManager.CONFIG_SECTION_NAME,
            ExecutionManager.NUMBER_OF_EXECUTION_THREADS_KEY,
            ExecutionManager.DEFAULT_NUMBER_OF_EXECUTION_THREADS))

    def resolveFileSpec(self, processingJob):
        # Input files may be given directly (filePathList), via a glob
        # pattern (filePathPattern), or via a query (fileQueryDict).
        if ExecutionManager.FILE_PATH_LIST_KEY in processingJob:
            return self.resolveFilePathList(processingJob)
        elif ExecutionManager.FILE_PATH_PATTERN_KEY in processingJob:
            return self.resolveFilePathPattern(processingJob)
        elif ExecutionManager.FILE_QUERY_DICT_KEY in processingJob:
            return self.resolveFileQueryDict(processingJob)
        else:
            raise InvalidRequest('Invalid input file specification.')

    def resolveFilePathList(self, processingJob):
        return processingJob.get('filePathList')

    def resolveFilePathPattern(self, processingJob):
        if 'filePathList' not in processingJob:
            globPattern = processingJob.get('filePathPattern')
            dataDir = processingJob.get('dataDir')
            # Glob relative to the job's data directory; make sure the
            # original working directory is restored even if glob fails.
            currentDir = os.getcwd()
            try:
                if dataDir:
                    os.chdir(dataDir)
                processingJob['filePathList'] = glob.glob(globPattern)
            finally:
                os.chdir(currentDir)
        return processingJob.get('filePathList', [])

    def resolveFileQueryDict(self, processingJob):
        # File queries are not supported yet.
        return []
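    # Note on file specifications (summary of the resolvers above):
    # resolveFileSpec checks the job keys in a fixed order, so
    # 'filePathList' takes precedence over 'filePathPattern', which takes
    # precedence over 'fileQueryDict'. A pattern-based job fragment might
    # look like {'filePathPattern' : '*.dat', 'dataDir' : '/tmp/data'};
    # the glob result is cached back into the job under 'filePathList'.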
    def startProcessingJob(self, processingJob):
        workflow = processingJob.get('workflow')
        self.logger.debug('Submitted processing job for workflow %s' % workflow.get('name'))
        jobId = str(uuid.uuid4())
        processingJob['id'] = jobId
        startTime = time.time()
        processingJob['startTime'] = startTime
        processingJob['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        processingJob['nFiles'] = 0
        processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        ProcessingJobTracker.getInstance().put(jobId, processingJob)

        # Delay the actual start, so that the job can be returned to the
        # caller in its pending state before any work begins.
        self.logger.debug('Starting processing job %s timer' % jobId)
        timer = threading.Timer(self.PROCESSING_JOB_START_DELAY_IN_SECONDS,
                                self.runProcessingJob, args=[processingJob])
        timer.start()
        return processingJob

    def runProcessingJob(self, processingJob):
        jobId = processingJob['id']
        try:
            childProcessEvent = threading.Event()
            processingJob.childProcessEvent = childProcessEvent
            self.logger.debug('Timer started for processing job %s.' % jobId)

            # Resolve files.
            filePathList = self.resolveFileSpec(processingJob)
            if not filePathList:
                raise InvalidRequest('There are no input files available for processing.')
            self.logger.debug('There are %s files for processing job %s.' % (len(filePathList), jobId))
            processingJob['nFiles'] = len(filePathList)

            # Run workflow stages in sorted key order.
            workflowStages = processingJob.get('workflow').get('stages')
            workflowStageKeys = sorted(workflowStages.keys())
            processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
            childProcessNumber = 0
            workingDir = processingJob.get('workingDir')
            for stageKey in workflowStageKeys:
                if processingJob['status'] != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                    self.logger.debug('Processing job %s status is %s, exiting workflow at stage %s' % (jobId, processingJob['status'], stageKey))
                    break
                self.logger.debug('Starting stage %s for processing job %s.' % (stageKey, jobId))
                workflowStage = workflowStages.get(stageKey)
                workflowStage['childProcesses'] = {}
                workflowStage['nRunningChildProcesses'] = 0
                workflowStage['nQueuedChildProcesses'] = 0
                command = workflowStage.get('command')
                processingJob['activeStage'] = stageKey

                # Substitute $variables in the stage command with values
                # from the processing job.
                variableNames = [word for word in command.split() if word.startswith('$')]
                self.logger.debug('Variables found for stage %s: %s' % (stageKey, variableNames))
                iterateOverFiles = False
                for v in variableNames:
                    variableName = v[1:]  # remove '$'
                    variableValue = processingJob.get(variableName)
                    if variableName == 'filePathList':
                        variableValue = ' '.join(filePathList)
                    if variableValue is not None:
                        command = command.replace(v, variableValue)
                    if variableName == 'filePath':
                        # $filePath means the stage runs once per input file.
                        iterateOverFiles = True

                if iterateOverFiles:
                    # We iterate over all files.
                    for f in filePathList:
                        childProcessEvent.clear()
                        if processingJob['status'] != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                            self.logger.debug('Processing job %s status is %s, exiting workflow at stage %s' % (jobId, processingJob['status'], stageKey))
                            break

                        # Wait until we can queue a new child process.
                        while True:
                            nQueuedChildProcesses = workflowStage.get('nQueuedChildProcesses')
                            nRunningChildProcesses = workflowStage.get('nRunningChildProcesses')
                            nChildProcesses = nQueuedChildProcesses + nRunningChildProcesses
                            if nChildProcesses < self.MAX_NUMBER_OF_CHILD_PROCESSES:
                                break
                            self.logger.debug('There are %s child processes for stage %s (processing job %s), cannot create new one' % (nChildProcesses, stageKey, jobId))
                            childProcessEvent.wait(self.CHILD_PROCESSES_EVENT_TIMEOUT_IN_SECONDS)

                        childCommand = command.replace('$filePath', f)
                        self.logger.debug('Execution command for stage %s, child process %s: %s' % (stageKey, childProcessNumber, childCommand))
                        childProcess = self.createChildProcess(stageKey, childProcessNumber, childCommand, workingDir)
                        self.pushToExecutionQueue(processingJob, childProcess)
                        childProcessNumber += 1
                else:
                    # The stage runs as a single child process.
                    childCommand = command
                    self.logger.debug('Execution command for stage %s, child process %s: %s' % (stageKey, childProcessNumber, childCommand))
                    childProcess = self.createChildProcess(stageKey, childProcessNumber, childCommand, workingDir)
                    self.pushToExecutionQueue(processingJob, childProcess)
                    childProcessNumber += 1

                # Child processes queued, wait until they finish.
                while True:
                    childProcessEvent.clear()
                    nQueuedChildProcesses = workflowStage.get('nQueuedChildProcesses')
                    nRunningChildProcesses = workflowStage.get('nRunningChildProcesses')
                    nChildProcesses = nQueuedChildProcesses + nRunningChildProcesses
                    if nChildProcesses == 0:
                        self.logger.debug('No more queued/running child processes for stage %s (processing job %s)' % (stageKey, jobId))
                        break
                    self.logger.debug('There are %s queued/running child processes for stage %s (processing job %s)' % (nChildProcesses, stageKey, jobId))
                    childProcessEvent.wait(self.CHILD_PROCESSES_EVENT_TIMEOUT_IN_SECONDS)

        except Exception as ex:
            processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            processingJob['errorMessage'] = str(ex)
            self.logger.error('Processing job %s failed: %s' % (jobId, str(ex)))

        if processingJob['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
            processingJob['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
            self.logger.debug('Processing job %s is done.' % jobId)
        self.logger.debug('Processing job: %s' % processingJob)
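    # Bookkeeping note: the wait loops in runProcessingJob rely on the
    # execution threads to adjust the stage's 'nQueuedChildProcesses' and
    # 'nRunningChildProcesses' counters and to set the job's
    # childProcessEvent as child processes start and finish; that event is
    # what wakes the loops before the timeout expires.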
    def pushToExecutionQueue(self, processingJob, childProcess):
        self.executionQueue.push((processingJob, childProcess))
        processingJob.childProcessQueued(childProcess)
        self.setEvent()

    def createChildProcess(self, stageId, childProcessNumber, command, workingDir):
        childProcess = ChildProcess({
            'stageId' : stageId,
            'childProcessNumber' : childProcessNumber,
            'command' : command,
            'workingDir' : workingDir,
        })
        return childProcess

    @ThreadingUtility.synchronize
    def start(self):
        self.logger.debug('Starting execution threads')
        for i in range(0, self.nExecutionThreads):
            tName = 'ExecutionThread-%s' % i
            t = ExecutionThread(tName, self, self.executionQueue)
            t.start()
            self.executionThreadDict[tName] = t

    def stop(self):
        self.logger.debug('Stopping execution threads')
        for (tName, t) in self.executionThreadDict.items():
            t.stop()
        self.setEvent()
        for (tName, t) in self.executionThreadDict.items():
            t.join()

    @ThreadingUtility.synchronize
    def setEvent(self):
        self.eventFlag.set()

    @ThreadingUtility.synchronize
    def clearEvent(self):
        self.eventFlag.clear()

    def waitOnEvent(self, timeoutInSeconds=None):
        self.eventFlag.wait(timeoutInSeconds)
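
# Illustrative sketch only (not part of the original module): the
# '$variable' substitution that runProcessingJob applies to stage
# commands, extracted as a standalone helper. The function name is
# hypothetical and nothing in the service calls it.
def _substituteCommandVariables(command, valueDict):
    # Collect $-prefixed words and replace each one whose name resolves
    # to a value; unknown variables are left in place, mirroring the
    # behavior of runProcessingJob.
    for word in [w for w in command.split() if w.startswith('$')]:
        value = valueDict.get(word[1:])
        if value is not None:
            command = command.replace(word, str(value))
    return command

# Example:
#   _substituteCommandVariables('/bin/echo Energy is $energyArg',
#                               {'energyArg' : '2TeV'})
#   returns '/bin/echo Energy is 2TeV'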

####################################################################
# Testing

if __name__ == '__main__':
    LoggingManager.getInstance().configure()
    LoggingManager.getInstance().setConsoleLogLevel('DEBUG')
    em = ExecutionManager.getInstance()
    print(em)
    em.start()

    # Stage keys:
    #   command        (required)
    #   execOutFormat  (optional, to be parsed for variables like $jobId)
    #   workingDir     (optional, default .)
    #   dataDir        (optional, default $workingDir)
    #   monitorCmd     (optional; if present, monitoring is executed in a timer)
    #   parallelExec   (optional, default False)
    #   filePathList, filePathPattern, or fileQueryDict (one must be present)
    #
    # If $filePathList or $filePath is in the argument list, and the job
    # spec contains filePathPattern or fileQueryDict, files will be
    # resolved into filePathList.
    #
    # If $filePath is in the argument list, filePathList is iterated over
    # for that particular stage.
    #
    # Variables that get filled automatically:
    #   $username  : DM username
    #   $execHost  : execution host
    #   $id        : job id
    #   $startTime : processing start time
    #   $endTime   : processing end time

    workflow = {u'owner': u'sveseli', u'stages': {
        '1' : {u'command': u'/bin/date', 'stopOnFailure' : False},
        '2' : {u'command': u'/bin/ls -l $filePath', 'parallelExec' : True},
        '3' : {u'command': u'/bin/ls -c1 $filePathList'},
        '4' : {u'command': u'/bin/echo Energy is $energyArg current is $CURRENT_ARG'},
    }, u'description': u'My first workflow', u'name': u'workflow-01',
       u'id': u'589396e0e20bf2465e3a6bf0'}

    processingJob = ProcessingJob({
        'workflow' : workflow,
        'experimentName' : 'exp-01',
        'energyArg' : '2TeV',
        'CURRENT_ARG' : '2mA',
        'filePathList' : ['testfile01', 'testfile02', 'testfile03'],
        'dataDir' : '/tmp/data',
        'workingDir' : '/tmp/data',
    })
    em.startProcessingJob(processingJob)
    time.sleep(180)
    em.stop()