Newer
Older
#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import os
import time
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

sveseli
committed
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)

sveseli
committed
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
Barbara B. Frosik
committed
def startDaq(self, experimentName, dataDirectory, daqInfo):
FileSystemObserver.getInstance().createDirectory(dataDirectory)
Barbara B. Frosik
committed
if daqInfo is None:
daqInfo={}
daqInfo['experimentName'] = experimentName
daqInfo['dataDirectory'] = dataDirectory
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
oldDaqInfo = experiment.get('daqInfo')
if oldDaqInfo.get('daqEndTime') is None:
raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory')))
experiment = self.dsExperimentApi.getExperimentByName(experimentName)

sveseli
committed
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
startTime = time.time()
daqInfo['daqStartTime'] = startTime
experiment['daqInfo'] = daqInfo
self.logger.debug('Starting DAQ %s' % daqInfo)

sveseli
committed
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
ExperimentTracker.getInstance().put(experimentName, experiment)
return experiment
def stopDaq(self, experimentName):
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
daqInfo = experiment.get('daqInfo')
if experiment is None or daqInfo.get('daqEndTime') is not None:
raise InvalidRequest('Experiment %s is not active.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
daqInfo['daqEndTime'] = time.time()

sveseli
committed
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
return experiment
def upload(self, daqInfo):
experimentName = daqInfo.get('experimentName')
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = UploadInfo()
uploadInfo['id'] = uploadId
uploadInfo['experiment'] = experimentName
uploadInfo['dataDirectory'] = dataDirectory
fileDict = {}
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False }
FileUtility.statFile(filePath, fileUploadInfo)
fileDict[filePath] = fileUploadInfo
fileProcessingManager.processFile(fileInfo)
uploadInfo['fileDict'] = fileDict
#self.logger.debug('Upload info %s' % uploadInfo)
ExperimentTracker.getInstance().put(experimentName, experiment)
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
UploadTracker.getInstance().put(uploadId, uploadInfo)
return uploadInfo
def getUploadInfo(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadStatus = uploadInfo.get('status', 'running')
if uploadStatus == 'complete':
return uploadInfo
fileDict = uploadInfo.get('fileDict')
nFiles = len(fileDict)
nProcessedFiles = 0
for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'):
nProcessedFiles += 1
# need to handle 'failed' uploads
if nProcessedFiles == nFiles:
uploadStatus = 'complete'
uploadInfo['status'] = uploadStatus
uploadInfo['nProcessedFiles'] = '%s/%s' % (nProcessedFiles,nFiles)
percentageComplete = 100.0
if nFiles > 0:
percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0
uploadInfo['percentageComplete'] = '%.2f' % percentageComplete
return uploadInfo