experimentSessionControllerImpl.py
#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import os
import time
import uuid
import copy
import threading 

from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver

class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    UPLOAD_DELAY_IN_SECONDS = 1.0
    DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
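        """ Start data acquisition: create the data directory if needed, verify that the experiment has been started, register the DAQ with the tracker, and begin observing the directory for new files. """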
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        return daqInfo

    def stopDaq(self, experimentName, dataDirectory):
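        """ Stop data acquisition: deregister the DAQ, stop observing the data directory, and return the DAQ info with updated status. """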
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        return daqInfo

    def getDaqInfo(self, id):
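        """ Return current status for the given DAQ id; raises ObjectNotFound if the id is unknown. """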
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo

    def uploadFiles(self, experimentName, dataDirectory, daqInfo):
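        """ Initiate an asynchronous file upload for the given experiment data directory: verify the experiment and processing plugins, determine which files still need processing, register the upload, and schedule file preparation on a timer. Returns the initial upload info. """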
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)

        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        uploadInfo['nProcessedFiles'] = 0
        uploadInfo['nProcessingErrors'] = 0

        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '') 
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            skipPlugins = []

        # Check that there is at least one processor that can process files
        processorList = []
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName not in skipPlugins:
                processorList.append(processor)

        if not len(processorList):
            raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)

        # Remove hidden files
        self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
        filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
        # Check which files need to be processed
        filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
        if not len(filePathsDict):
            raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = 'running'
        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict])
        timer.start()
        return uploadInfo

    def prepareUploadFiles(self, uploadInfo, daqInfo, experiment, filePathsDict):
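        """ Queue each candidate file with the file processing manager for the given upload, stopping early if the upload is being aborted. Runs on a timer thread started by uploadFiles. """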
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        dataDirectory = uploadInfo.get('dataDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath,filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            fileInfo['statusMonitor'] = uploadInfo
            fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
            try: 
                if uploadInfo.get('status') != 'aborting':
                    fileProcessingManager.processFile(fileInfo)
                    nProcessedFiles += 1
                else:
                    nCancelledFiles = nFiles - nProcessedFiles
                    uploadInfo.uploadAborted(nCancelledFiles)
                    self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                    break
            except Exception as ex:
                self.logger.error('Processing error: %s', ex)
        self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))

    def getUploadInfo(self, id):
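        """ Return current status for the given upload id; raises ObjectNotFound if the id is unknown. """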
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo

    def stopUpload(self, id):
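        """ Mark the given upload as aborting so that its remaining files are not processed; raises ObjectNotFound if the id is unknown. """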
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo['status'] = 'aborting'
        uploadInfo.updateStatus()
        return uploadInfo

    def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
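        """ Initiate an asynchronous directory upload: verify the experiment, record upload and DAQ metadata, and start one processing timer per plugin that is not skipped. Returns the initial upload info. """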
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)

        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = DirectoryUploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory

        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '') 
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            skipPlugins = []
        fileProcessingManager = FileProcessingManager.getInstance()
        processingInfo = {}
        uploadInfo['processingInfo'] = processingInfo
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName in skipPlugins:
                processingInfo[processorName] = {'status' : 'skipped'}
            else:
                self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
                timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
                processingInfo[processorName] = {'status' : 'pending'}
                timer.start()

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = 'running'
        return uploadInfo

    def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
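        """ Run a single plugin's directory processing for the given upload, waiting until all plugins it depends on have finished; records per-plugin status and timing in the upload's processing info. Runs on a timer thread started by uploadDirectory. """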
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        processorName = processor.name
        processingInfo = uploadInfo.get('processingInfo')
        self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName)) 
        # Initialize start time before the try block so the timing bookkeeping
        # below does not fail if processing raises before it actually begins.
        processingStartTime = time.time()
        try:
            dependsOn = processor.dependsOn
            while True:
                # Check status
                if uploadInfo['status'] == 'aborting':
                    processingInfo[processorName]['status'] = 'aborted'
                    return

                # Check that processor can proceed
                canProcess = False
                if not len(dependsOn):
                    canProcess = True
                for depProcessorName in dependsOn:
                    depProcessorStatus = processingInfo.get(depProcessorName).get('status')
                    if depProcessorStatus in ['skipped', 'aborted', 'failed']:
                        # We must skip processing
                        self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus)) 
                        processingInfo[processorName]['status'] = 'skipped'
                        return
                    elif depProcessorStatus in ['pending', 'running']:
                        # Do nothing
                        pass
                    elif depProcessorStatus == 'done':
                        # We can proceed
                        canProcess = True
                    else:
                        # This should not happen
                        self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus)) 
                        processingInfo[processorName]['status'] = 'skipped'
                        return
                # Process directory if we can
                if canProcess:
                    directoryInfo = {'uploadInfo' : uploadInfo, 
                        'daqInfo' : daqInfo,
                        'experiment' : experiment,
                        'filePathsDict' : filePathsDict
                    }
                    processingInfo[processorName]['status'] = 'running'
                    processingStartTime = time.time()
                    processor.processDirectory(directoryInfo)
                    if processingInfo[processorName]['status'] == 'running':
                        processingInfo[processorName]['status'] = 'done'
                        self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName)) 
                    else:
                        self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status'])) 
                    break

                # Wait a bit longer
                time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)

        except Exception as ex:
            self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex))) 
            processingInfo[processorName]['status'] = 'failed'
            processingInfo[processorName]['processingError'] = str(ex)

        processingEndTime = time.time()
        processingInfo[processorName]['processingEndTime'] = processingEndTime
        processingInfo[processorName]['processingStartTime'] = processingStartTime
        processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime


    def getProcessingPlugins(self):
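        """ Return the list of configured file processing plugins with their names and dependencies. """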
        pluginList = []
        fileProcessingManager = FileProcessingManager.getInstance()
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn}
            pluginList.append(PluginInfo(pluginInfo))
        return pluginList
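

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: it assumes a fully
    # configured DM deployment (DS web service reachable, file system observer
    # and file processing manager singletons initialized) and uses placeholder
    # experiment and directory names.
    controller = ExperimentSessionControllerImpl()
    controller.startDaq('test-exp-01', '/data/test-exp-01', {})
    uploadInfo = controller.uploadFiles('test-exp-01', '/data/test-exp-01', {})
    print(controller.getUploadInfo(uploadInfo.get('id')))
    controller.stopDaq('test-exp-01', '/data/test-exp-01')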