#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import os
import time
import uuid
import copy
import threading
from dm.common.constants import dmProcessingStatus
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver


class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    UPLOAD_DELAY_IN_SECONDS = 1.0
    UPLOAD_CHUNK_SIZE_IN_FILES = 100
    UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
    DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
    SECONDS_PER_HOUR = 60*60

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentDsApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
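        """ Create the data directory if needed and start observing it for the given experiment; schedules an automatic stop if maxRunTimeInHours is set. """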
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)

        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)

        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
        if maxRunTimeInHours:
            daqId = daqInfo['id']
            self.logger.debug('Starting timer to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours' % (daqId, experimentName, maxRunTimeInHours))
            maxRunTimeInSeconds = maxRunTimeInHours*self.SECONDS_PER_HOUR
            timer = threading.Timer(maxRunTimeInSeconds, self.stopDaqTimer, args=[experimentName, dataDirectory, daqId])
            timer.start()
        return daqInfo

    def stopDaqTimer(self, experimentName, dataDirectory, daqId):
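        """ Timer callback that stops the given DAQ once its maximum run time has elapsed, unless it is no longer running. """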
        try:
            daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
            maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
            self.logger.debug('Attempting to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours was exceeded' % (daqId, experimentName, maxRunTimeInHours))
            daqStatus = daqInfo.get('status')
            if daqStatus != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                self.logger.debug('DAQ id %s has status of %s, will not stop it automatically' % (daqId, daqStatus))
                return
            self.stopDaq(experimentName, dataDirectory)
        except Exception as ex:
            self.logger.error('Could not automatically stop DAQ id %s: %s' % (daqId, str(ex)))

    def stopDaq(self, experimentName, dataDirectory):
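        """ Stop observing the DAQ data directory and trigger the on-exit upload if one was requested. """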
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)

        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        daqId = daqInfo.get('id')
        self.logger.debug('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
        # Prepare upload on exit
        uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
        if uploadDataDirectoryOnExit:
            self.logger.debug('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
            daqInfo2 = {}
            daqInfo2['originalDaqId'] = daqId
            uploadDestDirectoryOnExit = daqInfo.get('uploadDestDirectoryOnExit')
            if uploadDestDirectoryOnExit:
                self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using dest directory: %s' % (daqId, experimentName, uploadDestDirectoryOnExit))
                daqInfo2['destDirectory'] = uploadDestDirectoryOnExit
            try:
                uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2)
                daqInfo['uploadIdOnExit'] = uploadInfo.get('id')
            except Exception as ex:
                self.logger.error('Could not automatically upload DAQ id %s: %s' % (daqId, str(ex)))
                daqInfo['uploadErrorOnExit'] = str(ex)
        return daqInfo

    def getDaqInfo(self, id):
        """ Look up a DAQ by id, refresh its status, and return it. """
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo

    def listDaqs(self, status):
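        """ Return the list of known DAQs, optionally filtered by status. """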
        daqInfoList = DaqTracker.getInstance().getDaqInfos(status)
        return daqInfoList

    def uploadFiles(self, experimentName, dataDirectory, daqInfo):
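        """ Start an asynchronous file-by-file upload of the given data directory and return the new upload info. """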
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId  # mirrors uploadDirectory(); prepareUploadFiles() reads this id
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        uploadInfo['nProcessedFiles'] = 0
        uploadInfo['nProcessingErrors'] = 0
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId
        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            skipPlugins = []
        # Check that there is at least one processor that can process files
        processorList = []
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName not in skipPlugins:
                processorList.append(processor)
        if not len(processorList):
            raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)
        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = 0
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        self.logger.debug('Starting upload timer for %s' % dataDirectory)
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment])
        timer.start()
        return uploadInfo

    def prepareUploadFiles(self, uploadInfo, daqInfo, experiment):
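        """ Timer callback that collects upload candidates and queues them for processing in chunks of UPLOAD_CHUNK_SIZE_IN_FILES. """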
        uploadId = uploadInfo.get('id')
        self.logger.debug('Preparing upload id: %s' % uploadId)
        dataDirectory = uploadInfo.get('dataDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        fileProcessingManager = FileProcessingManager.getInstance()
        try:
            # Get files
            self.logger.debug('Retrieving file paths for %s' % dataDirectory)
            filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
            # Remove hidden files
            self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
            filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
            # Check which files need to be processed
            filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
            if not len(filePathsDict):
                raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
        except Exception as ex:
            self.logger.error('Processing error for upload %s: %s' % (uploadId, str(ex)))
            uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            uploadInfo['errorMessage'] = str(ex)
            return
        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            try:
                # Only create new uploads if we have less than
                # UPLOAD_CHUNK_SIZE_IN_FILES waiting to be completed
                while True:
                    status = uploadInfo.get('status')
                    if status == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                        nCancelledFiles = nFiles - nProcessedFiles
                        uploadInfo.uploadAborted(nCancelledFiles)
                        self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                        return
                    nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
                    nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
                    if nQueuedFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES:
                        # We need to add more files for upload
                        break
                    self.logger.debug('Upload %s has %s completed files, %s queued files waiting for upload, will not add any more for %s seconds; upload timer has processed %s out of %s files' % (uploadId, nCompletedFiles, nQueuedFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS, nProcessedFiles, nFiles))
                    time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)
                fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
                fileInfo.update(filePathDict)
                fileInfo['daqInfo'] = daqInfo
                fileInfo['uploadId'] = uploadId
                fileInfo['statusMonitor'] = uploadInfo
                fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
                fileInfo['owner'] = uploadId
                fileProcessingManager.queueFile(fileInfo)
                nProcessedFiles += 1
            except Exception as ex:
                self.logger.error('Processing error: %s' % str(ex))
        nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
        nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
        self.logger.debug('Done preparing upload %s; it has a total of %s files (%s completed and %s queued)' % (uploadId, nFiles, nCompletedFiles, nQueuedFiles))

    def getUploadInfo(self, id):
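        """ Look up an upload by id, refresh its status, and return it. """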
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo

    def listUploads(self, status):
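        """ Return the list of known uploads, optionally filtered by status. """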
        uploadInfoList = UploadTracker.getInstance().getUploadInfos(status)
        return uploadInfoList

    def stopUpload(self, id):
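        """ Mark an active upload for abort and return its updated info. """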
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING
        uploadInfo.updateStatus()
        return uploadInfo

    def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
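        """ Start an asynchronous directory-level upload, where each processing plugin handles the directory as a whole, and return the new upload info. """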
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = DirectoryUploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['storageUrl'] = experiment.get('storageUrl')
        uploadInfo['dataDirectory'] = dataDirectory
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId
        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
        if len(skipPlugins):
            skipPlugins = skipPlugins.split(',')
            uploadInfo['skipPlugins'] = skipPlugins
        else:
            skipPlugins = []
        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
        uploadInfo['nFiles'] = 0
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        fileProcessingManager = FileProcessingManager.getInstance()
        processingInfo = {}
        uploadInfo['processingInfo'] = processingInfo
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName in skipPlugins:
                processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
            else:
                processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
        self.logger.debug('Starting upload directory %s timer for experiment %s' % (dataDirectory, experimentName))
        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadDirectory, args=[uploadInfo, daqInfo, experiment])
        timer.start()
        return uploadInfo

    def prepareUploadDirectory(self, uploadInfo, daqInfo, experiment):
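        """ Timer callback that retrieves the directory file list and starts one processing timer per plugin. """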
        uploadId = uploadInfo['id']
        dataDirectory = uploadInfo['dataDirectory']
        experimentName = uploadInfo['experimentName']
        skipPlugins = uploadInfo.get('skipPlugins', [])
        self.logger.debug('Preparing directory %s upload for experiment %s' % (dataDirectory, experimentName))
        try:
            filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
            self.logger.debug('There are %s files in directory %s (experiment %s)' % (len(filePathsDict), dataDirectory, experimentName))
        except Exception as ex:
            self.logger.error('Cannot retrieve files for directory upload %s: %s' % (uploadId, str(ex)))
            self.logger.error('Marking directory upload %s as failed' % (uploadId))
            uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            uploadInfo['errorMessage'] = str(ex)
            return
        fileProcessingManager = FileProcessingManager.getInstance()
        self.logger.debug('Preparing plugin timers for directory %s upload (experiment %s)' % (dataDirectory, experimentName))
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            processorName = processor.name
            if processorName in skipPlugins:
                # Plugin was marked as skipped when this upload started
                continue
            self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
            timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
            timer.start()
        uploadInfo['nFiles'] = len(filePathsDict)
        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING

    def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
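        """ Timer callback that waits for plugin dependencies to complete and then hands the whole directory to the plugin. """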
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        processorName = processor.name
        processingInfo = uploadInfo.get('processingInfo')
        self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))
        # Default start time, refined below once the plugin actually runs;
        # keeps the timing bookkeeping valid even if we fail early
        processingStartTime = time.time()
        try:
            dependsOn = processor.dependsOn
            while True:
                # Check status
                if uploadInfo['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                    processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
                    return
                # Check that processor can proceed
                canProcess = False
                if not len(dependsOn):
                    canProcess = True
                for depProcessorName in dependsOn:
                    depProcessorStatus = processingInfo.get(depProcessorName).get('status')
                    if depProcessorStatus in [
                            dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED,
                            dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED,
                            dmProcessingStatus.DM_PROCESSING_STATUS_FAILED]:
                        # We must skip processing
                        self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
                        return
                    elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]:
                        # Dependency has not finished yet; keep waiting
                        pass
                    elif depProcessorStatus == dmProcessingStatus.DM_PROCESSING_STATUS_DONE:
                        # We can proceed
                        canProcess = True
                    else:
                        # This should not happen
                        self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
                        return
                # Process directory if we can
                if canProcess:
                    directoryInfo = {'uploadInfo' : uploadInfo,
                        'daqInfo' : daqInfo,
                        'experiment' : experiment,
                        'filePathsDict' : filePathsDict
                    }
                    processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
                    processingStartTime = time.time()
                    processor.processDirectory(directoryInfo)
                    if processingInfo[processorName]['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
                        processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
                        self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
                    else:
                        self.logger.debug('Incomplete directory %s processing for upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
                    break
                # Wait a bit longer
                time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)
        except Exception as ex:
            self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
            processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
            processingInfo[processorName]['processingError'] = str(ex)
        processingEndTime = time.time()
        processingInfo[processorName]['processingEndTime'] = processingEndTime
        processingInfo[processorName]['processingStartTime'] = processingStartTime
        processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime

    def getProcessingPlugins(self):
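        """ Return information about the configured file processing plugins and their dependencies. """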
        pluginList = []
        fileProcessingManager = FileProcessingManager.getInstance()
        for processorKey in fileProcessingManager.fileProcessorKeyList:
            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
            pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn}
            pluginList.append(PluginInfo(pluginInfo))
        return pluginList
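
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the service). In production
# this class is driven by the DAQ web service request handlers; the
# experiment name, directories, and plugin name below are hypothetical.
#
#   controller = ExperimentSessionControllerImpl()
#
#   # Start monitoring a data directory; stop automatically after 8 hours
#   # and upload whatever was collected when the DAQ exits.
#   daqInfo = controller.startDaq('test-exp-01', '/data/test-exp-01', {
#       'maxRunTimeInHours' : 8,
#       'uploadDataDirectoryOnExit' : '/data/test-exp-01',
#   })
#
#   # ... later, stop it explicitly instead of waiting for the timer ...
#   daqInfo = controller.stopDaq('test-exp-01', '/data/test-exp-01')
#
#   # Or upload a directory on demand, skipping selected plugins.
#   uploadInfo = controller.uploadFiles('test-exp-01', '/data/test-exp-01',
#       {'skipPlugins' : 'mongodbPlugin'})
#   print controller.getUploadInfo(uploadInfo['id'])
# ---------------------------------------------------------------------------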