#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import os
import time
import uuid

from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from fileSystemObserver import FileSystemObserver


class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    def __init__(self):
        """ Initialize the base manager and obtain the DS experiment REST API. """
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """
        Start data acquisition for an experiment.

        The data directory is created through the file system observer,
        the experiment is fetched from the DS experiment API, the given
        directory is put under observation and the experiment (with its
        'daqInfo' attached) is stored in the experiment tracker.

        :param experimentName: name of the experiment
        :param dataDirectory: directory to create and observe
        :param daqInfo: dictionary with DAQ details, or None; it is
            updated in place with 'experimentName', 'dataDirectory'
            and 'daqStartTime'
        :returns: experiment object with 'daqInfo' set
        :raises InvalidRequest: if the tracked experiment has DAQ info
            without 'daqEndTime', or if the experiment has no
            'storageDirectory'
        """
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        if daqInfo is None:
            daqInfo = {}
        daqInfo['experimentName'] = experimentName
        daqInfo['dataDirectory'] = dataDirectory

        trackedExperiment = ExperimentTracker.getInstance().get(experimentName)
        if trackedExperiment is not None:
            oldDaqInfo = trackedExperiment.get('daqInfo')
            # A tracked experiment without any DAQ info has no DAQ that
            # could still be running; only a recorded DAQ lacking an end
            # time counts as active.
            if oldDaqInfo is not None and oldDaqInfo.get('daqEndTime') is None:
                raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory')))

        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        if experiment.get('storageDirectory') is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)

        daqInfo['daqStartTime'] = time.time()
        experiment['daqInfo'] = daqInfo
        self.logger.debug('Starting DAQ %s' % daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        ExperimentTracker.getInstance().put(experimentName, experiment)
        return experiment

    def stopDaq(self, experimentName):
        """
        Stop data acquisition for an experiment.

        Sets 'daqEndTime' in the tracked experiment's DAQ info and asks
        the file system observer to stop observing its data directory.

        :param experimentName: name of the experiment
        :returns: tracked experiment object with 'daqEndTime' set
        :raises InvalidRequest: if the experiment is not tracked, has no
            DAQ info, or its DAQ info already has 'daqEndTime'
        """
        experiment = ExperimentTracker.getInstance().get(experimentName)
        daqInfo = None
        if experiment is not None:
            daqInfo = experiment.get('daqInfo')
        # Missing DAQ info is reported as "not active" instead of failing
        # with an AttributeError on None.
        if daqInfo is None or daqInfo.get('daqEndTime') is not None:
            raise InvalidRequest('Experiment %s is not active.' % experimentName)

        dataDirectory = daqInfo.get('dataDirectory')
        daqInfo['daqEndTime'] = time.time()
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        return experiment

    def upload(self, daqInfo):
        """
        Submit all files found in a data directory for processing.

        :param daqInfo: dictionary providing at least 'experimentName'
            and 'dataDirectory'
        :returns: upload info object containing 'id', 'experiment',
            'dataDirectory' and 'fileDict' (file path mapped to a
            per-file dictionary that starts with 'processed' False)
        :raises InvalidRequest: if daqInfo is None, or if the experiment
            has no 'storageDirectory'
        """
        if daqInfo is None:
            raise InvalidRequest('DAQ info must be provided for upload.')
        experimentName = daqInfo.get('experimentName')
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        experiment['daqInfo'] = daqInfo
        if experiment.get('storageDirectory') is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)

        dataDirectory = daqInfo.get('dataDirectory')
        filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()

        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo()
        uploadInfo['id'] = uploadId
        uploadInfo['experiment'] = experimentName
        uploadInfo['dataDirectory'] = dataDirectory

        # NOTE(review): files are handed to the processing manager before
        # the upload is stored in the upload tracker below; if processing
        # looks the upload up by 'uploadId' it may not find it yet.
        # Confirm against the file processing code.
        fileDict = {}
        for filePath in filePaths:
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo['uploadId'] = uploadId
            fileUploadInfo = { 'processed' : False }
            FileUtility.statFile(filePath, fileUploadInfo)
            fileDict[filePath] = fileUploadInfo
            fileProcessingManager.processFile(fileInfo)
        uploadInfo['fileDict'] = fileDict
        #self.logger.debug('Upload info %s' % uploadInfo)

        # NOTE(review): startDaq() stores its experiment under the same
        # tracker key; presumably this put replaces such an entry, which
        # would affect a later stopDaq(). Verify this is intended.
        ExperimentTracker.getInstance().put(experimentName, experiment)
        UploadTracker.getInstance().put(uploadId, uploadInfo)
        return uploadInfo

    def getUploadInfo(self, id):
        """
        Return upload info with refreshed progress fields.

        Unless the upload is already marked 'complete', this recomputes
        'status', 'nProcessedFiles' ('<processed>/<total>') and
        'percentageComplete' (string with two decimals) from the
        'processed' flags in 'fileDict' and stores them in the upload info.

        :param id: upload id (the name shadows the builtin, but is kept
            so that keyword callers keep working)
        :returns: upload info object
        :raises ObjectNotFound: if no upload is tracked under the given id
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadStatus = uploadInfo.get('status', 'running')
        if uploadStatus == 'complete':
            return uploadInfo

        fileDict = uploadInfo.get('fileDict')
        nFiles = len(fileDict)
        nProcessedFiles = sum(1 for uploadFileInfo in fileDict.values() if uploadFileInfo.get('processed'))

        # need to handle 'failed' uploads
        if nProcessedFiles == nFiles:
            uploadStatus = 'complete'
        uploadInfo['status'] = uploadStatus
        uploadInfo['nProcessedFiles'] = '%s/%s' % (nProcessedFiles,nFiles)
        # An upload without files is reported as fully complete.
        percentageComplete = 100.0
        if nFiles > 0:
            percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0
        uploadInfo['percentageComplete'] = '%.2f' % percentageComplete
        return uploadInfo