Skip to content
Snippets Groups Projects
experimentSessionControllerImpl.py 6.86 KiB
Newer Older
#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import threading
import time
import uuid

from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver

class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    def __init__(self):
        """ Initialize the base object manager and obtain the DS experiment REST API. """
        DmObjectManager.__init__(self)
        # API handle used by the methods below to look up experiments by name.
        experimentRestApi = DsRestApiFactory.getExperimentRestApi()
        self.dsExperimentApi = experimentRestApi
    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """
        Start a DAQ for the given experiment and begin observing its data directory.

        :param experimentName: name used to look up the experiment through the DS experiment API
        :param dataDirectory: directory that is created via the file system observer and then observed
        :param daqInfo: caller-supplied DAQ information handed to the DAQ tracker
        :returns: the DAQ info object returned by DaqTracker.startDaq()
        :raises InvalidRequest: if the experiment has no 'storageDirectory' entry
        """
        fileSystemObserver = FileSystemObserver.getInstance()
        # The directory is created before the experiment lookup, as in the original call order.
        fileSystemObserver.createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        if experiment.get('storageDirectory') is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        fileSystemObserver.startObservingPath(dataDirectory, experiment)
        # Hand the tracker's DAQ info back to the caller instead of discarding it.
        return daqInfo
    def stopDaq(self, experimentName, dataDirectory, includeFileDetails=False):
        """
        Stop the DAQ for the given experiment/data directory and stop observing the directory.

        :param experimentName: name used to look up the experiment through the DS experiment API
        :param dataDirectory: directory whose DAQ is stopped and which is no longer observed
        :param includeFileDetails: accepted for interface compatibility; not used by the
            visible code (NOTE(review): confirm whether file details should be filtered here)
        :returns: the DAQ info object returned by DaqTracker.stopDaq(), after updateStatus()
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        # Return the refreshed DAQ info so callers can report the final state.
        return daqInfo
    def getDaqInfo(self, id, includeFileDetails=False):
        """
        Look up a DAQ by id and return its refreshed info object.

        :param id: DAQ id passed to DaqTracker.getDaqInfo()
        :param includeFileDetails: accepted for interface compatibility; not used by the
            visible code (NOTE(review): confirm whether file details should be filtered here)
        :returns: the DAQ info object, after updateStatus() has been called on it
        :raises ObjectNotFound: if the tracker has no DAQ registered under this id
        """
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        # A getter must return what it looked up; previously the result was dropped.
        return daqInfo
    def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False):
        """
        Register a new upload of the files found in a data directory and schedule
        its preparation on a timer thread.

        The experiment is looked up by name, the upload tracker is asked to check for
        an already active upload of the same directory, and the candidate files
        reported by the file system observer are filtered by the file processing
        manager. The upload is then registered with the upload tracker and
        prepareUpload() is scheduled to run after self.UPLOAD_DELAY_IN_SECONDS.

        :param experimentName: name used to look up the experiment through the DS experiment API
        :param dataDirectory: directory whose files are to be uploaded
        :param daqInfo: dict-like DAQ information; it is stored on the experiment, used to
            build the UploadInfo, and updated in place with experiment/storage/upload keys
        :param includeFileDetails: accepted for interface compatibility; not used by the
            visible code (NOTE(review): confirm intended use)
        :returns: the UploadInfo object describing the newly registered upload
        :raises InvalidRequest: if the experiment has no 'storageDirectory' entry, or if
            no files remain for upload after filtering
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)

        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        storageHost = experiment.get('storageHost')
        storageUrl = experiment.get('storageUrl')

        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)

        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = storageDirectory
        uploadInfo['storageHost'] = storageHost
        uploadInfo['storageUrl'] = storageUrl
        uploadInfo['dataDirectory'] = dataDirectory
        uploadInfo['nProcessedFiles'] = 0
        uploadInfo['nProcessingErrors'] = 0

        startTime = time.time()
        uploadInfo['startTime'] = startTime
        # Key previously carried a trailing space ('startTimestamp '), which made it
        # unreachable under its intended name.
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)

        # The caller's daqInfo is updated in place with the same experiment/storage data.
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = storageDirectory
        daqInfo['storageHost'] = storageHost
        daqInfo['storageUrl'] = storageUrl
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

        self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
        filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
        # Check which files need to be processed
        filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
        if not filePathsDict:
            raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        nFiles = len(filePathsDict)
        uploadInfo['nFiles'] = nFiles
        self.logger.debug('Will prepare upload of %s files' % nFiles)
        # Preparation runs on a timer thread so this call can return right away.
        # NOTE(review): UPLOAD_DELAY_IN_SECONDS is read from self but is not defined in
        # the visible part of this class -- confirm it is declared on the class or a base.
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
        timer.start()
        return uploadInfo

    def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
        """
        Hand every file of an upload to the file processing manager.

        Files are submitted one by one. If the upload's 'status' becomes 'aborting',
        the remaining files are reported through uploadInfo.uploadAborted() and the
        loop stops. A failure while handling one file is logged and does not stop
        the remaining files from being handled.

        :param uploadInfo: upload description; provides 'id', 'dataDirectory' and 'status'
            and is attached to each file as its 'statusMonitor'
        :param daqInfo: not used by this method; kept for interface compatibility
        :param experiment: experiment object passed to each ObservedFile
        :param filePathsDict: mapping whose keys are the file paths to process
        """
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        dataDirectory = uploadInfo.get('dataDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        # Only the keys (file paths) are needed; the mapped values are not used here.
        for filePath in filePathsDict:
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo['uploadId'] = uploadId
            fileInfo['statusMonitor'] = uploadInfo
            try:
                if uploadInfo.get('status') != 'aborting':
                    fileProcessingManager.processFile(fileInfo)
                    nProcessedFiles += 1
                else:
                    nCancelledFiles = nFiles - nProcessedFiles
                    uploadInfo.uploadAborted(nCancelledFiles)
                    self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles))
                    break
            except Exception as ex:
                # Best effort: log the failure and carry on with the next file.
                self.logger.error('Processing error: %s', ex)
        self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, nFiles))

    def getUploadInfo(self, id):
        """
        Look up an upload by id and return its refreshed info object.

        NOTE(review): the 'def' line of this method was missing; its body was fused
        onto the end of prepareUpload(), where 'id' resolved to the builtin function.
        The method name, the updateStatus() call and the return value are
        reconstructed by analogy with getDaqInfo() and stopUpload() -- confirm
        against callers.

        :param id: upload id passed to UploadTracker.get()
        :returns: the upload info object registered under this id
        :raises ObjectNotFound: if the tracker has no upload registered under this id
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo
    def stopUpload(self, id):
        """
        Mark an upload as aborting and return its info object.

        :param id: upload id passed to UploadTracker.get()
        :returns: the upload info object, with 'status' set to 'aborting' and
            updateStatus() called on it
        :raises ObjectNotFound: if the tracker has no upload registered under this id
        """
        uploadTracker = UploadTracker.getInstance()
        uploadInfo = uploadTracker.get(id)
        if uploadInfo:
            # prepareUpload() checks for this status value before processing each file.
            uploadInfo['status'] = 'aborting'
            uploadInfo.updateStatus()
            return uploadInfo
        raise ObjectNotFound('Upload id %s not found.' % id)