diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index dde6b4754be052ae81392ab89eb77b7e04077f58..76aebefb7becf2c27cda684b870186b8326d460b 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -18,6 +18,7 @@ class FileProcessingManager(threading.Thread,Singleton): NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads' DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries' DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds' + STAT_UTILITY_KEY = 'statutility' FILE_PROCESSOR_KEY = 'fileprocessor' # Singleton. @@ -51,6 +52,11 @@ class FileProcessingManager(threading.Thread,Singleton): self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY)) self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY)) self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY)) + statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY) + if statUtility: + (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility) + self.logger.debug('Creating stat utility of class %s' % className) + statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor) # Create processors for (key,value) in configItems: @@ -61,6 +67,7 @@ class FileProcessingManager(threading.Thread,Singleton): self.logger.debug('Configuring file processor %s' % fileProcessor) fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries) fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds) + fileProcessor.statUtility = statUtility fileProcessor.configure() self.fileProcessorDict[key] = fileProcessor self.fileProcessorKeyList = self.fileProcessorDict.keys() diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py index 435c81598a445b80a697d13f023e58f4741bf39b..853452ffa5f1542acb584442b5867aa9de4c26a8 100755 --- a/src/python/dm/common/processing/plugins/fileProcessor.py +++ b/src/python/dm/common/processing/plugins/fileProcessor.py @@ -10,6 +10,7 @@ class FileProcessor: self.configDict = {} self.processorName = self.__class__.__name__ self.dependsOn = dependsOn + self.statUtility = None @abc.abstractmethod def processDirectory(self, directoryInfo): diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 40f1c7baf8bfe25c405c3ac35ab51b6e29abbbfb..9b5aa797430734a4f1d48c6b6db166fb6cd1ab2c 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -17,12 +17,13 @@ class GridftpFileTransferPlugin(FileTransferPlugin): DEFAULT_PORT = 2811 - def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): + def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.localMd5Sum = localMd5Sum self.remoteMd5Sum = remoteMd5Sum self.deleteOriginal = deleteOriginal + self.directoryTransferCommand = directoryTransferCommand self.pluginMustProcessFiles = pluginMustProcessFiles def getSrcUrl(self, filePath, dataDirectory): @@ -160,7 +161,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): # Transfer directory self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) - self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo) + self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo) ####################################################################### # Testing. diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py index 64ea7638e351b7a8e3801a263eadf51718aff871..92afba0ba3a025504cfe84ce6d739ecbfeadbddd 100755 --- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py +++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py @@ -13,12 +13,20 @@ class MongoDbFileCatalogPlugin(FileProcessor): DEFAULT_HDF5_METADATA_COMMAND = None - def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, dependsOn=[]): + def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=[]): FileProcessor.__init__(self, dependsOn=dependsOn) self.fileMongoDbApi = FileMongoDbApi() self.hdf5MetadataCommand = hdf5MetadataCommand + self.md5Sum = md5Sum self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + def processMd5Sum(self, filePath, fileInfo={}): + if not self.md5Sum or not self.statUtility: + return fileInfo + if not fileInfo.has_key('md5Sum'): + fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath) + return fileInfo + def processHdf5Metadata(self, filePath, fileInfo={}): if not self.hdf5MetadataCommand: return fileInfo @@ -42,6 +50,7 @@ class MongoDbFileCatalogPlugin(FileProcessor): fileInfo[key] = value else: self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName)) + return fileInfo def processFile(self, fileInfo): filePath = fileInfo.get('filePath') @@ -85,6 +94,7 @@ class MongoDbFileCatalogPlugin(FileProcessor): del fileInfo2[key] self.processHdf5Metadata(filePath, fileInfo2) + self.processMd5Sum(filePath, fileInfo2) self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2))) self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)