Newer
Older
#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import os
import time
import copy
import threading
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

sveseli
committed
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
UPLOAD_DELAY_IN_SECONDS = 1.0
DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
def __init__(self):
DmObjectManager.__init__(self)

sveseli
committed
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
Barbara B. Frosik
committed
def startDaq(self, experimentName, dataDirectory, daqInfo):
FileSystemObserver.getInstance().createDirectory(dataDirectory)
experiment = self.dsExperimentApi.getExperimentByName(experimentName)

sveseli
committed
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)

sveseli
committed
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
return daqInfo
def stopDaq(self, experimentName, dataDirectory):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)

sveseli
committed
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
return daqInfo
daqInfo = DaqTracker.getInstance().getDaqInfo(id)
if not daqInfo:
raise ObjectNotFound('Daq id %s not found.' % id)
daqInfo.updateStatus()
return daqInfo
def uploadFiles(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = UploadInfo(daqInfo)
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['storageUrl'] = experiment.get('storageUrl')
uploadInfo['dataDirectory'] = dataDirectory
uploadInfo['nProcessedFiles'] = 0
uploadInfo['nProcessingErrors'] = 0
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['storageUrl'] = experiment.get('storageUrl')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
# Check that there is at least one processor that can process files
processorList = []
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
if processorName not in skipPlugins:
processorList.append(processor)
if not len(processorList):
raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)
# Remove hidden files
self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if not len(filePathsDict):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict])
timer.start()
return uploadInfo
def prepareUploadFiles(self, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
self.logger.debug('Preparing upload id: %s' % uploadId)
dataDirectory = uploadInfo.get('dataDirectory')
fileProcessingManager = FileProcessingManager.getInstance()
nProcessedFiles = 0
nFiles = len(filePathsDict)
for (filePath,filePathDict) in filePathsDict.items():
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo.update(filePathDict)
fileInfo['daqInfo'] = daqInfo
fileInfo['statusMonitor'] = uploadInfo
fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
try:
if uploadInfo.get('status') != 'aborting':
fileProcessingManager.processFile(fileInfo)
nProcessedFiles += 1
else:
nCancelledFiles = nFiles - nProcessedFiles
uploadInfo.uploadAborted(nCancelledFiles)
self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles))
break
except Exception, ex:
self.logger.error('Processing error: %s', ex)
self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
def getUploadInfo(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadInfo.updateStatus()
return uploadInfo
def stopUpload(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadInfo['status'] = 'aborting'
uploadInfo.updateStatus()
return uploadInfo
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = DirectoryUploadInfo(daqInfo)
uploadInfo['id'] = uploadId
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['storageUrl'] = experiment.get('storageUrl')
uploadInfo['dataDirectory'] = dataDirectory
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['storageUrl'] = experiment.get('storageUrl')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
fileProcessingManager = FileProcessingManager.getInstance()
processingInfo = {}
uploadInfo['processingInfo'] = processingInfo
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
processingInfo[processorName] = {'status' : 'skipped'}
else:
self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
processingInfo[processorName] = {'status' : 'pending'}
timer.start()
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = 'running'
return uploadInfo
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
dataDirectory = uploadInfo.get('dataDirectory')
processorName = processor.name
processingInfo = uploadInfo.get('processingInfo')
self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))
try:
dependsOn = processor.dependsOn
while True:
# Check status
if uploadInfo['status'] == 'aborting':
processingInfo[processorName]['status'] = 'aborted'
return
# Check that processor can proceed
canProcess = False
if not len(dependsOn):
canProcess = True
for depProcessorName in dependsOn:
depProcessorStatus = processingInfo.get(depProcessorName).get('status')
if depProcessorStatus in ['skipped', 'aborted', 'failed']:
# We must skip processing
self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = 'skipped'
return
elif depProcessorStatus in ['pending', 'running']:
# Do nothing
pass
elif depProcessorStatus == 'done':
# We can proceed
canProcess = True
else:
# This should not happen
self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = 'skipped'
return
# Process directory if we can
if canProcess:
directoryInfo = {'uploadInfo' : uploadInfo,
'daqInfo' : daqInfo,
'experiment' : experiment,
'filePathsDict' : filePathsDict
}
processingInfo[processorName]['status'] = 'running'
processingStartTime = time.time()
processor.processDirectory(directoryInfo)
if processingInfo[processorName]['status'] == 'running':
processingInfo[processorName]['status'] = 'done'
self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
else:
self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
break
# Wait a bit longer
time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)
except Exception, ex:
self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
processingInfo[processorName]['status'] = 'failed'
processingInfo[processorName]['processingError'] = str(ex)
processingEndTime = time.time()
processingInfo[processorName]['processingEndTime'] = processingEndTime
processingInfo[processorName]['processingStartTime'] = processingStartTime
processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime
def getProcessingPlugins(self):
pluginList = []
fileProcessingManager = FileProcessingManager.getInstance()
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn}
pluginList.append(PluginInfo(pluginInfo))
return pluginList