#!/usr/bin/env python # # Implementation for experiment session controller. # import os import time import uuid import copy from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.exceptions.objectNotFound import ObjectNotFound from dm.common.objects.observedFile import ObservedFile from dm.common.objects.uploadInfo import UploadInfo from dm.common.processing.fileProcessingManager import FileProcessingManager from dm.common.utility.fileUtility import FileUtility from dm.common.utility.timeUtility import TimeUtility from dm.common.utility.dictUtility import DictUtility from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory from experimentTracker import ExperimentTracker from uploadTracker import UploadTracker from daqTracker import DaqTracker from fileSystemObserver import FileSystemObserver class ExperimentSessionControllerImpl(DmObjectManager): """ Experiment session controller implementation class. """ def __init__(self): DmObjectManager.__init__(self) self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() def startDaq(self, experimentName, dataDirectory, daqInfo): FileSystemObserver.getInstance().createDirectory(dataDirectory) experiment = self.dsExperimentApi.getExperimentByName(experimentName) storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo) FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment) return daqInfo def stopDaq(self, experimentName, dataDirectory): experiment = self.dsExperimentApi.getExperimentByName(experimentName) daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory) FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment) return daqInfo.scrub() def getDaqInfo(self, id): daqInfo = DaqTracker.getInstance().getDaqInfo(id) if not daqInfo: raise ObjectNotFound('Daq id %s not found.' % id) daqInfo.updateStatus() return daqInfo.scrub() def upload(self, experimentName, dataDirectory, daqInfo): experiment = self.dsExperimentApi.getExperimentByName(experimentName) experiment['daqInfo'] = daqInfo storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory) fileProcessingManager = FileProcessingManager.getInstance() uploadId = str(uuid.uuid4()) self.logger.debug('Starting upload id %s' % uploadId) uploadInfo = UploadInfo(daqInfo) uploadInfo['id'] = uploadId uploadInfo['experimentName'] = experimentName uploadInfo['storageDirectory'] = experiment.get('storageDirectory') uploadInfo['storageHost'] = experiment.get('storageHost') uploadInfo['dataDirectory'] = dataDirectory startTime = time.time() uploadInfo['startTime'] = startTime uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimeStamp(startTime) daqInfo['experimentName'] = experimentName daqInfo['storageDirectory'] = experiment.get('storageDirectory') daqInfo['storageHost'] = experiment.get('storageHost') daqInfo['dataDirectory'] = dataDirectory daqInfo['uploadId'] = uploadId fileDict = {} for filePath in filePaths: fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId fileUploadInfo = { 'processed' : False } FileUtility.statFile(filePath, fileUploadInfo) fileDict[filePath] = fileUploadInfo fileProcessingManager.processFile(fileInfo) uploadInfo['fileDict'] = fileDict #self.logger.debug('Upload info %s' % uploadInfo) UploadTracker.getInstance().put(uploadId, uploadInfo) return uploadInfo.scrub() def getUploadInfo(self, id): uploadInfo = UploadTracker.getInstance().get(id) if not uploadInfo: raise ObjectNotFound('Upload id %s not found.' % id) uploadInfo.updateStatus() return uploadInfo.scrub()