#!/usr/bin/env python

import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.processing.plugins.fileTransferPlugin import FileTransferPlugin
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory


class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
    """Transfer a file with rsync, verify its checksum, then delete the source.

    The local MD5 checksum is compared with the one reported by the DS file
    REST API after the transfer; the local file is removed only when the two
    values match.
    """

    COMMAND = 'rsync -arvlPR'

    def __init__(self, src=None, dest=None):
        """Set up the rsync command, the DS file REST API client and a logger.

        :param src: optional transfer source, passed to FileTransferPlugin
        :param dest: optional transfer destination, passed to
            FileTransferPlugin
        """
        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Transfer one file, verify the remote checksum and delete the local copy.

        :param fileInfo: dict-like object providing 'filePath',
            'dataDirectory', 'experimentFilePath' and an 'experiment' entry
            with 'name', 'storageHost' and 'storageDirectory'
        :raises FileProcessingError: if the experiment entry is missing or the
            remote checksum differs from the local one
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experiment = fileInfo.get('experiment')
        if experiment is None:
            # Fail with a processing error instead of an AttributeError on None.
            raise FileProcessingError('Experiment information is missing for file: %s' % filePath)
        experimentName = experiment.get('name')
        experimentFilePath = fileInfo.get('experimentFilePath')
        storageHost = experiment.get('storageHost')
        storageDirectory = experiment.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)

        # Use relative path with respect to data directory as a source.
        # NOTE: this changes the working directory of the whole process.
        os.chdir(dataDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Calculate checksum; the helpers are given fileInfo and presumably
        # store their results in it ('md5Sum' is read back from it below).
        FileUtility.statFile(filePath, fileInfo)
        FileUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('File info before transfer: %s' % fileInfo)

        # Transfer file
        self.start(srcUrl, destUrl)

        # Get remote checksum
        fileInfo2 = {
            'experimentFilePath': experimentFilePath,
            'experimentName': experimentName,
        }
        fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
        self.logger.debug('File stat after transfer: %s' % fileMetadata)

        # Verify checksum
        if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
            self.logger.error('Checksum mismatch for file: %s' % filePath)
            raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)

        # Remove file. Delete the very path that was checksummed and logged
        # rather than the relative source URL, so the removal does not depend
        # on the working directory still being the data directory.
        self.logger.debug('Checksum test passed, deleting file %s' % filePath)
        OsUtility.removeFile(filePath)


#######################################################################
# Testing.
if __name__ == '__main__':
    ft = RsyncWithChecksumAndDeleteFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    # Single-argument print() yields the same output on Python 2 and 3.
    print('%s %s' % ('StdOut: ', ft.getStdOut()))
    print('%s %s' % ('StdErr: ', ft.getStdErr()))
    print('%s %s' % ('Exit Status: ', ft.getExitStatus()))