Skip to content
Snippets Groups Projects
experimentSessionControllerImpl.py 5.33 KiB
Newer Older
#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import time
import uuid

from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from fileSystemObserver import FileSystemObserver

class ExperimentSessionControllerImpl(DmObjectManager):
    """
    Experiment session controller implementation class.

    Starts and stops observation of DAQ data directories, submits the
    files found in a data directory for processing, and reports the
    progress of previously started uploads.
    """

    def __init__(self):
        """ Initialize the base manager and look up the DS experiment REST API. """
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """
        Start observing dataDirectory on behalf of the given experiment.

        daqInfo may be None, in which case a new dictionary is created;
        otherwise the passed dictionary is updated in place with the
        'experimentName', 'dataDirectory' and 'daqStartTime' keys.

        Raises InvalidRequest if the tracked experiment already has a DAQ
        without an end time, or if the experiment retrieved from the DS
        service has no storage directory.
        """
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        if daqInfo is None:
            daqInfo = {}
        daqInfo['experimentName'] = experimentName
        daqInfo['dataDirectory'] = dataDirectory

        # A tracked DAQ that has no end time recorded is still running.
        # NOTE(review): upload() also stores the experiment in the tracker
        # with whatever daqInfo its caller supplied, so such an entry is
        # treated as an active DAQ here unless it carries 'daqEndTime' --
        # confirm this is intended.
        experiment = ExperimentTracker.getInstance().get(experimentName)
        if experiment is not None:
            oldDaqInfo = experiment.get('daqInfo')
            if oldDaqInfo is not None and oldDaqInfo.get('daqEndTime') is None:
                raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory')))

        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)

        # Same clock as the one used for 'daqEndTime' in stopDaq().
        daqInfo['daqStartTime'] = time.time()
        experiment['daqInfo'] = daqInfo
        self.logger.debug('Starting DAQ %s' % daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        ExperimentTracker.getInstance().put(experimentName, experiment)

    def stopDaq(self, experimentName):
        """
        Stop observing the data directory of the experiment's active DAQ.

        Records 'daqEndTime' in the tracked experiment's DAQ info.

        Raises InvalidRequest if the experiment is not tracked, has no DAQ
        info, or its DAQ already has an end time.
        """
        experiment = ExperimentTracker.getInstance().get(experimentName)
        daqInfo = None
        if experiment is not None:
            daqInfo = experiment.get('daqInfo')
        if daqInfo is None or daqInfo.get('daqEndTime') is not None:
            raise InvalidRequest('Experiment %s is not active.' % experimentName)

        dataDirectory = daqInfo.get('dataDirectory')
        daqInfo['daqEndTime'] = time.time()
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)

    def upload(self, daqInfo):
        """
        Submit every file found in the DAQ data directory for processing.

        daqInfo must provide the 'experimentName' and 'dataDirectory' keys.
        Returns the UploadInfo object registered with the upload tracker;
        its 'fileDict' entry maps each file path to a per-file dictionary
        whose 'processed' flag starts out as False.

        Raises InvalidRequest if the experiment retrieved from the DS
        service has no storage directory.
        """
        experimentName = daqInfo.get('experimentName')
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)

        dataDirectory = daqInfo.get('dataDirectory')
        filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()

        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo()
        uploadInfo['id'] = uploadId
        uploadInfo['experiment'] = experimentName
        uploadInfo['dataDirectory'] = dataDirectory

        fileDict = {}
        for filePath in filePaths:
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo['uploadId'] = uploadId
            fileUploadInfo = { 'processed' : False }
            FileUtility.statFile(filePath, fileUploadInfo)
            fileDict[filePath] = fileUploadInfo
            fileProcessingManager.processFile(fileInfo)
        uploadInfo['fileDict'] = fileDict

        ExperimentTracker.getInstance().put(experimentName, experiment)
        UploadTracker.getInstance().put(uploadId, uploadInfo)
        return uploadInfo

    def getUploadInfo(self, id):
        """
        Return the upload info for the given upload id with refreshed
        'status', 'nProcessedFiles' and 'percentageComplete' entries.

        An upload already marked 'complete' is returned unchanged.

        Raises ObjectNotFound if the upload tracker has no entry for id.
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadStatus = uploadInfo.get('status', 'running')
        if uploadStatus == 'complete':
            return uploadInfo

        # Tolerate an entry without a file dictionary: it has zero files.
        fileDict = uploadInfo.get('fileDict') or {}
        nFiles = len(fileDict)
        nProcessedFiles = sum(1 for info in fileDict.values() if info.get('processed'))

        # need to handle 'failed' uploads
        if nProcessedFiles == nFiles:
            uploadStatus = 'complete'
        uploadInfo['status'] = uploadStatus
        uploadInfo['nProcessedFiles'] = '%s/%s' % (nProcessedFiles,nFiles)

        # An upload with no files counts as fully complete.
        percentageComplete = 100.0
        if nFiles > 0:
            percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0
        uploadInfo['percentageComplete'] = '%.2f' % percentageComplete
        return uploadInfo