Skip to content
Snippets Groups Projects
Commit 62d03240 authored by sveseli's avatar sveseli
Browse files

move processing classes under common area

parent 84e63feb
No related branches found
No related tags found
No related merge requests found
......@@ -6,14 +6,16 @@ import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadSafeQueue import ThreadSafeQueue
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from dm.daq_web_service.service.impl.fileProcessingThread import FileProcessingThread
from fileProcessingThread import FileProcessingThread
class FileProcessingManager(threading.Thread,Singleton):
CONFIG_SECTION_NAME = 'FileProcessingManager'
N_PROCESSING_THREADS_KEY = 'nprocessingthreads'
NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
FILE_PROCESSOR_KEY = 'fileprocessor'
# Singleton.
......@@ -34,7 +36,7 @@ class FileProcessingManager(threading.Thread,Singleton):
self.eventFlag = threading.Event()
self.fileProcessorDict = {}
self.fileProcessorKeyList = []
self.fileProcessingQueue = ThreadSafeQueue()
self.fileProcessingQueue = TimeBasedProcessingQueue()
self.processedFileDict = {}
self.unprocessedFileDict = {}
self.__configure()
......@@ -46,7 +48,9 @@ class FileProcessingManager(threading.Thread,Singleton):
cm = ConfigurationManager.getInstance()
configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.N_PROCESSING_THREADS_KEY))
self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
# Create processors
for (key,value) in configItems:
......@@ -54,6 +58,10 @@ class FileProcessingManager(threading.Thread,Singleton):
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
self.logger.debug('Creating file processor instance of class %s' % className)
fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.logger.debug('Configring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
fileProcessor.configure()
self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList = self.fileProcessorDict.keys()
self.fileProcessorKeyList.sort()
......
......@@ -33,19 +33,39 @@ class FileProcessingThread(threading.Thread):
filePath = file.getFilePath()
daqPath = file.getDaqPath()
experiment = file.getExperiment()
try:
for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey)
processorName = processor.__class__.__name__
fileProcessedByDict = file.get('processedByDict', {})
if fileProcessedByDict.has_key(processorName):
self.logger.debug('%s has already been processed by %s ' % (file, processorName))
continue
self.logger.debug('%s is about to process file %s ' % (processorName, file))
try:
processor.processFile(filePath, daqPath, experiment)
fileProcessedByDict[processorName] = True
file['processedByDict'] = fileProcessedByDict
self.logger.debug('%s processed file %s ' % (processorName, file))
except Exception, ex:
self.logger.exception(ex)
self.logger.debug('%s processing failed for file %s ' % (processorName, file))
file[processorName] = {'error' : ex}
self.unprocessedFileDict[filePath] = file
fileProcessingDict = file.get('processingDict', {})
file['processingDict'] = fileProcessingDict
processorDict = fileProcessingDict.get(processorName, {})
processorDict['lastError'] = ex
nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, file, nRetriesLeft))
processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
if nRetriesLeft <= 0:
self.logger.debug('No more %s retries left for file %s' % (processorName, file))
self.unprocessedFileDict[filePath] = file
else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, file, retryWaitPeriod))
self.fileProcessingQueue.push(file, retryWaitPeriod)
except Exception, ex:
self.logger.exception(ex)
......
#!/usr/bin/env python
import abc
class FileProcessor:
DEFAULT_NUMBER_OF_RETRIES = 0
DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS = 60
def __init__(self):
self.configDict = {}
@abc.abstractmethod
def processFile(self, filePath, daqPath, experiment):
return NotImplemented
def configure(self):
# Use this method for processor configuration
pass
def setConfigKeyValue(self, key, value):
self.configDict[key] = value
def getConfigKeyValue(self, key):
return self.configDict.get(key)
def setNumberOfRetries(self, nRetries):
self.configDict['numberOfRetries'] = nRetries
def getNumberOfRetries(self):
self.configDict.get('numberOfRetries', self.DEFAULT_NUMBER_OF_RETRIES)
def setRetryWaitPeriodInSeconds(self, waitPeriod):
self.configDict['retryWaitPeriodInSeconds'] = waitPeriod
def getRetryWaitPeriodInSeconds(self):
self.configDict.get('retryWaitPeriodInSeconds', DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS)
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessor import FileProcessor
class FileTransferPlugin(FileProcessor):
def __init__(self, command, src=None, dest=None):
FileProcessor.__init__(self)
self.src = src
self.dest = dest
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
if command is None or not len(command):
raise InvalidArgument('File transfer command must be non-empty string.')
self.command = command
self.subprocess = None
def processFile(self, filePath, daqPath, experiment):
storageHost = experiment.get('storageHost')
storageDirectory = experiment.get('storageDirectory')
dest = '%s:%s' % (storageHost, storageDirectory)
# Use relative path with respect to daq directory as a source
os.chdir(daqPath)
src = os.path.relpath(filePath, daqPath)
self.start(src, dest)
def getFullCommand(self, src, dest):
return '%s %s %s' % (self.command, src, dest)
def setSrc(self, src):
self.src = src
def setDest(self, dest):
self.dest = dest
def start(self, src=None, dest=None):
# Use preconfigured source if provided source is None
fileSrc = src
if src is None:
fileSrc = self.src
# Use provided destination only if preconfigured destination is None
# Plugins may have desired destination preconfigured for all files
fileDest = self.dest
if self.dest is None:
fileDest = dest
if not fileSrc or not fileDest:
raise InvalidRequest('Both source and destination must be non-empty strings.')
self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest))
return self.subprocess.run()
def wait(self):
if self.subprocess:
return self.subprocess.wait()
return None
def poll(self):
if self.subprocess:
return self.subprocess.poll()
return None
def getStdOut(self):
if self.subprocess:
return self.subprocess.getStdOut()
return None
def getStdErr(self):
if self.subprocess:
return self.subprocess.getStdErr()
return None
def getExitStatus(self):
if self.subprocess:
return self.subprocess.getExitStatus()
return None
def reset(self):
self.subprocess = None
#######################################################################
# Testing.
if __name__ == '__main__':
ft = FileTransfer('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
ft.start()
print 'StdOut: ', ft.getStdOut()
print 'StdErr: ', ft.getStdErr()
print 'Exit Status: ', ft.getExitStatus()
#!/usr/bin/env python
from fileTransferPlugin import FileTransferPlugin
class RsyncFileTransferPlugin(FileTransferPlugin):
COMMAND = 'rsync -arvlPR'
def __init__(self, src=None, dest=None):
FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
#######################################################################
# Testing.
if __name__ == '__main__':
ft = RsyncFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
ft.start()
print 'StdOut: ', ft.getStdOut()
print 'StdErr: ', ft.getStdErr()
print 'Exit Status: ', ft.getExitStatus()
......@@ -10,9 +10,9 @@ from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dmFileSystemEventHandler import DmFileSystemEventHandler
from fileProcessingManager import FileProcessingManager
class FileSystemObserver(threading.Thread,Singleton):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment