#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import copy
import os
import threading
import time
import uuid
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

# Tracker/observer modules imported by bare module name (no package prefix).
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class.

    Starts/stops DAQ sessions and launches file uploads by delegating to the
    DaqTracker, UploadTracker, FileSystemObserver and FileProcessingManager
    singletons. Experiment records are fetched by name through the DS
    experiment REST API.
    """

    # Delay (in seconds) between upload() returning and prepareUpload()
    # being invoked on a timer thread.
    UPLOAD_DELAY_IN_SECONDS = 1.0

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def _checkExperimentStarted(self, experiment, experimentName):
        """ Raise InvalidRequest if the experiment has no 'storageDirectory'. """
        if experiment.get('storageDirectory') is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """ Start a DAQ for the given experiment and begin observing dataDirectory.

        Raises InvalidRequest if the experiment has no storage directory.
        Returns the object produced by DaqTracker.startDaq().
        """
        # The data directory is created before the experiment is looked up,
        # so it exists even if the request is subsequently rejected.
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        self._checkExperimentStarted(experiment, experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        return daqInfo

    def stopDaq(self, experimentName, dataDirectory, includeFileDetails=False):
        """ Stop the DAQ for the given experiment/directory and stop observing it.

        Returns the DAQ info after updateStatus(), passed through
        scrub(includeFileDetails).
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqInfo.updateStatus()
        return daqInfo.scrub(includeFileDetails)

    def getDaqInfo(self, id, includeFileDetails=False):
        """ Return refreshed, scrubbed DAQ info for the given DAQ id.

        Raises ObjectNotFound if the tracker has no entry for the id.
        """
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo.scrub(includeFileDetails)

    def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False):
        """ Register an upload of dataDirectory and schedule its preparation.

        File processing itself is not done here: prepareUpload() is scheduled
        on a threading.Timer that fires after UPLOAD_DELAY_IN_SECONDS, and
        this method returns immediately after starting the timer.

        Note that daqInfo is modified in place (experiment/storage fields and
        'uploadId' are written into it). The includeFileDetails argument is
        accepted but not used by this method.

        Raises InvalidRequest if the experiment has no storage directory or
        if no files remain for upload after filtering.
        Returns the UploadInfo object created for this upload.
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        self._checkExperimentStarted(experiment, experimentName)

        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)

        # UploadInfo is built from daqInfo before daqInfo itself is updated.
        uploadInfo = UploadInfo(daqInfo)
        for info in (uploadInfo, daqInfo):
            info['experimentName'] = experimentName
            info['storageDirectory'] = experiment.get('storageDirectory')
            info['storageHost'] = experiment.get('storageHost')
            info['storageUrl'] = experiment.get('storageUrl')
            info['dataDirectory'] = dataDirectory
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimeStamp(startTime)
        daqInfo['uploadId'] = uploadId

        # Remove hidden files
        self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
        filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
        # Check which files need to be processed
        filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
        if not filePathsDict:
            raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)

        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        # Starts out empty; prepareUpload() adds one entry per file path.
        fileDict = {}
        uploadInfo['fileDict'] = fileDict
        uploadInfo['nFiles'] = len(filePathsDict)
        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
        timer.start()
        return uploadInfo

    def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
        """ Hand every file in filePathsDict to the file processing manager.

        Invoked from the timer started in upload(). For each path a
        processing-info dict ({'processed': False}) is stored in the upload's
        'fileDict' and attached to the ObservedFile that is passed to
        processFile(). An exception raised by processFile() is logged and
        does not stop the remaining files from being submitted.
        """
        # NOTE(review): upload() never sets 'id' on uploadInfo itself;
        # presumably UploadTracker.startUpload() does - confirm.
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        fileDict = uploadInfo.get('fileDict')
        dataDirectory = uploadInfo.get('dataDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()
        # Iterate over a snapshot of the keys, as Python 2's dict.keys() did.
        for filePath in list(filePathsDict):
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo['daqInfo'] = daqInfo
            # The same dict object is shared by fileDict and fileInfo.
            fileProcessingInfo = {'processed': False}
            fileDict[filePath] = fileProcessingInfo
            fileInfo['processingInfo'] = fileProcessingInfo
            try:
                fileProcessingManager.processFile(fileInfo)
            except Exception as ex:
                self.logger.error('Processing error: %s', ex)
        self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))

    def getUploadInfo(self, id, includeFileDetails=False):
        """ Return refreshed, scrubbed upload info for the given upload id.

        Raises ObjectNotFound if the tracker has no entry for the id.
        """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo.scrub(includeFileDetails)