#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import threading
import time
import uuid

from dm.common.constants import dmProcessingStatus
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver

class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    # NOTE(review): methods of this class also read
    # self.UPLOAD_DELAY_IN_SECONDS and self.SECONDS_PER_HOUR, neither of
    # which is defined in this class body; presumably they are inherited
    # from DmObjectManager or were lost from this file -- confirm.

    # prepareUploadFiles() queues another file only while fewer than this
    # many files are still queued for the upload.
    UPLOAD_CHUNK_SIZE_IN_FILES = 100
    # Seconds prepareUploadFiles() sleeps before re-checking the queue size.
    UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
    # Seconds processUploadDirectory() sleeps between dependency checks.
    DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
    def __init__(self):
        """ Run the base class initializer and obtain the DS experiment API. """
        DmObjectManager.__init__(self)
        experimentDsApi = DsRestApiFactory.getExperimentDsApi()
        self.dsExperimentApi = experimentDsApi

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """
        Start data acquisition for an experiment.

        Creates the data directory through the file system observer,
        registers the DAQ with the DAQ tracker and starts observing the
        directory. If the resulting DAQ info has a truthy 'maxRunTimeInHours',
        a timer is armed that calls stopDaqTimer() after that many hours.

        :param experimentName: name used to look up the experiment
        :param dataDirectory: directory to create and observe
        :param daqInfo: DAQ options passed through to DaqTracker.startDaq()
        :returns: DAQ info object returned by DaqTracker.startDaq()
        :raises InvalidRequest: if the experiment has no storage directory
        """
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        if experiment.get('storageDirectory') is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)

        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)

        maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
        if maxRunTimeInHours:
            daqId = daqInfo['id']
            self.logger.debug('Starting timer to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours' % (daqId, experimentName, maxRunTimeInHours))
            maxRunTimeInSeconds = maxRunTimeInHours*self.SECONDS_PER_HOUR
            timer = threading.Timer(maxRunTimeInSeconds, self.stopDaqTimer, args=[experimentName, dataDirectory, daqId])
            timer.start()

        # Hand the tracked DAQ info back so the caller can report its id/status.
        return daqInfo

    def stopDaqTimer(self, experimentName, dataDirectory, daqId):
        """
        Timer callback that stops a DAQ whose maximum run time has elapsed.

        The DAQ is stopped via stopDaq() only if its tracked status is still
        DM_PROCESSING_STATUS_RUNNING. Every exception is logged and swallowed,
        since this method is invoked from a timer thread (see startDaq()).

        :param experimentName: name of the experiment the DAQ belongs to
        :param dataDirectory: data directory of the DAQ
        :param daqId: id used to look up the DAQ info in the DAQ tracker
        """
        try:
            daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
            maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
            self.logger.debug('Attempting to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours was exceeded' % (daqId, experimentName, maxRunTimeInHours))
            daqStatus = daqInfo.get('status')
            if daqStatus != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                self.logger.debug('DAQ id %s has status of %s, will not stop it automatically' % (daqId, daqStatus))
                return
            self.stopDaq(experimentName, dataDirectory)
        except Exception as ex:
            self.logger.error('Could not automatically stop DAQ id %s: %s' % (daqId, str(ex)))

        
    def stopDaq(self, experimentName, dataDirectory):
        """
        Stop data acquisition for the given experiment and data directory.

        Stops the DAQ in the DAQ tracker, stops observing the directory and
        refreshes the DAQ status. If the DAQ info has a truthy
        'uploadDataDirectoryOnExit', an upload of that directory is started
        via uploadFiles(); its id is stored under 'uploadIdOnExit', or the
        error text under 'uploadErrorOnExit' if starting the upload fails.

        :param experimentName: name used to look up the experiment
        :param dataDirectory: data directory of the DAQ to stop
        :returns: DAQ info object returned by DaqTracker.stopDaq()
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        # The id is needed by every log message and by the upload-on-exit info.
        daqId = daqInfo.get('id')
        self.logger.debug('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))

        # Prepare upload on exit
        uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
        if uploadDataDirectoryOnExit:
            self.logger.debug('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
            daqInfo2 = {}
            daqInfo2['originalDaqId'] = daqId
            uploadDestDirectoryOnExit = daqInfo.get('uploadDestDirectoryOnExit')
            if uploadDestDirectoryOnExit:
                self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using dest directory: %s' % (daqId, experimentName, uploadDestDirectoryOnExit))
                daqInfo2['destDirectory'] = uploadDestDirectoryOnExit

            # A failed automatic upload must not turn a successful stop into
            # an error; record the outcome on the DAQ info instead.
            try:
                uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2)
                daqInfo['uploadIdOnExit'] = uploadInfo.get('id')
            except Exception as ex:
                self.logger.error('Could not automatically upload DAQ id %s: %s' % (daqId, str(ex)))
                daqInfo['uploadErrorOnExit'] = str(ex)

        return daqInfo
 
    def getDaqInfo(self, id):
        """
        Look up a DAQ by id and return its info with refreshed status.

        :param id: DAQ id as known to the DAQ tracker
        :returns: DAQ info object, after updateStatus() has been called on it
        :raises ObjectNotFound: if the tracker has no DAQ with this id
        """
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo

    def listDaqs(self, status):
        """ Return whatever the DAQ tracker's getDaqInfos() yields for the given status. """
        daqTracker = DaqTracker.getInstance()
        return daqTracker.getDaqInfos(status)

    def uploadFiles(self, experimentName, dataDirectory, daqInfo):
        """
        Register a file-by-file upload of a data directory and schedule it.

        Builds an UploadInfo record, copies the experiment storage details
        into both the upload info and daqInfo, registers the upload with the
        upload tracker and starts a timer that runs prepareUploadFiles().

        :param experimentName: name used to look up the experiment
        :param dataDirectory: directory whose files should be uploaded
        :param daqInfo: dictionary of upload options; it is modified in place
            (storage keys are added and 'skipPlugins' is removed)
        :returns: the UploadInfo object registered with the upload tracker
        :raises InvalidRequest: if the experiment has no storage directory or
            every processing plugin is listed in 'skipPlugins'
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)

        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        uploadInfo['nProcessedFiles'] = 0
        uploadInfo['nProcessingErrors'] = 0

        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

        # 'skipPlugins' arrives as a comma-separated string of plugin names.
        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            skipPlugins = []

        # Check that there is at least one processor that can process files
        processorList = []
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            if processor.name not in skipPlugins:
                processorList.append(processor)

        if not processorList:
            raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = 0
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        self.logger.debug('Starting upload timer for %s' % dataDirectory)

        # The file scan runs on a timer thread so this call returns promptly;
        # the timer must be started and the info returned (stopDaq() reads
        # the 'id' of the returned object).
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment])
        timer.start()
        return uploadInfo

    def prepareUploadFiles(self, uploadInfo, daqInfo, experiment):
        """
        Timer callback that collects the files of an upload and queues them.

        Retrieves the file list for the upload's data directory, filters it
        through the file processing manager and queues each remaining file,
        waiting whenever UPLOAD_CHUNK_SIZE_IN_FILES or more files of this
        upload are still queued. Failures while building the file list mark
        the upload as failed; failures on a single file are logged and the
        loop moves on. If the upload status becomes
        DM_PROCESSING_STATUS_ABORTING, the remaining files are reported via
        uploadInfo.uploadAborted() and the method returns.

        :param uploadInfo: upload record created by uploadFiles()
        :param daqInfo: upload options attached to every queued file
        :param experiment: experiment object the files belong to
        """
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        dataDirectory = uploadInfo.get('dataDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()

        try:
            # Get files
            self.logger.debug('Retrieving file paths for %s' % dataDirectory)
            filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)

            # Remove hidden files
            self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
            filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)

            # Check which files need to be processed
            filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)

            if not len(filePathsDict):
                raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
        except Exception as ex:
            self.logger.error('Processing error for upload %s: %s' % (uploadId, str(ex)))
            uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            uploadInfo['errorMessage'] = str(ex)
            return

        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath,filePathDict) in filePathsDict.items():
            try:
                # Only create new uploads if we have less than
                # UPLOAD_CHUNK_SIZE_IN_FILES waiting to be completed
                while True:
                    status = uploadInfo.get('status')
                    if status == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                        nCancelledFiles = nFiles - nProcessedFiles
                        uploadInfo.uploadAborted(nCancelledFiles)
                        self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles))
                        return
                    nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
                    nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)

                    if nQueuedFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES:
                        # We need to add more files for upload
                        break
                    self.logger.debug('Upload %s has %s completed files, %s queued files waiting for upload, will not add any more for %s seconds; upload timer has processed %s out of %s files' % (uploadId, nCompletedFiles, nQueuedFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS, nProcessedFiles, nFiles))
                    time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)

                fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
                fileInfo.update(filePathDict)
                fileInfo['daqInfo'] = daqInfo
                fileInfo['uploadId'] = uploadId
                fileInfo['statusMonitor'] = uploadInfo
                fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
                fileInfo['owner'] = uploadId
                fileProcessingManager.queueFile(fileInfo)
                # Count the file only once it is queued, so the cancelled
                # count on abort covers exactly the files not yet handed over.
                nProcessedFiles += 1
            except Exception as ex:
                self.logger.error('Processing error: %s', ex)

        nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
        nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
        self.logger.debug('Done preparing upload %s; it has a total of %s files (%s completed and %s queued)' % (uploadId, nFiles, nCompletedFiles, nQueuedFiles))

    # NOTE(review): the 'def' line of this method was missing from the file,
    # which left its body attached to prepareUploadFiles(); the name is
    # inferred from the parallel getDaqInfo() -- confirm against callers.
    def getUploadInfo(self, id):
        """
        Look up an upload by id in the upload tracker.

        :param id: upload id as known to the upload tracker
        :returns: the tracked upload info object
        :raises ObjectNotFound: if the tracker has no upload with this id
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        return uploadInfo

    def listUploads(self, status):
        """ Return whatever the upload tracker's getUploadInfos() yields for the given status. """
        uploadTracker = UploadTracker.getInstance()
        return uploadTracker.getUploadInfos(status)

    def stopUpload(self, id):
        """
        Request that a running upload be aborted.

        Only sets the upload status to DM_PROCESSING_STATUS_ABORTING; the
        upload worker methods check for that status and stop on their own.

        :param id: upload id as known to the upload tracker
        :returns: the tracked upload info object, with its status updated
        :raises ObjectNotFound: if the tracker has no upload with this id
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING
        return uploadInfo

    def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
        """
        Register a whole-directory upload and schedule its preparation.

        Builds a DirectoryUploadInfo record, copies the experiment storage
        details into both the upload info and daqInfo, registers the upload
        with the upload tracker, records a per-plugin status entry under
        'processingInfo' (skipped or pending) and starts a timer that runs
        prepareUploadDirectory().

        :param experimentName: name used to look up the experiment
        :param dataDirectory: directory to upload
        :param daqInfo: dictionary of upload options; it is modified in place
            (storage keys are added and 'skipPlugins' is removed)
        :returns: the DirectoryUploadInfo object registered with the tracker
        :raises InvalidRequest: if the experiment has no storage directory
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)

        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = DirectoryUploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory

        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

        # 'skipPlugins' arrives as a comma-separated string of plugin names;
        # normalize it to a list so the membership test below compares whole
        # names (same handling as uploadFiles()).
        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            skipPlugins = []

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = 0
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING

        fileProcessingManager = FileProcessingManager.getInstance()
        processingInfo = {}
        uploadInfo['processingInfo'] = processingInfo
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName in skipPlugins:
                processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
            else:
                processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}

        self.logger.debug('Starting upload directory %s timer for experiment %s' % (dataDirectory, experimentName))
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadDirectory, args=[uploadInfo, daqInfo, experiment])
        timer.start()
        return uploadInfo

    def prepareUploadDirectory(self, uploadInfo, daqInfo, experiment):
        """
        Timer callback that lists a directory and starts its plugin timers.

        Retrieves the file list for the upload's data directory; on failure
        the upload is marked DM_PROCESSING_STATUS_FAILED and nothing else is
        done. Otherwise one timer per non-skipped processing plugin is
        started, each running processUploadDirectory(), and the upload is
        marked DM_PROCESSING_STATUS_RUNNING with its file count recorded.

        :param uploadInfo: upload record created by uploadDirectory()
        :param daqInfo: upload options handed on to each plugin timer
        :param experiment: experiment object the directory belongs to
        """
        uploadId = uploadInfo['id']
        dataDirectory = uploadInfo['dataDirectory']
        experimentName = uploadInfo['experimentName']
        skipPlugins = uploadInfo.get('skipPlugins', [])

        self.logger.debug('Preparing directory %s upload for experiment %s' % (dataDirectory, experimentName))

        try:
            filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
            self.logger.debug('There are %s files in directory %s (experiment %s)' % (len(filePathsDict), dataDirectory, experimentName))
        except Exception as ex:
            self.logger.error('Cannot retrieve files for directory upload %s: %s' % (uploadId, str(ex)))
            self.logger.error('Marking directory upload %s as failed' % (uploadId))
            uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            uploadInfo['errorMessage'] = str(ex)
            return

        fileProcessingManager = FileProcessingManager.getInstance()
        self.logger.debug('Preparing plugin timers for directory %s upload (experiment %s)' % (dataDirectory, experimentName))
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName not in skipPlugins:
                self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
                timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
                timer.start()

        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING

    def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
        """
        Timer callback that runs one plugin over an uploaded directory.

        Polls every DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS until all
        plugins named in processor.dependsOn are done, then calls
        processor.processDirectory(). The plugin's entry in
        uploadInfo['processingInfo'] is updated as follows:

        * aborted, if the upload status is DM_PROCESSING_STATUS_ABORTING;
        * skipped, if any dependency was skipped, aborted or failed, or has
          an unrecognized status;
        * done, if processDirectory() returns and the status is still running;
        * failed (with 'processingError'), if an exception is raised.

        Start, end and run times are recorded unless the method returned
        early because of an abort or skip.

        :param processor: plugin object providing name, dependsOn and
            processDirectory()
        :param uploadInfo: upload record created by uploadDirectory()
        :param daqInfo: upload options passed to the plugin
        :param experiment: experiment object passed to the plugin
        :param filePathsDict: file listing passed to the plugin
        """
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        processorName = processor.name
        processingInfo = uploadInfo.get('processingInfo')
        self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))
        # Stays None until processDirectory() is about to be invoked, so the
        # timing code below also works when an error happens before that.
        processingStartTime = None
        try:
            dependsOn = processor.dependsOn
            while True:
                # Check status
                if uploadInfo['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                    processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
                    return

                # Check that processor can proceed: every dependency must be
                # done; a single unfinished one means we have to keep waiting.
                canProcess = True
                for depProcessorName in dependsOn:
                    depProcessorStatus = processingInfo.get(depProcessorName).get('status')
                    if depProcessorStatus in [
                            dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED,
                            dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED,
                            dmProcessingStatus.DM_PROCESSING_STATUS_FAILED]:
                        # We must skip processing; the dependency status is
                        # final, so waiting any longer would never end.
                        self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
                        return
                    elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]:
                        # Dependency not finished yet; keep scanning the rest
                        # so a failed dependency is still noticed right away.
                        canProcess = False
                    elif depProcessorStatus == dmProcessingStatus.DM_PROCESSING_STATUS_DONE:
                        # This dependency is satisfied
                        pass
                    else:
                        # This should not happen
                        self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
                        return

                # Process directory if we can
                if canProcess:
                    directoryInfo = {'uploadInfo' : uploadInfo,
                        'daqInfo' : daqInfo,
                        'experiment' : experiment,
                        'filePathsDict' : filePathsDict
                    }
                    processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
                    processingStartTime = time.time()
                    processor.processDirectory(directoryInfo)
                    # Only promote to done if nothing changed the status
                    # while the plugin was running.
                    if processingInfo[processorName]['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
                        self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
                    else:
                        self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
                    break

                # Wait a bit longer
                time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)

        except Exception as ex:
            self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
            processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            processingInfo[processorName]['processingError'] = str(ex)

        processingEndTime = time.time()
        if processingStartTime is None:
            # Failure happened before processing began: report zero run time.
            processingStartTime = processingEndTime
        processingInfo[processorName]['processingEndTime'] = processingEndTime
        processingInfo[processorName]['processingStartTime'] = processingStartTime
        processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime


    def getProcessingPlugins(self):
        """
        Describe the registered file processing plugins.

        :returns: list with one PluginInfo per key in the file processing
            manager's fileProcessorKeyList, each built from the plugin's
            'name' and 'dependsOn' attributes
        """
        manager = FileProcessingManager.getInstance()
        plugins = [manager.fileProcessorDict.get(key) for key in manager.fileProcessorKeyList]
        return [PluginInfo({'name' : plugin.name, 'dependsOn' : plugin.dependsOn}) for plugin in plugins]