diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index ddd77402095316f03b065c37b78c6a56cd36ff24..23638d8c55010a8fbcc9490187c6550d0705fcab 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -71,6 +71,13 @@ class FileProcessingManager(threading.Thread,Singleton): self.fileProcessingQueue.push(fileInfo) self.eventFlag.set() + def appendFileProcessor(self, fileProcessor): + key = fileProcessor.__class__.__name__ + self.logger.debug('Adding file processor: %s' % key) + self.fileProcessorDict[key] = fileProcessor + self.fileProcessorKeyList.append(key) + self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList) + def start(self): self.lock.acquire() try: diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 79d202ad0dda9f2b855b09f6ba44205fc1ab34c0..8a85fe6eef00c93cda2557179ae71ad1dd9a1974 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -70,7 +70,7 @@ class FileProcessingThread(threading.Thread): retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) self.fileProcessingQueue.push(fileInfo, retryWaitPeriod) - # Do not process this file firther until + # Do not process this file further until # this plugin is done break diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py new file mode 100755 index 0000000000000000000000000000000000000000..f397d56652dde41830f6b58ba202dd037010fe59 --- /dev/null +++ b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +import os +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.osUtility import OsUtility +from dm.common.utility.fileUtility import FileUtility +from dm.common.exceptions.fileProcessingError import FileProcessingError +from dm.common.processing.plugins.fileTransferPlugin import FileTransferPlugin +from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory + +class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin): + + COMMAND = 'rsync -arvlPR' + + def __init__(self, src=None, dest=None): + FileTransferPlugin.__init__(self, self.COMMAND, src, dest) + self.dsFileApi = DsRestApiFactory.getFileRestApi() + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + def processFile(self, fileInfo): + filePath = fileInfo.get('filePath') + dataDirectory = fileInfo.get('dataDirectory') + experiment = fileInfo.get('experiment') + experimentName = experiment.get('name') + experimentFilePath = fileInfo.get('experimentFilePath') + storageHost = experiment.get('storageHost') + storageDirectory = experiment.get('storageDirectory') + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) + # Use relative path with respect to data directory as a source + os.chdir(dataDirectory) + srcUrl = self.getSrcUrl(filePath, dataDirectory) + + # Calculate checksum + FileUtility.statFile(filePath, fileInfo) + FileUtility.getMd5Sum(filePath, fileInfo) + self.logger.debug('File info before transfer: %s' % fileInfo) + + # Transfer file + self.start(srcUrl, destUrl) + + # Get remote checksum + fileInfo2 = {} + fileInfo2['experimentFilePath'] = experimentFilePath + fileInfo2['experimentName'] = experimentName + fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2) + self.logger.debug('File stat after transfer: %s' % fileMetadata) + + # Verify checksum + if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'): + self.logger.error('Checksum mismatch for file: %s' % filePath) + raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath) + + # Remove file + self.logger.debug('Checksum test passed, deleting file %s' % filePath) + OsUtility.removeFile(srcUrl) + +####################################################################### +# Testing. +if __name__ == '__main__': + ft = RsyncWithChecksumAndDeleteFileTransferPlugin('/tmp/xyz', '/tmp/xyz2') + ft.start() + print 'StdOut: ', ft.getStdOut() + print 'StdErr: ', ft.getStdErr() + print 'Exit Status: ', ft.getExitStatus()