#!/usr/bin/env python # # Implementation for experiment session controller. # import os import time from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.utility.osUtility import OsUtility from dm.common.objects.observedFile import ObservedFile from dm.common.processing.fileProcessingManager import FileProcessingManager from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory from experimentTracker import ExperimentTracker from fileSystemObserver import FileSystemObserver class ExperimentSessionControllerImpl(DmObjectManager): """ Experiment session controller implementation class. """ def __init__(self): DmObjectManager.__init__(self) self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() def startDaq(self, experimentName, dataDirectory, daqInfo): if not os.path.exists(dataDirectory): os.makedirs(dataDirectory) if daqInfo is None: daqInfo={} daqInfo['experimentName'] = experimentName daqInfo['dataDirectory'] = dataDirectory experiment = ExperimentTracker.getInstance().get(experimentName) if experiment is not None: oldDaqInfo = experiment.get('daqInfo') if oldDaqInfo.get('daqEndTime') is None: raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory'))) experiment = self.dsExperimentApi.getExperimentByName(experimentName) storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) startTime = time.time() daqInfo['daqStartTime'] = startTime experiment['daqInfo'] = daqInfo self.logger.debug('Starting DAQ %s' % daqInfo) FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment) ExperimentTracker.getInstance().put(experimentName, experiment) return experiment def stopDaq(self, experimentName): experiment = ExperimentTracker.getInstance().get(experimentName) if experiment is not None: daqInfo = experiment.get('daqInfo') if experiment is None or daqInfo.get('daqEndTime') is not None: raise InvalidRequest('Experiment %s is not active.' % experimentName) dataDirectory = daqInfo.get('dataDirectory') daqInfo['daqEndTime'] = time.time() FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment) return experiment def upload(self, daqInfo): experimentName = daqInfo.get('experimentName') experiment = self.dsExperimentApi.getExperimentByName(experimentName) experiment['daqInfo'] = daqInfo storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) dataDirectory = daqInfo.get('dataDirectory') filePaths = OsUtility.findFiles(dataDirectory) fileProcessingManager = FileProcessingManager.getInstance() for filePath in filePaths: fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileProcessingManager.processFile(fileInfo) ExperimentTracker.getInstance().put(experimentName, experiment) return experiment