#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import os
import time
import uuid
import copy
import threading

from dm.common.constants import dmProcessingStatus
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver

class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    UPLOAD_DELAY_IN_SECONDS = 1.0
    UPLOAD_CHUNK_SIZE_IN_FILES = 100
    UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
    DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
    SECONDS_PER_HOUR = 60*60

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentDsApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
        if maxRunTimeInHours:
            daqId = daqInfo['id']
            self.logger.debug('Starting timer to automatically stop DAQ id %s for experiment %s after max runtime of %s hours' % (daqId, experimentName, maxRunTimeInHours))
            maxRunTimeInSeconds = maxRunTimeInHours*self.SECONDS_PER_HOUR
            timer = threading.Timer(maxRunTimeInSeconds, self.stopDaqTimer, args=[experimentName, dataDirectory, daqId])
            timer.start()
        return daqInfo
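
    # Illustrative usage sketch (names below are hypothetical, not part of
    # this module; assumes a configured DM deployment):
    #
    #   controller = ExperimentSessionControllerImpl()
    #   daqInfo = controller.startDaq('exp-01', '/data/exp-01',
    #                                 {'maxRunTimeInHours' : 8})
    #   # The DAQ now runs until stopDaq() is called, or until the
    #   # threading.Timer above fires after the 8-hour max runtime.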

    def stopDaqTimer(self, experimentName, dataDirectory, daqId):
        try:
            daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
            maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
            self.logger.debug('Attempting to automatically stop DAQ id %s for experiment %s after max runtime of %s hours was exceeded' % (daqId, experimentName, maxRunTimeInHours))
            daqStatus = daqInfo.get('status')
            if daqStatus != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                self.logger.debug('DAQ id %s has status of %s, will not stop it automatically' % (daqId, daqStatus))
                return
            self.stopDaq(experimentName, dataDirectory)
        except Exception as ex:
            self.logger.error('Could not automatically stop DAQ id %s: %s' % (daqId, str(ex)))

    def stopDaq(self, experimentName, dataDirectory):
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        daqId = daqInfo.get('id')
        self.logger.debug('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))

        # Prepare upload on exit
        uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
        if uploadDataDirectoryOnExit:
            self.logger.debug('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
            daqInfo2 = {}
            daqInfo2['originalDaqId'] = daqId
            uploadDestDirectoryOnExit = daqInfo.get('uploadDestDirectoryOnExit')
            if uploadDestDirectoryOnExit:
                self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using dest directory: %s' % (daqId, experimentName, uploadDestDirectoryOnExit))
                daqInfo2['destDirectory'] = uploadDestDirectoryOnExit
            try:
                uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2)
                daqInfo['uploadIdOnExit'] = uploadInfo.get('id')
            except Exception as ex:
                self.logger.error('Could not automatically upload DAQ id %s: %s' % (daqId, str(ex)))
                daqInfo['uploadErrorOnExit'] = str(ex)
        return daqInfo

    def getDaqInfo(self, id):
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo

    def listDaqs(self, status):
        daqInfoList = DaqTracker.getInstance().getDaqInfos(status)
        return daqInfoList
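
    # Illustrative sketch of the stop/auto-upload path (hypothetical names):
    #
    #   daqInfo = controller.stopDaq('exp-01', '/data/exp-01')
    #   # If the DAQ was started with 'uploadDataDirectoryOnExit', stopDaq()
    #   # triggers uploadFiles() and records the resulting upload id under
    #   # 'uploadIdOnExit' (or the failure under 'uploadErrorOnExit').
    #   uploadId = daqInfo.get('uploadIdOnExit')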

    def uploadFiles(self, experimentName, dataDirectory, daqInfo):
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        uploadInfo['nProcessedFiles'] = 0
        uploadInfo['nProcessingErrors'] = 0
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId
        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            skipPlugins = []

        # Check that there is at least one processor that can process files
        processorList = []
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName not in skipPlugins:
                processorList.append(processor)
        if not len(processorList):
            raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = 0
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        self.logger.debug('Starting upload timer for %s' % dataDirectory)
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment])
        timer.start()
        return uploadInfo
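
    # Illustrative sketch: 'skipPlugins' arrives as a comma-separated string
    # in daqInfo and is normalized to a list (plugin names here are
    # hypothetical):
    #
    #   uploadInfo = controller.uploadFiles(
    #       'exp-01', '/data/exp-01',
    #       {'skipPlugins' : 'mongoPlugin,sddsPlugin'})
    #   # uploadInfo['skipPlugins'] == ['mongoPlugin', 'sddsPlugin']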

    def prepareUploadFiles(self, uploadInfo, daqInfo, experiment):
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        dataDirectory = uploadInfo.get('dataDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()
        try:
            # Get files
            self.logger.debug('Retrieving file paths for %s' % dataDirectory)
            filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)

            # Remove hidden files
            self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
            filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)

            # Check which files need to be processed
            filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
            if not len(filePathsDict):
                raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
        except Exception as ex:
            self.logger.error('Processing error for upload %s: %s' % (uploadId, str(ex)))
            uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            uploadInfo['errorMessage'] = str(ex)
            return

        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            try:
                # Only create new uploads if we have less than
                # UPLOAD_CHUNK_SIZE_IN_FILES waiting to be completed
                while True:
                    status = uploadInfo.get('status')
                    if status == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                        nCancelledFiles = nFiles - nProcessedFiles
                        uploadInfo.uploadAborted(nCancelledFiles)
                        self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                        return
                    nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
                    nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
                    if nQueuedFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES:
                        # We need to add more files for upload
                        break
                    self.logger.debug('Upload %s has %s completed files, %s queued files waiting for upload, will not add any more for %s seconds; upload timer has processed %s out of %s files' % (uploadId, nCompletedFiles, nQueuedFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS, nProcessedFiles, nFiles))
                    time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)
                fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
                fileInfo.update(filePathDict)
                fileInfo['daqInfo'] = daqInfo
                fileInfo['uploadId'] = uploadId
                fileInfo['statusMonitor'] = uploadInfo
                fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
                fileInfo['owner'] = uploadId
                fileProcessingManager.queueFile(fileInfo)
                nProcessedFiles += 1
            except Exception as ex:
                self.logger.error('Processing error: %s' % str(ex))
        nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
        nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
        self.logger.debug('Done preparing upload %s; it has a total of %s files (%s completed and %s queued)' % (uploadId, nFiles, nCompletedFiles, nQueuedFiles))

    def getUploadInfo(self, id):
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo

    def listUploads(self, status):
        uploadInfoList = UploadTracker.getInstance().getUploadInfos(status)
        return uploadInfoList

    def stopUpload(self, id):
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING
        uploadInfo.updateStatus()
        return uploadInfo
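
    # Illustrative abort sketch: the ABORTING status set by stopUpload() is
    # observed by the prepareUploadFiles() chunking loop above, which then
    # stops queueing the remaining files (hypothetical id):
    #
    #   uploadInfo = controller.stopUpload('f47ac10b-...')
    #   uploadInfo.get('status')   # DM_PROCESSING_STATUS_ABORTING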

    def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = DirectoryUploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId
        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            # Normalize to a list, consistent with uploadFiles()
            skipPlugins = []
        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = 0
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        fileProcessingManager = FileProcessingManager.getInstance()
        processingInfo = {}
        uploadInfo['processingInfo'] = processingInfo
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName in skipPlugins:
                processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
            else:
                processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
        self.logger.debug('Starting upload directory %s timer for experiment %s' % (dataDirectory, experimentName))
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadDirectory, args=[uploadInfo, daqInfo, experiment])
        timer.start()
        return uploadInfo

    def prepareUploadDirectory(self, uploadInfo, daqInfo, experiment):
        uploadId = uploadInfo['id']
        dataDirectory = uploadInfo['dataDirectory']
        experimentName = uploadInfo['experimentName']
        skipPlugins = uploadInfo.get('skipPlugins', [])
        self.logger.debug('Preparing directory %s upload for experiment %s' % (dataDirectory, experimentName))
        try:
            filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
            self.logger.debug('There are %s files in directory %s (experiment %s)' % (len(filePathsDict), dataDirectory, experimentName))
        except Exception as ex:
            self.logger.error('Cannot retrieve files for directory upload %s: %s' % (uploadId, str(ex)))
            self.logger.error('Marking directory upload %s as failed' % uploadId)
            uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            uploadInfo['errorMessage'] = str(ex)
            return
        fileProcessingManager = FileProcessingManager.getInstance()
        self.logger.debug('Preparing plugin timers for directory %s upload (experiment %s)' % (dataDirectory, experimentName))
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName not in skipPlugins:
                self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
                timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
                timer.start()
        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
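
    # Illustrative sketch: per-plugin status bookkeeping for a directory
    # upload, as built by uploadDirectory() above (plugin names are
    # hypothetical):
    #
    #   uploadInfo['processingInfo'] == {
    #       'rsyncPlugin' : {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING},
    #       'mongoPlugin' : {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING},
    #   }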

    def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        processorName = processor.name
        processingInfo = uploadInfo.get('processingInfo')
        self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))
        # Initialize the start time up front so the timing bookkeeping below
        # cannot reference it unbound if an exception is raised while we are
        # still waiting on dependencies
        processingStartTime = time.time()
        try:
            dependsOn = processor.dependsOn
            while True:
                # Check status
                if uploadInfo['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                    processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
                    return

                # Check that processor can proceed: all of its dependencies
                # must be done before it may run
                canProcess = True
                for depProcessorName in dependsOn:
                    depProcessorStatus = processingInfo.get(depProcessorName).get('status')
                    if depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED, dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED, dmProcessingStatus.DM_PROCESSING_STATUS_FAILED]:
                        # We must skip processing
                        self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
                        return
                    elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]:
                        # Dependency has not completed yet, so we cannot proceed
                        canProcess = False
                    elif depProcessorStatus == dmProcessingStatus.DM_PROCESSING_STATUS_DONE:
                        # This dependency is satisfied
                        pass
                    else:
                        # This should not happen
                        self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
                        return

                # Process directory if we can
                if canProcess:
                    directoryInfo = {'uploadInfo' : uploadInfo, 'daqInfo' : daqInfo, 'experiment' : experiment, 'filePathsDict' : filePathsDict}
                    processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
                    processingStartTime = time.time()
                    processor.processDirectory(directoryInfo)
                    if processingInfo[processorName]['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
                        self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
                    else:
                        self.logger.debug('Incomplete directory %s processing for upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
                    break

                # Wait a bit longer
                time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)
        except Exception as ex:
            self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
            processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            processingInfo[processorName]['processingError'] = str(ex)
        processingEndTime = time.time()
        processingInfo[processorName]['processingEndTime'] = processingEndTime
        processingInfo[processorName]['processingStartTime'] = processingStartTime
        processingInfo[processorName]['processingRunTime'] = processingEndTime - processingStartTime
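
    # Illustrative dependency sketch: a processor declares the plugins it
    # waits on via its 'dependsOn' attribute, and processUploadDirectory()
    # polls every DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS until all of
    # them are done (class and names below are hypothetical):
    #
    #   class MongoCatalogPlugin(object):
    #       name = 'mongoPlugin'
    #       dependsOn = ['rsyncPlugin']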

    def getProcessingPlugins(self):
        pluginList = []
        fileProcessingManager = FileProcessingManager.getInstance()
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn}
            pluginList.append(PluginInfo(pluginInfo))
        return pluginList
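
# Illustrative usage sketch (hypothetical; requires a configured DM
# deployment with running trackers and web services):
#
#   controller = ExperimentSessionControllerImpl()
#   for pluginInfo in controller.getProcessingPlugins():
#       print '%s depends on %s' % (pluginInfo['name'], pluginInfo['dependsOn'])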