#!/usr/bin/env python

#
# Implementation for experiment session controller.
#

import os
import time
import uuid
import copy
import threading

from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver


class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    UPLOAD_DELAY_IN_SECONDS = 1.0

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        return daqInfo

    def stopDaq(self, experimentName, dataDirectory, includeFileDetails=False):
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        return daqInfo

    def getDaqInfo(self, id, includeFileDetails=False):
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo

    def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False):
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        uploadInfo['nProcessedFiles'] = 0
        uploadInfo['nProcessingErrors'] = 0
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId

        # Remove hidden files
        self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
        filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)

        # Check which files need to be processed
        filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
        if not len(filePathsDict):
            raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = len(filePathsDict)
        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
        # Defer the actual file queueing so that the upload info can be
        # returned to the caller immediately.
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
        timer.start()
        return uploadInfo

    def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        dataDirectory = uploadInfo.get('dataDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            fileInfo['statusMonitor'] = uploadInfo
            try:
                if uploadInfo.get('status') != 'aborting':
                    fileProcessingManager.processFile(fileInfo)
                    nProcessedFiles += 1
                else:
                    # Upload was aborted via stopUpload(); skip the remaining files.
                    nCancelledFiles = nFiles - nProcessedFiles
                    uploadInfo.uploadAborted(nCancelledFiles)
                    self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                    break
            except Exception as ex:
                self.logger.error('Processing error: %s', ex)
        self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))

    def getUploadInfo(self, id):
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo

    def stopUpload(self, id):
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        # Mark the upload for abort; prepareUpload() checks this status
        # before queueing each remaining file.
        uploadInfo['status'] = 'aborting'
        uploadInfo.updateStatus()
        return uploadInfo
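
#
# Hypothetical usage sketch, not part of the service itself: a minimal
# smoke test, assuming a running DS web service, an initialized
# FileSystemObserver, and an experiment named 'test01' that has already
# been started, with data under /tmp/test01/data. The experiment name
# and directory are illustrative placeholders only.
#
if __name__ == '__main__':
    controller = ExperimentSessionControllerImpl()
    daqInfo = controller.startDaq('test01', '/tmp/test01/data', {})
    print(daqInfo)
    print(controller.stopDaq('test01', '/tmp/test01/data'))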