diff --git a/src/python/dm/common/processing/plugins/scriptProcessingPlugin.py b/src/python/dm/common/processing/plugins/scriptProcessingPlugin.py new file mode 100755 index 0000000000000000000000000000000000000000..c892f6ddfec6b276430414160498affcaa12e719 --- /dev/null +++ b/src/python/dm/common/processing/plugins/scriptProcessingPlugin.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python + +import os +import time +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.timeUtility import TimeUtility +from dm.common.processing.plugins.fileProcessor import FileProcessor +from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi +from dm.common.utility.dmSubprocess import DmSubprocess + +class ScriptProcessingPlugin(FileProcessor): + + PROCESSING_SCRIPT_KEY = 'processingScript' + + def __init__(self): + FileProcessor.__init__(self) + self.fileMongoDbApi = FileMongoDbApi() + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + def processFile(self, fileInfo): + experimentName = fileInfo.get('experimentName') + experimentFilePath = fileInfo.get('experimentFilePath') + daqInfo = fileInfo.get('daqInfo', {}) + processingScript = daqInfo.get(self.PROCESSING_SCRIPT_KEY) + if not processingScript: + self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) + return + + self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) + + storageDirectory = fileInfo.get('storageDirectory') + storageFilePath = os.path.join(storageDirectory, experimentFilePath) + + # Process file + try: + p = DmSubprocess('%s %s' % (processingScript, storageFilePath)) + p.run() + stdOut = p.getStdOut() + except Exception, ex: + self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) + return + + # Prepare catalogging entry + fileInfo2 = {} + fileInfo2['fileName'] = os.path.basename(experimentFilePath) + fileInfo2['experimentName'] = experimentName + fileInfo2['processingScriptOutput'] = '%s' % stdOut.strip() + self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) + + +####################################################################### +# Testing. +if __name__ == '__main__': + pass diff --git a/src/python/dm/common/processing/plugins/sddsParameterProcessingPlugin.py b/src/python/dm/common/processing/plugins/sddsParameterProcessingPlugin.py new file mode 100755 index 0000000000000000000000000000000000000000..54c99808a31d6e04b00eb76f05e91d36e49517b4 --- /dev/null +++ b/src/python/dm/common/processing/plugins/sddsParameterProcessingPlugin.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +import os +import time +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.timeUtility import TimeUtility +from dm.common.processing.plugins.fileProcessor import FileProcessor +from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi + +class SddsParameterProcessingPlugin(FileProcessor): + + PROCESS_SDDS_PARAMETERS_KEY = 'processSddsParameters' + + def __init__(self): + FileProcessor.__init__(self) + self.fileMongoDbApi = FileMongoDbApi() + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + + def processFile(self, fileInfo): + experimentFilePath = fileInfo.get('experimentFilePath') + experimentName = fileInfo.get('experimentName') + daqInfo = fileInfo.get('daqInfo', {}) + processSddsParameters = daqInfo.get(self.PROCESS_SDDS_PARAMETERS_KEY) + if not processSddsParameters: + self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) + return + + self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName)) + + storageDirectory = fileInfo.get('storageDirectory') + storageFilePath = os.path.join(storageDirectory, experimentFilePath) + + # Load file + try: + import sddsdata + from sdds import SDDS + s = SDDS(0) + self.logger.error('Loading file %s for experiment %s' % (experimentFilePath, experimentName)) + s.load(storageFilePath) + except Exception, ex: + self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) + return + + # Prepare catalogging entry + fileInfo2 = {} + fileInfo2['fileName'] = os.path.basename(experimentFilePath) + fileInfo2['experimentName'] = experimentName + for i in range(0,len(s.parameterName)): + parameterName = s.parameterName[i] + parameterData = s.parameterData[i][0] + fileInfo2[parameterName] = parameterData + self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) + + self.logger.error('SDDS terminate file %s for experiment %s' % (experimentFilePath, experimentName)) + sddsdata.Terminate(0) + + +####################################################################### +# Testing. +if __name__ == '__main__': + pass diff --git a/src/python/dm/common/processing/plugins/sgeJobSubmissionPlugin.py b/src/python/dm/common/processing/plugins/sgeJobSubmissionPlugin.py new file mode 100755 index 0000000000000000000000000000000000000000..a63e71343fa2859bf82d85adb82550ccc650e264 --- /dev/null +++ b/src/python/dm/common/processing/plugins/sgeJobSubmissionPlugin.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +import os +import time +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.timeUtility import TimeUtility +from dm.common.processing.plugins.fileProcessor import FileProcessor +from dm.common.utility.dmSubprocess import DmSubprocess + +class SgeJobSubmissionPlugin(FileProcessor): + + SGE_JOB_SCRIPT_KEY = 'sgeJobScript' + + def __init__(self, sgeRoot): + FileProcessor.__init__(self) + self.sgeRoot = sgeRoot + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + def processFile(self, fileInfo): + experimentName = fileInfo.get('experimentName') + experimentFilePath = fileInfo.get('experimentFilePath') + daqInfo = fileInfo.get('daqInfo', {}) + sgeJobScript = daqInfo.get(self.SGE_JOB_SCRIPT_KEY) + if not sgeJobScript: + self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) + return + + self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) + + storageDirectory = fileInfo.get('storageDirectory') + storageFilePath = os.path.join(storageDirectory, experimentFilePath) + + # Process file + try: + p = DmSubprocess('. %s/default/common/settings.sh ; qsub -v DM_EXPERIMENT_NAME=%s %s %s' % (self.sgeRoot, experimentName, sgeJobScript, storageFilePath)) + p.run() + except Exception, ex: + self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) + return + +####################################################################### +# Testing. +if __name__ == '__main__': + pass