From 0bba4a4e9f81670631a1faf8c99812ff35028b07 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Wed, 8 Jul 2015 20:05:23 +0000 Subject: [PATCH] added couple of processing plugins --- .../service/impl/scriptProcessingPlugin.py | 55 +++++++++++++++++++ .../impl/sddsParameterProcessingPlugin.py | 23 +++++++- .../service/impl/sgeJobSubmissionPlugin.py | 45 +++++++++++++++ 3 files changed, 120 insertions(+), 3 deletions(-) create mode 100755 src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py create mode 100755 src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py diff --git a/src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py b/src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py new file mode 100755 index 00000000..75be4c78 --- /dev/null +++ b/src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +import os +import time +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.timeUtility import TimeUtility +from dm.common.processing.plugins.fileProcessor import FileProcessor +from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi +from dm.common.utility.dmSubprocess import DmSubprocess + +class ScriptProcessingPlugin(FileProcessor): + + PROCESSING_SCRIPT_KEY = 'processingScript' + + def __init__(self): + FileProcessor.__init__(self) + self.fileMongoDbApi = FileMongoDbApi() + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + def processFile(self, fileInfo): + experiment = fileInfo.get('experiment') + experimentName = experiment.get('name') + experimentFilePath = fileInfo.get('experimentFilePath') + daqInfo = fileInfo.get('daqInfo', {}) + processingScript = daqInfo.get(self.PROCESSING_SCRIPT_KEY) + if not processingScript: + self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) + return + + self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) + + experimentStorageDirectory = experiment.get('storageDirectory') + storageFilePath = os.path.join(experimentStorageDirectory, experimentFilePath) + + # Process file + try: + p = DmSubprocess('%s %s' % (processingScript, storageFilePath)) + p.run() + stdOut = p.getStdOut() + except Exception, ex: + self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) + return + + # Prepare catalogging entry + fileInfo2 = {} + fileInfo2['fileName'] = os.path.basename(experimentFilePath) + fileInfo2['experimentName'] = experimentName + fileInfo2['processingScriptOutput'] = '%s' % stdOut.strip() + self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) + + +####################################################################### +# Testing. +if __name__ == '__main__': + pass diff --git a/src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py b/src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py index a60cc136..ed2f8f35 100755 --- a/src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py +++ b/src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py @@ -9,6 +9,8 @@ from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi class SddsParameterProcessingPlugin(FileProcessor): + PROCESS_SDDS_PARAMETERS_KEY = 'processSddsParameters' + def __init__(self): FileProcessor.__init__(self) self.fileMongoDbApi = FileMongoDbApi() @@ -19,15 +21,27 @@ class SddsParameterProcessingPlugin(FileProcessor): experimentFilePath = fileInfo.get('experimentFilePath') experiment = fileInfo.get('experiment') experimentName = experiment.get('name') + daqInfo = fileInfo.get('daqInfo', {}) + processSddsParameters = daqInfo.get(self.PROCESS_SDDS_PARAMETERS_KEY) + if not processSddsParameters: + self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) + return + self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName)) experimentStorageDirectory = experiment.get('storageDirectory') storageFilePath = os.path.join(experimentStorageDirectory, experimentFilePath) # Load file - from sdds import SDDS - s = SDDS(0) - s.load(storageFilePath) + try: + import sddsdata + from sdds import SDDS + s = SDDS(0) + self.logger.error('Loading file %s for experiment %s' % (experimentFilePath, experimentName)) + s.load(storageFilePath) + except Exception, ex: + self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) + return # Prepare catalogging entry fileInfo2 = {} @@ -39,6 +53,9 @@ class SddsParameterProcessingPlugin(FileProcessor): fileInfo2[parameterName] = parameterData self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) + self.logger.error('SDDS terminate file %s for experiment %s' % (experimentFilePath, experimentName)) + sddsdata.Terminate(0) + ####################################################################### # Testing. diff --git a/src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py b/src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py new file mode 100755 index 00000000..42ccb7ec --- /dev/null +++ b/src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +import os +import time +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.timeUtility import TimeUtility +from dm.common.processing.plugins.fileProcessor import FileProcessor +from dm.common.utility.dmSubprocess import DmSubprocess + +class SgeJobSubmissionPlugin(FileProcessor): + + SGE_JOB_SCRIPT_KEY = 'sgeJobScript' + + def __init__(self, sgeRoot): + FileProcessor.__init__(self) + self.sgeRoot = sgeRoot + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + def processFile(self, fileInfo): + experiment = fileInfo.get('experiment') + experimentName = experiment.get('name') + experimentFilePath = fileInfo.get('experimentFilePath') + daqInfo = fileInfo.get('daqInfo', {}) + sgeJobScript = daqInfo.get(self.SGE_JOB_SCRIPT_KEY) + if not sgeJobScript: + self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) + return + + self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) + + experimentStorageDirectory = experiment.get('storageDirectory') + storageFilePath = os.path.join(experimentStorageDirectory, experimentFilePath) + + # Process file + try: + p = DmSubprocess('. %s/default/common/settings.sh ; qsub -v DM_EXPERIMENT_NAME=%s %s %s' % (self.sgeRoot, experimentName, sgeJobScript, storageFilePath)) + p.run() + except Exception, ex: + self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) + return + +####################################################################### +# Testing. +if __name__ == '__main__': + pass -- GitLab