#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import os
import time
import uuid
import copy
import threading

from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver


class ExperimentSessionControllerImpl(DmObjectManager):
    """
    Experiment session controller implementation class.

    Starts/stops data acquisitions (DAQs) and launches file uploads for an
    experiment, delegating bookkeeping to DaqTracker, UploadTracker and
    FileSystemObserver.
    """

    # Delay between accepting an upload request and running prepareUpload()
    # on a timer thread.
    UPLOAD_DELAY_IN_SECONDS = 1.0

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def _getStartedExperiment(self, experimentName):
        """
        Look up an experiment by name and make sure it has been started.

        An experiment counts as started when its 'storageDirectory' entry
        is not None.

        :param experimentName: name passed to the DS experiment API
        :returns: experiment object returned by the DS experiment API
        :raises InvalidRequest: if the experiment has no storage directory
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        if experiment.get('storageDirectory') is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        return experiment

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """
        Start a DAQ for the given experiment and data directory.

        The data directory is created through the file system observer
        before the experiment is looked up; observation of the directory
        starts only after the DAQ has been registered with the DAQ tracker.

        :param experimentName: name of the experiment
        :param dataDirectory: directory to create and observe
        :param daqInfo: caller-supplied DAQ details handed to DaqTracker
        :returns: DAQ info object returned by DaqTracker.startDaq()
        :raises InvalidRequest: if the experiment has not been started
        """
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self._getStartedExperiment(experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        return daqInfo

    def stopDaq(self, experimentName, dataDirectory, includeFileDetails=False):
        """
        Stop the DAQ for the given experiment and data directory.

        :param experimentName: name of the experiment
        :param dataDirectory: directory that should no longer be observed
        :param includeFileDetails: forwarded to daqInfo.scrub()
        :returns: result of daqInfo.scrub(includeFileDetails) after a
            status update
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        return daqInfo.scrub(includeFileDetails)

    def getDaqInfo(self, id, includeFileDetails=False):
        """
        Return the (status-refreshed) DAQ info for the given DAQ id.

        :param id: DAQ id known to DaqTracker
        :param includeFileDetails: forwarded to daqInfo.scrub()
        :returns: result of daqInfo.scrub(includeFileDetails)
        :raises ObjectNotFound: if DaqTracker has no entry for the id
        """
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo.scrub(includeFileDetails)

    def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False):
        """
        Register a new upload of the files found in a data directory.

        The method returns immediately after scheduling prepareUpload() on
        a timer thread (UPLOAD_DELAY_IN_SECONDS later).

        :param experimentName: name of the experiment
        :param dataDirectory: directory whose files should be uploaded
        :param daqInfo: dictionary-like DAQ details; it is updated in place
            with experiment/storage details and the new upload id
        :param includeFileDetails: accepted for interface compatibility;
            not used by this method
        :returns: the UploadInfo object registered with UploadTracker
        :raises InvalidRequest: if the experiment has not been started, or
            if no files remain for upload after filtering
        """
        experiment = self._getStartedExperiment(experimentName)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        storageHost = experiment.get('storageHost')
        storageUrl = experiment.get('storageUrl')

        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()

        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s', uploadId)

        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = storageDirectory
        uploadInfo['storageHost'] = storageHost
        uploadInfo['storageUrl'] = storageUrl
        uploadInfo['dataDirectory'] = dataDirectory
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        # Key previously carried a trailing space ('startTimestamp '), which
        # made the value unreachable under its intended name.
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimeStamp(startTime)

        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = storageDirectory
        daqInfo['storageHost'] = storageHost
        daqInfo['storageUrl'] = storageUrl
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

        # Remove hidden files
        filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
        # Check which files need to be processed
        filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
        if not filePathsDict:
            raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)

        # Finish populating the record before publishing it to the tracker,
        # so that concurrent getUploadInfo() calls never see it half-built.
        uploadInfo['fileDict'] = {}
        uploadInfo['nFiles'] = len(filePathsDict)
        UploadTracker.getInstance().put(uploadId, uploadInfo)

        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload,
                                args=[uploadInfo, daqInfo, experiment, filePathsDict])
        timer.start()
        return uploadInfo

    def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
        """
        Populate the per-file bookkeeping of an upload.

        Invoked from the timer thread started by upload(). For every file
        path an ObservedFile is built and an entry marked as not processed
        is added to the upload's 'fileDict'.

        :param uploadInfo: upload record created by upload()
        :param daqInfo: DAQ details attached to every file object
        :param experiment: experiment the files belong to
        :param filePathsDict: dictionary keyed by file path
        """
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s', uploadId)
        fileDict = uploadInfo.get('fileDict')
        dataDirectory = uploadInfo.get('dataDirectory')
        for filePath in filePathsDict:
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            fileDict[filePath] = {'processed': False}
            # NOTE(review): submission to the file processing manager is
            # currently disabled, so fileInfo is built but not used further.
            #fileProcessingManager.processFile(fileInfo)
        self.logger.debug('Done preparing upload id: %s', uploadId)

    def getUploadInfo(self, id, includeFileDetails=False):
        """
        Return the (status-refreshed) upload info for the given upload id.

        :param id: upload id known to UploadTracker
        :param includeFileDetails: forwarded to uploadInfo.scrub()
        :returns: result of uploadInfo.scrub(includeFileDetails)
        :raises ObjectNotFound: if UploadTracker has no entry for the id
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo.scrub(includeFileDetails)