Newer
Older
#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import os
import time
import copy
import threading
from dm.common.constants import dmProcessingStatus
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

sveseli
committed
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
UPLOAD_DELAY_IN_SECONDS = 1.0
UPLOAD_CHUNK_SIZE_IN_FILES = 100
UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
SECONDS_PER_HOUR = 60*60
def __init__(self):
DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentDsApi()
Barbara B. Frosik
committed
def startDaq(self, experimentName, dataDirectory, daqInfo):
FileSystemObserver.getInstance().createDirectory(dataDirectory)
experiment = self.dsExperimentApi.getExperimentByName(experimentName)

sveseli
committed
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)

sveseli
committed
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
if maxRunTimeInHours:
daqId = daqInfo['id']
self.logger.debug('Starting timer to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours' % (daqId, experimentName, maxRunTimeInHours))
maxRunTimeInSeconds = maxRunTimeInHours*self.SECONDS_PER_HOUR
timer = threading.Timer(maxRunTimeInSeconds, self.stopDaqTimer, args=[experimentName, dataDirectory, daqId])
timer.start()
return daqInfo
def stopDaqTimer(self, experimentName, dataDirectory, daqId):
try:
daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
self.logger.debug('Attempting to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours was exceeded' % (daqId, experimentName, maxRunTimeInHours))
daqStatus = daqInfo.get('status')
if daqStatus != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
self.logger.debug('DAQ id %s has status of %s, will not stop it automatically' % (daqId, daqStatus))
return
self.stopDaq(experimentName, dataDirectory)
except Exception, ex:
self.logger.error('Could not automatically stop DAQ id %s: %s' % (daqId, str(ex)))
def stopDaq(self, experimentName, dataDirectory):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)

sveseli
committed
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
daqId = daqInfo.get('id')
self.logger.error('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
# Prepare upload on exit
uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
if uploadDataDirectoryOnExit:
self.logger.error('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
daqInfo2 = {}
daqInfo2['originalDaqId'] = daqId
uploadTargetDirectoryOnExit = daqInfo.get('uploadTargetDirectoryOnExit')
if uploadTargetDirectoryOnExit:
self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using target directory: %s' % (daqId, experimentName, uploadTargetDirectoryOnExit))
daqInfo2['targetDirectory'] = uploadTargetDirectoryOnExit
try:
uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2)
daqInfo['uploadIdOnExit'] = uploadInfo.get('id')
except Exception, ex:
self.logger.error('Could not automatically upload DAQ id %s: %s' % (daqId, str(ex)))
daqInfo['uploadErrorOnExit'] = str(ex)
return daqInfo
daqInfo = DaqTracker.getInstance().getDaqInfo(id)
if not daqInfo:
raise ObjectNotFound('Daq id %s not found.' % id)
daqInfo.updateStatus()
return daqInfo
def listDaqs(self, status):
daqInfoList = DaqTracker.getInstance().getDaqInfos(status)
return daqInfoList
def uploadFiles(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = UploadInfo(daqInfo)
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['storageUrl'] = experiment.get('storageUrl')
uploadInfo['dataDirectory'] = dataDirectory
uploadInfo['nProcessedFiles'] = 0
uploadInfo['nProcessingErrors'] = 0
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['storageUrl'] = experiment.get('storageUrl')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
# Check that there is at least one processor that can process files
processorList = []
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
if processorName not in skipPlugins:
processorList.append(processor)
if not len(processorList):
raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = 0
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
self.logger.debug('Starting upload timer for %s' % dataDirectory)
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment])
timer.start()
return uploadInfo
def prepareUploadFiles(self, uploadInfo, daqInfo, experiment):
uploadId = uploadInfo.get('id')
self.logger.debug('Preparing upload id: %s' % uploadId)
dataDirectory = uploadInfo.get('dataDirectory')
targetDirectory = uploadInfo.get('targetDirectory')
fileProcessingManager = FileProcessingManager.getInstance()
try:
# Get files
self.logger.debug('Retrieving file paths for %s' % dataDirectory)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
# Remove hidden files
self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if not len(filePathsDict):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
except Exception, ex:
self.logger.error('Processing error for upload %s: %s' % (uploadId, str(ex)))
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
uploadInfo['errorMessage'] = str(ex)
return
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
nProcessedFiles = 0
nFiles = len(filePathsDict)
for (filePath,filePathDict) in filePathsDict.items():
try:
# Only create new uploads if we have less than
# UPLOAD_CHUNK_SIZE_IN_FILES waiting to be completed
while True:
status = uploadInfo.get('status')
if status == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
nCancelledFiles = nFiles - nProcessedFiles
uploadInfo.uploadAborted(nCancelledFiles)
self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles))
return
nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
nWaitingFiles = nProcessedFiles - nCompletedFiles
if nWaitingFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES:
# We need to add more files for upload
break
self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS))
time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory)
fileInfo.update(filePathDict)
fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId
fileInfo['statusMonitor'] = uploadInfo
fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
fileProcessingManager.processFile(fileInfo)
nProcessedFiles += 1
except Exception, ex:
self.logger.error('Processing error: %s', ex)
self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
def getUploadInfo(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadInfo.updateStatus()
return uploadInfo
def listUploads(self, status):
uploadInfoList = UploadTracker.getInstance().getUploadInfos(status)
return uploadInfoList
def stopUpload(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING
uploadInfo.updateStatus()
return uploadInfo
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = DirectoryUploadInfo(daqInfo)
uploadInfo['id'] = uploadId
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['storageUrl'] = experiment.get('storageUrl')
uploadInfo['dataDirectory'] = dataDirectory
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['storageUrl'] = experiment.get('storageUrl')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
fileProcessingManager = FileProcessingManager.getInstance()
processingInfo = {}
uploadInfo['processingInfo'] = processingInfo
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
else:
self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
timer.start()
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
dataDirectory = uploadInfo.get('dataDirectory')
processorName = processor.name
processingInfo = uploadInfo.get('processingInfo')
self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))
try:
dependsOn = processor.dependsOn
while True:
# Check status
if uploadInfo['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
return
# Check that processor can proceed
canProcess = False
if not len(dependsOn):
canProcess = True
for depProcessorName in dependsOn:
depProcessorStatus = processingInfo.get(depProcessorName).get('status')
if depProcessorStatus in ['skipped', 'aborted', 'failed']:
# We must skip processing
self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]:
# Do nothing
pass
elif depProcessorStatus == 'done':
# We can proceed
canProcess = True
else:
# This should not happen
self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
return
# Process directory if we can
if canProcess:
directoryInfo = {'uploadInfo' : uploadInfo,
'daqInfo' : daqInfo,
'experiment' : experiment,
'filePathsDict' : filePathsDict
}
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
processingStartTime = time.time()
processor.processDirectory(directoryInfo)
if processingInfo[processorName]['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
else:
self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
break
# Wait a bit longer
time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)
except Exception, ex:
self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
processingInfo[processorName]['processingError'] = str(ex)
processingEndTime = time.time()
processingInfo[processorName]['processingEndTime'] = processingEndTime
processingInfo[processorName]['processingStartTime'] = processingStartTime
processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime
def getProcessingPlugins(self):
pluginList = []
fileProcessingManager = FileProcessingManager.getInstance()
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn}
pluginList.append(PluginInfo(pluginInfo))
return pluginList