#!/usr/bin/env python

# Provenance (from the original patch header):
#   Commit:  9fa4c3d0a491ed0f1a06134b2cbf1a46e40293ed
#   Author:  Sinisa Veseli <sveseli@aps.anl.gov>
#   Date:    Thu, 8 Oct 2015 07:52:12 +0000
#   Subject: added rsync plugin with checksums before/after transfer,
#            and file deletion after transfer
#   Path:    src/python/dm/daq_web_service/service/impl/rsyncWithChecksumAndDeleteFileTransferPlugin.py
#            (new file, mode 100755)

import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.processing.plugins.fileTransferPlugin import FileTransferPlugin
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory


class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
    """
    File transfer plugin that copies a file with rsync, compares the local
    MD5 checksum with the one reported by the DS file REST API after the
    transfer, and removes the local file only when the two checksums match.
    """

    COMMAND = 'rsync -arvlPR'

    def __init__(self, src=None, dest=None):
        """
        Initialize the plugin.

        :param src: transfer source passed through to FileTransferPlugin
        :param dest: transfer destination passed through to FileTransferPlugin
        """
        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """
        Transfer one file, verify its checksum, and delete the local copy.

        :param fileInfo: dictionary describing the file; the keys read here
            are 'filePath', 'dataDirectory', 'experimentFilePath' and
            'experiment' (itself a dictionary with 'name', 'storageHost'
            and 'storageDirectory'). The 'md5Sum' key is read back after
            FileUtility.getMd5Sum() has been called with this dictionary.
        :raises FileProcessingError: if the checksum is unavailable on
            either side, or if the local and remote checksums differ; the
            local file is left in place in both cases.
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experiment = fileInfo.get('experiment')
        experimentName = experiment.get('name')
        experimentFilePath = fileInfo.get('experimentFilePath')
        storageHost = experiment.get('storageHost')
        storageDirectory = experiment.get('storageDirectory')
        destUrl = self.getDestUrl(storageHost, storageDirectory)

        # Use relative path with respect to data directory as a source
        # NOTE(review): os.chdir() changes the working directory of the
        # whole process, not only of this call -- confirm that this is
        # safe if several files can be processed concurrently.
        os.chdir(dataDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Calculate checksum
        # (presumably both helpers store their results in fileInfo;
        # 'md5Sum' is read back from it below)
        FileUtility.statFile(filePath, fileInfo)
        FileUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('File info before transfer: %s' % fileInfo)

        # Transfer file
        self.start(srcUrl, destUrl)

        # Get remote checksum
        fileInfo2 = {}
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
        self.logger.debug('File stat after transfer: %s' % fileMetadata)

        # Verify checksum
        localMd5Sum = fileInfo.get('md5Sum')
        remoteMd5Sum = (fileMetadata or {}).get('md5Sum')
        # A checksum missing on both sides would compare equal (None == None)
        # and let the deletion below proceed without any real verification,
        # so an absent checksum is treated as a failure.
        if not localMd5Sum or not remoteMd5Sum:
            self.logger.error('Checksum not available for file: %s' % filePath)
            raise FileProcessingError('Checksum could not be verified for file: %s' % filePath)
        if remoteMd5Sum != localMd5Sum:
            self.logger.error('Checksum mismatch for file: %s' % filePath)
            raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)

        # Remove file
        # Delete the very path that was checksummed above, rather than the
        # rsync source argument, so the file removed is exactly the one that
        # was verified (and the one named in the log message).
        self.logger.debug('Checksum test passed, deleting file %s' % filePath)
        OsUtility.removeFile(filePath)

#######################################################################
# Testing.
if __name__ == '__main__':
    ft = RsyncWithChecksumAndDeleteFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    # Single-argument print() calls produce the same output under Python 2
    # (where the parentheses are just grouping) and Python 3.
    print('StdOut:  %s' % (ft.getStdOut(),))
    print('StdErr:  %s' % (ft.getStdErr(),))
    print('Exit Status:  %s' % (ft.getExitStatus(),))