From aedda75c2aa7c8d69114d605b0afc6efc5ae5e14 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Fri, 27 May 2016 19:33:03 +0000 Subject: [PATCH] file processing manager now pushes stat utility into processors; mongo plugin can calculate md5sum --- .../dm/common/processing/fileProcessingManager.py | 7 +++++++ .../dm/common/processing/plugins/fileProcessor.py | 1 + .../processing/plugins/gridftpFileTransferPlugin.py | 5 +++-- .../processing/plugins/mongoDbFileCatalogPlugin.py | 12 +++++++++++- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index dde6b475..76aebefb 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -18,6 +18,7 @@ class FileProcessingManager(threading.Thread,Singleton): NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads' DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries' DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds' + STAT_UTILITY_KEY = 'statutility' FILE_PROCESSOR_KEY = 'fileprocessor' # Singleton. @@ -51,6 +52,11 @@ class FileProcessingManager(threading.Thread,Singleton): self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY)) self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY)) self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY)) + statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY) + if statUtility: + (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility) + self.logger.debug('Creating stat utility of class %s' % className) + statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor) # Create processors for (key,value) in configItems: @@ -61,6 +67,7 @@ class FileProcessingManager(threading.Thread,Singleton): self.logger.debug('Configuring file processor %s' % fileProcessor) fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries) fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds) + fileProcessor.statUtility = statUtility fileProcessor.configure() self.fileProcessorDict[key] = fileProcessor self.fileProcessorKeyList = self.fileProcessorDict.keys() diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py index 435c8159..853452ff 100755 --- a/src/python/dm/common/processing/plugins/fileProcessor.py +++ b/src/python/dm/common/processing/plugins/fileProcessor.py @@ -10,6 +10,7 @@ class FileProcessor: self.configDict = {} self.processorName = self.__class__.__name__ self.dependsOn = dependsOn + self.statUtility = None @abc.abstractmethod def processDirectory(self, directoryInfo): diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 40f1c7ba..9b5aa797 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -17,12 +17,13 @@ class GridftpFileTransferPlugin(FileTransferPlugin): DEFAULT_PORT = 2811 - def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): + def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.localMd5Sum = localMd5Sum self.remoteMd5Sum = remoteMd5Sum self.deleteOriginal = deleteOriginal + self.directoryTransferCommand = directoryTransferCommand self.pluginMustProcessFiles = pluginMustProcessFiles def getSrcUrl(self, filePath, dataDirectory): @@ -160,7 +161,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): # Transfer directory self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) - self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo) + self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo) ####################################################################### # Testing. diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py index 64ea7638..92afba0b 100755 --- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py +++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py @@ -13,12 +13,20 @@ class MongoDbFileCatalogPlugin(FileProcessor): DEFAULT_HDF5_METADATA_COMMAND = None - def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, dependsOn=[]): + def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=[]): FileProcessor.__init__(self, dependsOn=dependsOn) self.fileMongoDbApi = FileMongoDbApi() self.hdf5MetadataCommand = hdf5MetadataCommand + self.md5Sum = md5Sum self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + def processMd5Sum(self, filePath, fileInfo={}): + if not self.md5Sum or not self.statUtility: + return fileInfo + if not fileInfo.has_key('md5Sum'): + fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath) + return fileInfo + def processHdf5Metadata(self, filePath, fileInfo={}): if not self.hdf5MetadataCommand: return fileInfo @@ -42,6 +50,7 @@ class MongoDbFileCatalogPlugin(FileProcessor): fileInfo[key] = value else: self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName)) + return fileInfo def processFile(self, fileInfo): filePath = fileInfo.get('filePath') @@ -85,6 +94,7 @@ class MongoDbFileCatalogPlugin(FileProcessor): del fileInfo2[key] self.processHdf5Metadata(filePath, fileInfo2) + self.processMd5Sum(filePath, fileInfo2) self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2))) self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) -- GitLab