Commit 36c4f983 authored by sveseli

moved plugins to common area

parent 691ee9fc
#!/usr/bin/env python

import os
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi


class MongoDbFileCatalogPlugin(FileProcessor):

    def __init__(self):
        FileProcessor.__init__(self)
        self.fileMongoDbApi = FileMongoDbApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName))
        daqInfo = fileInfo.get('daqInfo')
        storageDirectory = daqInfo.get('storageDirectory')
        storageHost = daqInfo.get('storageHost')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        fileProcessingTime = time.time()
        fileProcessingTimeStamp = TimeUtility.formatLocalTimeStamp(fileProcessingTime)

        # Prepare cataloging entry
        fileInfo2 = {}
        for key in ['md5Sum', 'fileSize', 'fileCreationTime', 'fileCreationTimeStamp']:
            if key in fileInfo:
                fileInfo2[key] = fileInfo.get(key, '')
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['storageDirectory'] = storageDirectory
        fileInfo2['storageHost'] = storageHost
        fileInfo2['storageFilePath'] = storageFilePath
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['fileProcessingTime'] = fileProcessingTime
        fileInfo2['fileProcessingTimeStamp'] = fileProcessingTimeStamp
        self.logger.debug('Daq info: %s' % (daqInfo))
        fileInfo2.update(daqInfo)
        if 'id' in daqInfo:
            fileInfo2['daqId'] = daqInfo.get('id')
            del fileInfo2['id']
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)


#######################################################################
# Testing.

if __name__ == '__main__':
    pass
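
    # Hedged usage sketch: the fileInfo layout below is inferred from
    # processFile() above and is an assumption; in production the DAQ
    # service builds this dictionary. Running it requires a MongoDB
    # instance that FileMongoDbApi is configured to reach. All values
    # are hypothetical.
    plugin = MongoDbFileCatalogPlugin()
    plugin.processFile({
        'experimentName': 'exp-demo',                        # hypothetical
        'experimentFilePath': 'scan/image_0001.h5',          # hypothetical
        'daqInfo': {
            'id': 'daq-0001',                                # hypothetical
            'storageHost': 'storage-host',                   # hypothetical
            'storageDirectory': '/data/storage/exp-demo',    # hypothetical
        },
        'md5Sum': 'd41d8cd98f00b204e9800998ecf8427e',
        'fileSize': 0,
    })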
#!/usr/bin/env python

import os
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
from dm.common.utility.dmSubprocess import DmSubprocess


class ScriptProcessingPlugin(FileProcessor):

    PROCESSING_SCRIPT_KEY = 'processingScript'

    def __init__(self):
        FileProcessor.__init__(self)
        self.fileMongoDbApi = FileMongoDbApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        experimentName = fileInfo.get('experimentName')
        experimentFilePath = fileInfo.get('experimentFilePath')
        daqInfo = fileInfo.get('daqInfo', {})
        processingScript = daqInfo.get(self.PROCESSING_SCRIPT_KEY)
        if not processingScript:
            self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
            return
        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
        storageDirectory = fileInfo.get('storageDirectory')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)

        # Process file
        try:
            p = DmSubprocess('%s %s' % (processingScript, storageFilePath))
            p.run()
            stdOut = p.getStdOut()
        except Exception as ex:
            self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
            return

        # Prepare cataloging entry
        fileInfo2 = {}
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentName'] = experimentName
        fileInfo2['processingScriptOutput'] = '%s' % stdOut.strip()
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)


#######################################################################
# Testing.

if __name__ == '__main__':
    pass
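
    # Hedged usage sketch: drives the plugin by hand with an assumed
    # fileInfo layout (inferred from processFile() above). The script
    # path and storage locations are hypothetical; the script receives
    # the storage file path as its single argument and its stdout is
    # cataloged under 'processingScriptOutput'.
    plugin = ScriptProcessingPlugin()
    plugin.processFile({
        'experimentName': 'exp-demo',                        # hypothetical
        'experimentFilePath': 'scan/image_0001.h5',          # hypothetical
        'storageDirectory': '/data/storage/exp-demo',        # hypothetical
        'daqInfo': {
            'processingScript': '/home/dm/bin/process.sh',   # hypothetical
        },
    })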
#!/usr/bin/env python

import os
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi


class SddsParameterProcessingPlugin(FileProcessor):

    PROCESS_SDDS_PARAMETERS_KEY = 'processSddsParameters'

    def __init__(self):
        FileProcessor.__init__(self)
        self.fileMongoDbApi = FileMongoDbApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        daqInfo = fileInfo.get('daqInfo', {})
        processSddsParameters = daqInfo.get(self.PROCESS_SDDS_PARAMETERS_KEY)
        if not processSddsParameters:
            self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
            return
        self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName))
        storageDirectory = fileInfo.get('storageDirectory')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)

        # Load file
        try:
            import sddsdata
            from sdds import SDDS
            s = SDDS(0)
            self.logger.debug('Loading file %s for experiment %s' % (experimentFilePath, experimentName))
            s.load(storageFilePath)
        except Exception as ex:
            self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
            return

        # Prepare cataloging entry
        fileInfo2 = {}
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentName'] = experimentName
        for i in range(len(s.parameterName)):
            parameterName = s.parameterName[i]
            parameterData = s.parameterData[i][0]
            fileInfo2[parameterName] = parameterData
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
        self.logger.debug('SDDS terminate file %s for experiment %s' % (experimentFilePath, experimentName))
        sddsdata.Terminate(0)


#######################################################################
# Testing.

if __name__ == '__main__':
    pass
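
    # Hedged usage sketch: assumes the SDDS python bindings (sdds,
    # sddsdata) are importable and that the storage path below points
    # to a real SDDS file; all names are hypothetical. Each SDDS
    # parameter found in the file becomes a key in the catalog entry.
    plugin = SddsParameterProcessingPlugin()
    plugin.processFile({
        'experimentName': 'exp-demo',                        # hypothetical
        'experimentFilePath': 'scan/run_0001.sdds',          # hypothetical
        'storageDirectory': '/data/storage/exp-demo',        # hypothetical
        'daqInfo': {
            'processSddsParameters': True,                   # enables this plugin
        },
    })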
#!/usr/bin/env python

import os
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.utility.dmSubprocess import DmSubprocess


class SgeJobSubmissionPlugin(FileProcessor):

    SGE_JOB_SCRIPT_KEY = 'sgeJobScript'

    def __init__(self, sgeRoot):
        FileProcessor.__init__(self)
        self.sgeRoot = sgeRoot
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        experimentName = fileInfo.get('experimentName')
        experimentFilePath = fileInfo.get('experimentFilePath')
        daqInfo = fileInfo.get('daqInfo', {})
        sgeJobScript = daqInfo.get(self.SGE_JOB_SCRIPT_KEY)
        if not sgeJobScript:
            self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
            return
        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
        storageDirectory = fileInfo.get('storageDirectory')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)

        # Process file
        try:
            p = DmSubprocess('. %s/default/common/settings.sh ; qsub -v DM_EXPERIMENT_NAME=%s %s %s' % (self.sgeRoot, experimentName, sgeJobScript, storageFilePath))
            p.run()
        except Exception as ex:
            self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
            return


#######################################################################
# Testing.

if __name__ == '__main__':
    pass
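
    # Hedged usage sketch: constructs the plugin with an assumed SGE
    # root and submits the configured job script via qsub. The sgeRoot,
    # job script, and storage paths are hypothetical and a working
    # Grid Engine installation is required for the submission to succeed.
    plugin = SgeJobSubmissionPlugin('/opt/sge')              # hypothetical sgeRoot
    plugin.processFile({
        'experimentName': 'exp-demo',                        # hypothetical
        'experimentFilePath': 'scan/image_0001.h5',          # hypothetical
        'storageDirectory': '/data/storage/exp-demo',        # hypothetical
        'daqInfo': {
            'sgeJobScript': '/home/dm/bin/sgeJob.sh',        # hypothetical
        },
    })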