Showing 664 additions and 0 deletions
#!/usr/bin/env python
#
# Base object manager class.
#
#######################################################################

import threading

from dm.common.utility.loggingManager import LoggingManager

#######################################################################

class DmObjectManager:
    """ Base object manager class. """

    def __init__(self):
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        self.lock = threading.RLock()

    def getLogger(self):
        return self.logger

    def acquireLock(self):
        self.lock.acquire()

    def releaseLock(self):
        self.lock.release()
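####################################################################
# Usage sketch (illustrative only): the CounterManager subclass below is
# hypothetical and simply shows how a manager can guard shared state with
# the acquireLock()/releaseLock() helpers inherited from DmObjectManager.
if __name__ == '__main__':
    class CounterManager(DmObjectManager):
        def __init__(self):
            DmObjectManager.__init__(self)
            self.counter = 0

        def increment(self):
            self.acquireLock()
            try:
                self.counter += 1
                self.getLogger().debug('Counter: %s' % self.counter)
            finally:
                self.releaseLock()

    cm = CounterManager()
    cm.increment()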
#!/usr/bin/env python

import time

from dmObject import DmObject

class Experiment(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'dataDirectory', 'startDate', 'endDate', 'daqStartTime', 'daqEndTime' ]

    def __init__(self, dict={}):
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python

import time

from dmObject import DmObject

class ExperimentType(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'description', 'rootDataPath' ]

    def __init__(self, dict={}):
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python

from dmObject import DmObject

class FileMetadata(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'locationList' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)
#!/usr/bin/env python

import time
import os

from dmObject import DmObject

class ObservedFile(DmObject):

    DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdatedTimestamp' ]

    def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None):
        DmObject.__init__(self, dict)
        if filePath:
            self['filePath'] = filePath
        if dataDirectory:
            self['dataDirectory'] = dataDirectory
            if filePath:
                # Path relative to the experiment data directory.
                self['experimentFilePath'] = os.path.relpath(filePath, dataDirectory)
        if experiment:
            self['experiment'] = experiment

    def setLastUpdatedTimestampToNow(self):
        self['lastUpdatedTimestamp'] = time.time()

    def getLastUpdatedTimestamp(self):
        return self.get('lastUpdatedTimestamp')

    def getFilePath(self):
        return self.get('filePath')

    def getDataDirectory(self):
        return self.get('dataDirectory')

    def getExperiment(self):
        return self.get('experiment')

####################################################################
# Testing

if __name__ == '__main__':
    of = ObservedFile(filePath='tmp/xyz')
    print of
    of.setLastUpdatedTimestampToNow()
    print of
#!/usr/bin/env python

from dmObject import DmObject

class RoleType(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

from dmObject import DmObject

class UserExperimentRole(DmObject):

    DEFAULT_KEY_LIST = [ 'user_id', 'experiment_id', 'role_type_id' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

from dmObject import DmObject

class UserInfo(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'username', 'firstName', 'lastName', 'middleName', 'email', 'description' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

from dmObject import DmObject

class UserSystemRole(DmObject):

    DEFAULT_KEY_LIST = [ 'user_id', 'role_type_id' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)
#!/usr/bin/env python

import abc

class FileProcessorInterface:
    # Declare the metaclass so that @abc.abstractmethod is enforced (Python 2 style).
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def processFile(self, filePath, daqPath, experiment):
        return NotImplemented
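#######################################################################
# Illustrative sketch: a minimal concrete processor. Only the
# processFile() signature comes from the interface above; the class name
# and the print-based behavior are hypothetical.
class PrintingFileProcessor(FileProcessorInterface):

    def processFile(self, filePath, daqPath, experiment):
        print 'Would process %s (daq path: %s, experiment: %s)' % (filePath, daqPath, experiment)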
#!/usr/bin/env python

import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessorInterface import FileProcessorInterface

class FileTransferPlugin(FileProcessorInterface):

    def __init__(self, command, src=None, dest=None):
        self.src = src
        self.dest = dest
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        if command is None or not len(command):
            raise InvalidArgument('File transfer command must be non-empty string.')
        self.command = command
        self.subprocess = None

    def processFile(self, filePath, daqPath, experiment):
        storageHost = experiment.get('storageHost')
        storageDirectory = experiment.get('storageDirectory')
        dest = '%s:%s' % (storageHost, storageDirectory)
        # Use relative path with respect to daq directory as a source.
        os.chdir(daqPath)
        src = os.path.relpath(filePath, daqPath)
        self.start(src, dest)

    def getFullCommand(self, src, dest):
        return '%s %s %s' % (self.command, src, dest)

    def setSrc(self, src):
        self.src = src

    def setDest(self, dest):
        self.dest = dest

    def start(self, src=None, dest=None):
        # Use preconfigured source if provided source is None.
        fileSrc = src
        if src is None:
            fileSrc = self.src
        # Use provided destination only if preconfigured destination is None.
        # Plugins may have the desired destination preconfigured for all files.
        fileDest = self.dest
        if self.dest is None:
            fileDest = dest
        if not fileSrc or not fileDest:
            raise InvalidRequest('Both source and destination must be non-empty strings.')
        self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest))
        return self.subprocess.run()

    def wait(self):
        if self.subprocess:
            return self.subprocess.wait()
        return None

    def poll(self):
        if self.subprocess:
            return self.subprocess.poll()
        return None

    def getStdOut(self):
        if self.subprocess:
            return self.subprocess.getStdOut()
        return None

    def getStdErr(self):
        if self.subprocess:
            return self.subprocess.getStdErr()
        return None

    def getExitStatus(self):
        if self.subprocess:
            return self.subprocess.getExitStatus()
        return None

    def reset(self):
        self.subprocess = None

#######################################################################
# Testing.

if __name__ == '__main__':
    ft = FileTransferPlugin('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()
#!/usr/bin/env python

from fileTransferPlugin import FileTransferPlugin

class RsyncFileTransferPlugin(FileTransferPlugin):

    COMMAND = 'rsync -arvlPR'

    def __init__(self, src=None, dest=None):
        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)

#######################################################################
# Testing.

if __name__ == '__main__':
    ft = RsyncFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()
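#######################################################################
# Command sketch (the relative path, host, and storage directory below
# are made-up examples): for a source path 'd1/f1.h5' relative to the
# daq directory and an experiment with storageHost 'host1' and
# storageDirectory '/storage/e1', getFullCommand() yields:
#
#     rsync -arvlPR d1/f1.h5 host1:/storage/e1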
#!/usr/bin/env python

import threading
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread

class FileProcessingManager(threading.Thread, Singleton):

    CONFIG_SECTION_NAME = 'FileProcessingManager'
    NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
    DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
    FILE_PROCESSOR_KEY = 'fileprocessor'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        FileProcessingManager.__instanceLock.acquire()
        try:
            if FileProcessingManager.__instance:
                return
            FileProcessingManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.fileProcessingThreadDict = {}
            self.eventFlag = threading.Event()
            self.fileProcessorDict = {}
            self.fileProcessorKeyList = []
            self.fileProcessingQueue = TimeBasedProcessingQueue()
            self.processedFileDict = {}
            self.unprocessedFileDict = {}
            self.__configure()
            self.logger.debug('Initialization complete')
        finally:
            FileProcessingManager.__instanceLock.release()

    def __configure(self):
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
        self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
        self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
        # Create processors.
        for (key, value) in configItems:
            if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
                (moduleName, className, constructor) = cm.getModuleClassConstructorTuple(value)
                self.logger.debug('Creating file processor instance of class %s' % className)
                fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
                self.logger.debug('Configuring file processor %s' % fileProcessor)
                fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
                fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
                fileProcessor.configure()
                self.fileProcessorDict[key] = fileProcessor
        self.fileProcessorKeyList = self.fileProcessorDict.keys()
        self.fileProcessorKeyList.sort()
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

    def processFile(self, fileInfo):
        self.fileProcessingQueue.push(fileInfo)
        self.eventFlag.set()

    def start(self):
        self.lock.acquire()
        try:
            self.logger.debug('Starting file processing threads')
            for i in range(0, self.nProcessingThreads):
                tName = 'FileProcessingThread-%s' % i
                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict)
                t.start()
                self.fileProcessingThreadDict[tName] = t
        finally:
            self.lock.release()

    def stop(self):
        self.logger.debug('Stopping file processing threads')
        for (tName, t) in self.fileProcessingThreadDict.items():
            t.stop()
        self.lock.acquire()
        try:
            self.eventFlag.set()
        finally:
            self.lock.release()
        for (tName, t) in self.fileProcessingThreadDict.items():
            t.join()

    def setEvent(self):
        self.lock.acquire()
        try:
            self.eventFlag.set()
        finally:
            self.lock.release()

    def clearEvent(self):
        self.lock.acquire()
        try:
            self.eventFlag.clear()
        finally:
            self.lock.release()

    def waitOnEvent(self, timeoutInSeconds=None):
        self.eventFlag.wait(timeoutInSeconds)

####################################################################
# Testing

if __name__ == '__main__':
    fp = FileProcessingManager.getInstance()
    print fp
    #fp.start()
    #time.sleep(30)
    #fp.stop()
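#######################################################################
# Configuration sketch: the section and option names below come from the
# constants read by __configure(); the option casing and the exact value
# format expected for "fileprocessor" entries (parsed by
# ConfigurationManager.getModuleClassConstructorTuple()), as well as the
# module path shown, are assumptions given only as a plausible example.
#
# [FileProcessingManager]
# numberOfProcessingThreads=3
# defaultNumberOfRetries=1
# defaultRetryWaitPeriodInSeconds=60
# fileProcessor1=dm.common.processing.plugins.rsyncFileTransferPlugin.RsyncFileTransferPlugin()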
#!/usr/bin/env python

import threading

from dm.common.utility.loggingManager import LoggingManager

class FileProcessingThread(threading.Thread):

    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__(self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
        threading.Thread.__init__(self)
        self.setName(name)
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.processedFileDict = processedFileDict
        self.unprocessedFileDict = unprocessedFileDict
        self.logger = LoggingManager.getInstance().getLogger(name)

    def run(self):
        self.logger.debug('Starting thread: %s' % self.getName())
        while True:
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                self.logger.debug('Exit flag is set')
                break
            while True:
                fileInfo = self.fileProcessingQueue.pop()
                if fileInfo is None:
                    break
                filePath = fileInfo.get('filePath')
                try:
                    for processorKey in self.fileProcessorKeyList:
                        processor = self.fileProcessorDict.get(processorKey)
                        processorName = processor.__class__.__name__
                        fileProcessedByDict = fileInfo.get('processedByDict', {})
                        fileInfo['processedByDict'] = fileProcessedByDict
                        if fileProcessedByDict.has_key(processorName):
                            self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
                            continue
                        self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo))
                        try:
                            processor.processFile(fileInfo)
                            fileProcessedByDict[processorName] = True
                            self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
                        except Exception, ex:
                            self.logger.exception(ex)
                            self.logger.debug('%s processing failed for file at path %s ' % (processorName, filePath))
                            fileProcessingDict = fileInfo.get('processingDict', {})
                            fileInfo['processingDict'] = fileProcessingDict
                            processorDict = fileProcessingDict.get(processorName, {})
                            fileProcessingDict[processorName] = processorDict
                            processorDict['lastError'] = str(ex)
                            nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
                            self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
                            processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
                            if nRetriesLeft <= 0:
                                self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
                                self.unprocessedFileDict[filePath] = fileInfo
                            else:
                                retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
                                self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
                                self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
                            # Do not process this file further until
                            # this plugin is done.
                            break
                except Exception, ex:
                    self.logger.exception(ex)
            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.getName())

    def stop(self):
        self.exitFlag = True

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python

import abc

class FileProcessor:
    # Declare the metaclass so that @abc.abstractmethod is enforced (Python 2 style).
    __metaclass__ = abc.ABCMeta

    DEFAULT_NUMBER_OF_RETRIES = 0
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS = 60

    def __init__(self):
        self.configDict = {}

    @abc.abstractmethod
    def processFile(self, fileInfo):
        return NotImplemented

    def configure(self):
        # Use this method for processor configuration.
        pass

    def setConfigKeyValue(self, key, value):
        self.configDict[key] = value

    def getConfigKeyValue(self, key):
        return self.configDict.get(key)

    def setNumberOfRetries(self, nRetries):
        self.configDict['numberOfRetries'] = nRetries

    def getNumberOfRetries(self):
        return self.configDict.get('numberOfRetries', self.DEFAULT_NUMBER_OF_RETRIES)

    def setRetryWaitPeriodInSeconds(self, waitPeriod):
        self.configDict['retryWaitPeriodInSeconds'] = waitPeriod

    def getRetryWaitPeriodInSeconds(self):
        return self.configDict.get('retryWaitPeriodInSeconds', self.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS)
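#######################################################################
# Illustrative sketch: a minimal concrete FileProcessor. Only the
# processFile(fileInfo) signature and the retry accessors come from the
# class above; the checksum behavior and the 'md5Sum' key are
# hypothetical. On failure, FileProcessingThread consults
# getNumberOfRetries() and getRetryWaitPeriodInSeconds() to decide
# whether and when to requeue the file.
import hashlib

class ChecksumFileProcessor(FileProcessor):

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        f = open(filePath, 'rb')
        try:
            fileInfo['md5Sum'] = hashlib.md5(f.read()).hexdigest()
        finally:
            f.close()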
#!/usr/bin/env python

import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessor import FileProcessor

class FileTransferPlugin(FileProcessor):

    def __init__(self, command, src=None, dest=None):
        FileProcessor.__init__(self)
        self.src = src
        self.dest = dest
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        if command is None or not len(command):
            raise InvalidArgument('File transfer command must be non-empty string.')
        self.command = command
        self.subprocess = None

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experiment = fileInfo.get('experiment')
        storageHost = experiment.get('storageHost')
        storageDirectory = experiment.get('storageDirectory')
        destUrl = self.getDestUrl(storageHost, storageDirectory)
        # Use relative path with respect to data directory as a source.
        os.chdir(dataDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        self.start(srcUrl, destUrl)

    def getSrcUrl(self, filePath, dataDirectory):
        srcUrl = os.path.relpath(filePath, dataDirectory)
        return srcUrl

    def getDestUrl(self, storageHost, storageDirectory):
        destUrl = '%s:%s' % (storageHost, storageDirectory)
        return destUrl

    def getFullCommand(self, src, dest):
        return '%s %s %s' % (self.command, src, dest)

    def setSrc(self, src):
        self.src = src

    def setDest(self, dest):
        self.dest = dest

    def start(self, src=None, dest=None):
        # Use preconfigured source if provided source is None.
        fileSrc = src
        if src is None:
            fileSrc = self.src
        # Use provided destination only if preconfigured destination is None.
        # Plugins may have the desired destination preconfigured for all files.
        fileDest = self.dest
        if self.dest is None:
            fileDest = dest
        if not fileSrc or not fileDest:
            raise InvalidRequest('Both source and destination must be non-empty strings.')
        self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest))
        return self.subprocess.run()

    def wait(self):
        if self.subprocess:
            return self.subprocess.wait()
        return None

    def poll(self):
        if self.subprocess:
            return self.subprocess.poll()
        return None

    def getStdOut(self):
        if self.subprocess:
            return self.subprocess.getStdOut()
        return None

    def getStdErr(self):
        if self.subprocess:
            return self.subprocess.getStdErr()
        return None

    def getExitStatus(self):
        if self.subprocess:
            return self.subprocess.getExitStatus()
        return None

    def reset(self):
        self.subprocess = None

#######################################################################
# Testing.

if __name__ == '__main__':
    ft = FileTransferPlugin('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()
#!/usr/bin/env python

from fileTransferPlugin import FileTransferPlugin

class GridftpFileTransferPlugin(FileTransferPlugin):

    COMMAND = 'globus-url-copy -v'

    def __init__(self, src=None, dest=None):
        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)

    def getSrcUrl(self, filePath, dataDirectory):
        srcUrl = 'file://%s' % filePath
        return srcUrl

    def getDestUrl(self, storageHost, storageDirectory):
        destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
        return destUrl

#######################################################################
# Testing.

if __name__ == '__main__':
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()
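#######################################################################
# Command sketch (the file path, host, and storage directory below are
# made-up examples): for filePath '/data/d1/f1.h5', storageHost 'host1'
# and storageDirectory '/storage/e1', the URL helpers above produce
#
#     globus-url-copy -v file:///data/d1/f1.h5 sshftp://host1//storage/e1/
#
# (the double slash comes from formatting an absolute storageDirectory
# into the sshftp URL).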