Newer
Older
#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import os
import time
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.osUtility import OsUtility
from dm.common.objects.observedFile import ObservedFile
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

sveseli
committed
from experimentTracker import ExperimentTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)

sveseli
committed
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
Barbara B. Frosik
committed
def startDaq(self, experimentName, dataDirectory, daqInfo):
if not os.path.exists(dataDirectory):
os.makedirs(dataDirectory)
mode = 0777
os.chmod(dataDirectory, mode)
Barbara B. Frosik
committed
if daqInfo is None:
daqInfo={}
daqInfo['experimentName'] = experimentName
daqInfo['dataDirectory'] = dataDirectory
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
oldDaqInfo = experiment.get('daqInfo')
if oldDaqInfo.get('daqEndTime') is None:
raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory')))
experiment = self.dsExperimentApi.getExperimentByName(experimentName)

sveseli
committed
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
startTime = time.time()
daqInfo['daqStartTime'] = startTime
experiment['daqInfo'] = daqInfo
self.logger.debug('Starting DAQ %s' % daqInfo)

sveseli
committed
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
ExperimentTracker.getInstance().put(experimentName, experiment)
return experiment
def stopDaq(self, experimentName):
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
daqInfo = experiment.get('daqInfo')
if experiment is None or daqInfo.get('daqEndTime') is not None:
raise InvalidRequest('Experiment %s is not active.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
daqInfo['daqEndTime'] = time.time()

sveseli
committed
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
return experiment
def upload(self, daqInfo):
experimentName = daqInfo.get('experimentName')
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
filePaths = OsUtility.findFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
for filePath in filePaths:
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileProcessingManager.processFile(fileInfo)
ExperimentTracker.getInstance().put(experimentName, experiment)