Forked from
DM / dm-docs
261 commits behind, 811 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
sgeJobSubmissionPlugin.py 1.70 KiB
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.utility.dmSubprocess import DmSubprocess
class SgeJobSubmissionPlugin(FileProcessor):
SGE_JOB_SCRIPT_KEY = 'sgeJobScript'
def __init__(self, sgeRoot):
FileProcessor.__init__(self)
self.sgeRoot = sgeRoot
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processFile(self, fileInfo):
experimentName = fileInfo.get('experimentName')
experimentFilePath = fileInfo.get('experimentFilePath')
daqInfo = fileInfo.get('daqInfo', {})
sgeJobScript = daqInfo.get(self.SGE_JOB_SCRIPT_KEY)
if not sgeJobScript:
self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
return
self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
storageDirectory = fileInfo.get('storageDirectory')
storageFilePath = os.path.join(storageDirectory, experimentFilePath)
# Process file
try:
p = DmSubprocess('. %s/default/common/settings.sh ; qsub -v DM_EXPERIMENT_NAME=%s %s %s' % (self.sgeRoot, experimentName, sgeJobScript, storageFilePath))
p.run()
except Exception, ex:
self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
return
#######################################################################
# Testing.
if __name__ == '__main__':
pass