#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import os
import time
import uuid
import copy
import threading

from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver


class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    # Delay before a background upload thread starts working, so the
    # caller gets the UploadInfo back before processing begins.
    UPLOAD_DELAY_IN_SECONDS = 1.0
    # Polling interval used while a directory processor waits for the
    # plugins it depends on to finish.
    DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """
        Start observing dataDirectory for the named experiment.

        Returns the DaqInfo object registered with the DaqTracker.
        Raises InvalidRequest if the experiment has no storage directory
        (i.e. it has not been started on the storage side).
        """
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        return daqInfo

    def stopDaq(self, experimentName, dataDirectory):
        """ Stop observing dataDirectory and return the updated DaqInfo. """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        return daqInfo

    def getDaqInfo(self, id):
        """ Return tracked DaqInfo for the given id; raises ObjectNotFound. """
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo

    def _populateUploadAndDaqInfo(self, uploadInfo, daqInfo, experiment, experimentName, dataDirectory, uploadId):
        # Copy common experiment/storage metadata into both tracking objects
        # (shared by uploadFiles and uploadDirectory).
        startTime = time.time()
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        uploadInfo['startTime'] = startTime
        # Fixed key: was 'startTimestamp ' (trailing space typo), which made
        # the timestamp unreachable under the intended key.
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

    def _getSkipPlugins(self, daqInfo, uploadInfo):
        # Pop the comma-separated 'skipPlugins' entry from daqInfo and return
        # it as a list of plugin names; records it on uploadInfo when present.
        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
            return skipPlugins
        return []

    def uploadFiles(self, experimentName, dataDirectory, daqInfo):
        """
        Schedule an upload of all eligible files found in dataDirectory.

        Returns the new UploadInfo object immediately; the actual per-file
        processing happens asynchronously (see prepareUploadFiles).
        Raises InvalidRequest if the experiment has not been started, if no
        plugin can process the files, or if there are no new files to upload.
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['nProcessedFiles'] = 0
        uploadInfo['nProcessingErrors'] = 0
        self._populateUploadAndDaqInfo(uploadInfo, daqInfo, experiment, experimentName, dataDirectory, uploadId)
        skipPlugins = self._getSkipPlugins(daqInfo, uploadInfo)

        # Check that there is at least one processor that can process files
        processorList = []
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            if processor.name not in skipPlugins:
                processorList.append(processor)
        if not processorList:
            raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)

        # Remove hidden files
        self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
        filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
        # Check which files need to be processed
        filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
        if not len(filePathsDict):
            raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = 'running'
        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict])
        timer.start()
        return uploadInfo

    def prepareUploadFiles(self, uploadInfo, daqInfo, experiment, filePathsDict):
        """
        Queue each file in filePathsDict for processing (runs on a timer
        thread).  Honors an 'aborting' upload status by cancelling the
        remaining files; per-file errors are logged and do not stop the loop.
        """
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        dataDirectory = uploadInfo.get('dataDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            fileInfo['statusMonitor'] = uploadInfo
            fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
            try:
                if uploadInfo.get('status') != 'aborting':
                    fileProcessingManager.processFile(fileInfo)
                    nProcessedFiles += 1
                else:
                    nCancelledFiles = nFiles - nProcessedFiles
                    uploadInfo.uploadAborted(nCancelledFiles)
                    # Fixed log message: removed stray ')' at the end.
                    self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                    break
            except Exception as ex:
                self.logger.error('Processing error: %s', ex)
        self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))

    def getUploadInfo(self, id):
        """ Return tracked UploadInfo for the given id; raises ObjectNotFound. """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo

    def stopUpload(self, id):
        """
        Request abort of an active upload; the worker threads observe the
        'aborting' status and stop.  Raises ObjectNotFound for unknown ids.
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo['status'] = 'aborting'
        uploadInfo.updateStatus()
        return uploadInfo

    def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
        """
        Schedule directory-level processing of dataDirectory: one timer
        thread per non-skipped plugin (see processUploadDirectory).

        Returns the new DirectoryUploadInfo object immediately.
        Raises InvalidRequest if the experiment has not been started.
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = DirectoryUploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        self._populateUploadAndDaqInfo(uploadInfo, daqInfo, experiment, experimentName, dataDirectory, uploadId)
        skipPlugins = self._getSkipPlugins(daqInfo, uploadInfo)

        fileProcessingManager = FileProcessingManager.getInstance()
        processingInfo = {}
        uploadInfo['processingInfo'] = processingInfo
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName in skipPlugins:
                processingInfo[processorName] = {'status' : 'skipped'}
            else:
                self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
                timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
                # Mark pending before the timer fires so dependency checks
                # in sibling processors see a defined status.
                processingInfo[processorName] = {'status' : 'pending'}
                timer.start()
        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = 'running'
        return uploadInfo

    def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
        """
        Run one directory processor for an upload (runs on a timer thread).

        Polls until every plugin listed in processor.dependsOn has finished;
        a skipped/aborted/failed dependency causes this processor to be
        skipped.  Records start/end/run times in uploadInfo['processingInfo'].
        """
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        processorName = processor.name
        processingInfo = uploadInfo.get('processingInfo')
        self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))
        # Initialize here so the timing code after the try block cannot hit a
        # NameError when an exception occurs before processing ever starts.
        processingStartTime = time.time()
        try:
            dependsOn = processor.dependsOn
            while True:
                # Check status
                if uploadInfo['status'] == 'aborting':
                    processingInfo[processorName]['status'] = 'aborted'
                    return
                # Check that processor can proceed: ALL dependencies must be
                # done.  (Previously one 'done' dependency enabled processing
                # even while others were still pending/running.)
                canProcess = True
                for depProcessorName in dependsOn:
                    depProcessorStatus = processingInfo.get(depProcessorName).get('status')
                    if depProcessorStatus in ['skipped', 'aborted', 'failed']:
                        # We must skip processing
                        self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = 'skipped'
                        return
                    elif depProcessorStatus in ['pending', 'running']:
                        # Dependency not finished yet; keep waiting.
                        canProcess = False
                    elif depProcessorStatus == 'done':
                        pass
                    else:
                        # This should not happen
                        self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = 'skipped'
                        return

                # Process directory if we can
                if canProcess:
                    directoryInfo = {'uploadInfo' : uploadInfo,
                                     'daqInfo' : daqInfo,
                                     'experiment' : experiment,
                                     'filePathsDict' : filePathsDict}
                    processingInfo[processorName]['status'] = 'running'
                    processingStartTime = time.time()
                    processor.processDirectory(directoryInfo)
                    if processingInfo[processorName]['status'] == 'running':
                        processingInfo[processorName]['status'] = 'done'
                        self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
                    else:
                        self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
                    break
                # Wait a bit longer
                time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)
        except Exception as ex:
            self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
            processingInfo[processorName]['status'] = 'failed'
            processingInfo[processorName]['processingError'] = str(ex)
        processingEndTime = time.time()
        processingInfo[processorName]['processingEndTime'] = processingEndTime
        processingInfo[processorName]['processingStartTime'] = processingStartTime
        processingInfo[processorName]['processingRunTime'] = processingEndTime - processingStartTime

    def getProcessingPlugins(self):
        """ Return a list of PluginInfo objects for all configured processors. """
        pluginList = []
        fileProcessingManager = FileProcessingManager.getInstance()
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn}
            pluginList.append(PluginInfo(pluginInfo))
        return pluginList