From 36c4f9835540d4cb221edd9abd5db119f84a95c3 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Sat, 30 Jan 2016 21:34:50 +0000 Subject: [PATCH] moved plugins to common area --- .../service/impl/mongoDbFileCatalogPlugin.py | 54 ---------------- .../service/impl/scriptProcessingPlugin.py | 54 ---------------- .../impl/sddsParameterProcessingPlugin.py | 62 ------------------- .../service/impl/sgeJobSubmissionPlugin.py | 44 ------------- 4 files changed, 214 deletions(-) delete mode 100755 src/python/dm/ds_web_service/service/impl/mongoDbFileCatalogPlugin.py delete mode 100755 src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py delete mode 100755 src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py delete mode 100755 src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py diff --git a/src/python/dm/ds_web_service/service/impl/mongoDbFileCatalogPlugin.py b/src/python/dm/ds_web_service/service/impl/mongoDbFileCatalogPlugin.py deleted file mode 100755 index 5796944a..00000000 --- a/src/python/dm/ds_web_service/service/impl/mongoDbFileCatalogPlugin.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python - -import os -import time -from dm.common.utility.loggingManager import LoggingManager -from dm.common.utility.timeUtility import TimeUtility -from dm.common.processing.plugins.fileProcessor import FileProcessor -from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi - -class MongoDbFileCatalogPlugin(FileProcessor): - - def __init__(self): - FileProcessor.__init__(self) - self.fileMongoDbApi = FileMongoDbApi() - self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) - - def processFile(self, fileInfo): - experimentFilePath = fileInfo.get('experimentFilePath') - experimentName = fileInfo.get('experimentName') - self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName)) - - daqInfo = fileInfo.get('daqInfo') - storageDirectory = daqInfo.get('storageDirectory') - storageHost = daqInfo.get('storageHost') - storageFilePath = os.path.join(storageDirectory, experimentFilePath) - fileProcessingTime = time.time() - fileProcessingTimeStamp = TimeUtility.formatLocalTimeStamp(fileProcessingTime) - - # Prepare catalogging entry - fileInfo2 = {} - for key in ['md5Sum', 'fileSize', 'fileCreationTime', 'fileCreationTimeStamp']: - if fileInfo.has_key(key): - fileInfo2[key] = fileInfo.get(key, '') - fileInfo2['fileName'] = os.path.basename(experimentFilePath) - fileInfo2['storageDirectory'] = storageDirectory - fileInfo2['storageHost'] = storageHost - fileInfo2['storageFilePath'] = storageFilePath - fileInfo2['experimentFilePath'] = experimentFilePath - fileInfo2['experimentName'] = experimentName - fileInfo2['fileProcessingTime'] = fileProcessingTime - fileInfo2['fileProcessingTimeStamp'] = fileProcessingTimeStamp - self.logger.debug('Daq info: %s' % (daqInfo)) - fileInfo2.update(daqInfo) - if daqInfo.has_key('id'): - fileInfo2['daqId'] = daqInfo.get('id') - del fileInfo2['id'] - - self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) - - -####################################################################### -# Testing. -if __name__ == '__main__': - pass diff --git a/src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py b/src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py deleted file mode 100755 index c892f6dd..00000000 --- a/src/python/dm/ds_web_service/service/impl/scriptProcessingPlugin.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python - -import os -import time -from dm.common.utility.loggingManager import LoggingManager -from dm.common.utility.timeUtility import TimeUtility -from dm.common.processing.plugins.fileProcessor import FileProcessor -from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi -from dm.common.utility.dmSubprocess import DmSubprocess - -class ScriptProcessingPlugin(FileProcessor): - - PROCESSING_SCRIPT_KEY = 'processingScript' - - def __init__(self): - FileProcessor.__init__(self) - self.fileMongoDbApi = FileMongoDbApi() - self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) - - def processFile(self, fileInfo): - experimentName = fileInfo.get('experimentName') - experimentFilePath = fileInfo.get('experimentFilePath') - daqInfo = fileInfo.get('daqInfo', {}) - processingScript = daqInfo.get(self.PROCESSING_SCRIPT_KEY) - if not processingScript: - self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) - return - - self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) - - storageDirectory = fileInfo.get('storageDirectory') - storageFilePath = os.path.join(storageDirectory, experimentFilePath) - - # Process file - try: - p = DmSubprocess('%s %s' % (processingScript, storageFilePath)) - p.run() - stdOut = p.getStdOut() - except Exception, ex: - self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) - return - - # Prepare catalogging entry - fileInfo2 = {} - fileInfo2['fileName'] = os.path.basename(experimentFilePath) - fileInfo2['experimentName'] = experimentName - fileInfo2['processingScriptOutput'] = '%s' % stdOut.strip() - self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) - - -####################################################################### -# Testing. -if __name__ == '__main__': - pass diff --git a/src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py b/src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py deleted file mode 100755 index 54c99808..00000000 --- a/src/python/dm/ds_web_service/service/impl/sddsParameterProcessingPlugin.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python - -import os -import time -from dm.common.utility.loggingManager import LoggingManager -from dm.common.utility.timeUtility import TimeUtility -from dm.common.processing.plugins.fileProcessor import FileProcessor -from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi - -class SddsParameterProcessingPlugin(FileProcessor): - - PROCESS_SDDS_PARAMETERS_KEY = 'processSddsParameters' - - def __init__(self): - FileProcessor.__init__(self) - self.fileMongoDbApi = FileMongoDbApi() - self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) - - - def processFile(self, fileInfo): - experimentFilePath = fileInfo.get('experimentFilePath') - experimentName = fileInfo.get('experimentName') - daqInfo = fileInfo.get('daqInfo', {}) - processSddsParameters = daqInfo.get(self.PROCESS_SDDS_PARAMETERS_KEY) - if not processSddsParameters: - self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) - return - - self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName)) - - storageDirectory = fileInfo.get('storageDirectory') - storageFilePath = os.path.join(storageDirectory, experimentFilePath) - - # Load file - try: - import sddsdata - from sdds import SDDS - s = SDDS(0) - self.logger.error('Loading file %s for experiment %s' % (experimentFilePath, experimentName)) - s.load(storageFilePath) - except Exception, ex: - self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) - return - - # Prepare catalogging entry - fileInfo2 = {} - fileInfo2['fileName'] = os.path.basename(experimentFilePath) - fileInfo2['experimentName'] = experimentName - for i in range(0,len(s.parameterName)): - parameterName = s.parameterName[i] - parameterData = s.parameterData[i][0] - fileInfo2[parameterName] = parameterData - self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) - - self.logger.error('SDDS terminate file %s for experiment %s' % (experimentFilePath, experimentName)) - sddsdata.Terminate(0) - - -####################################################################### -# Testing. -if __name__ == '__main__': - pass diff --git a/src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py b/src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py deleted file mode 100755 index a63e7134..00000000 --- a/src/python/dm/ds_web_service/service/impl/sgeJobSubmissionPlugin.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python - -import os -import time -from dm.common.utility.loggingManager import LoggingManager -from dm.common.utility.timeUtility import TimeUtility -from dm.common.processing.plugins.fileProcessor import FileProcessor -from dm.common.utility.dmSubprocess import DmSubprocess - -class SgeJobSubmissionPlugin(FileProcessor): - - SGE_JOB_SCRIPT_KEY = 'sgeJobScript' - - def __init__(self, sgeRoot): - FileProcessor.__init__(self) - self.sgeRoot = sgeRoot - self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) - - def processFile(self, fileInfo): - experimentName = fileInfo.get('experimentName') - experimentFilePath = fileInfo.get('experimentFilePath') - daqInfo = fileInfo.get('daqInfo', {}) - sgeJobScript = daqInfo.get(self.SGE_JOB_SCRIPT_KEY) - if not sgeJobScript: - self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName)) - return - - self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) - - storageDirectory = fileInfo.get('storageDirectory') - storageFilePath = os.path.join(storageDirectory, experimentFilePath) - - # Process file - try: - p = DmSubprocess('. %s/default/common/settings.sh ; qsub -v DM_EXPERIMENT_NAME=%s %s %s' % (self.sgeRoot, experimentName, sgeJobScript, storageFilePath)) - p.run() - except Exception, ex: - self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex)) - return - -####################################################################### -# Testing. -if __name__ == '__main__': - pass -- GitLab