Skip to content
Snippets Groups Projects
Commit aedda75c authored by sveseli's avatar sveseli
Browse files

file processing manager now pushes stat utility into processors; mongo plugin can calculate md5sum

parent d5be3d7f
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@ class FileProcessingManager(threading.Thread,Singleton):
NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
STAT_UTILITY_KEY = 'statutility'
FILE_PROCESSOR_KEY = 'fileprocessor'
# Singleton.
......@@ -51,6 +52,11 @@ class FileProcessingManager(threading.Thread,Singleton):
self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY)
if statUtility:
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility)
self.logger.debug('Creating stat utility of class %s' % className)
statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
# Create processors
for (key,value) in configItems:
......@@ -61,6 +67,7 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Configuring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
fileProcessor.statUtility = statUtility
fileProcessor.configure()
self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList = self.fileProcessorDict.keys()
......
......@@ -10,6 +10,7 @@ class FileProcessor:
self.configDict = {}
self.processorName = self.__class__.__name__
self.dependsOn = dependsOn
self.statUtility = None
@abc.abstractmethod
def processDirectory(self, directoryInfo):
......
......@@ -17,12 +17,13 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
DEFAULT_PORT = 2811
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.localMd5Sum = localMd5Sum
self.remoteMd5Sum = remoteMd5Sum
self.deleteOriginal = deleteOriginal
self.directoryTransferCommand = directoryTransferCommand
self.pluginMustProcessFiles = pluginMustProcessFiles
def getSrcUrl(self, filePath, dataDirectory):
......@@ -160,7 +161,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
# Transfer directory
self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo)
self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)
#######################################################################
# Testing.
......
......@@ -13,12 +13,20 @@ class MongoDbFileCatalogPlugin(FileProcessor):
DEFAULT_HDF5_METADATA_COMMAND = None
def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, dependsOn=[]):
def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=[]):
FileProcessor.__init__(self, dependsOn=dependsOn)
self.fileMongoDbApi = FileMongoDbApi()
self.hdf5MetadataCommand = hdf5MetadataCommand
self.md5Sum = md5Sum
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processMd5Sum(self, filePath, fileInfo={}):
if not self.md5Sum or not self.statUtility:
return fileInfo
if not fileInfo.has_key('md5Sum'):
fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath)
return fileInfo
def processHdf5Metadata(self, filePath, fileInfo={}):
if not self.hdf5MetadataCommand:
return fileInfo
......@@ -42,6 +50,7 @@ class MongoDbFileCatalogPlugin(FileProcessor):
fileInfo[key] = value
else:
self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName))
return fileInfo
def processFile(self, fileInfo):
filePath = fileInfo.get('filePath')
......@@ -85,6 +94,7 @@ class MongoDbFileCatalogPlugin(FileProcessor):
del fileInfo2[key]
self.processHdf5Metadata(filePath, fileInfo2)
self.processMd5Sum(filePath, fileInfo2)
self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment