#!/usr/bin/env python

import os
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory


class RsyncFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin that copies files to a storage host with rsync.

    Optionally computes a local md5 checksum before the transfer, compares
    it with the checksum reported by the DS file REST API after the
    transfer, and removes the original file once the transfer is done.
    """

    # Command used for actual transfers; -R makes rsync use relative paths.
    DEFAULT_COMMAND = 'rsync -arvlPR'
    # Same flags without -R; used (with --dry-run appended) to list the
    # files rsync would transfer for a whole data directory.
    DRY_RUN_COMMAND = 'rsync -arvlP'

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
        """Initialize the plugin.

        :param src: transfer source, passed to FileTransferPlugin
        :param dest: transfer destination, passed to FileTransferPlugin
        :param command: transfer command, passed to FileTransferPlugin
        :param localMd5Sum: if True, compute the md5 sum of the local file
            before transferring it
        :param remoteMd5Sum: if True, retrieve the md5 sum via the DS file
            REST API after the transfer and verify it
        :param deleteOriginal: if True, remove the source file after
            processing
        :param pluginMustProcessFiles: if False,
            checkUploadFilesForProcessing() returns an empty dictionary
        """
        FileTransferPlugin.__init__(self, command, src, dest)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Select the files that rsync reports it would transfer.

        Runs DRY_RUN_COMMAND with --dry-run from the data directory to the
        storage directory and keeps only those entries of filePathsDict
        whose path appears in the command output. Note that the dry run
        always uses DRY_RUN_COMMAND, not the command given to __init__.

        :param filePathsDict: dictionary keyed by full file path
        :param uploadInfo: dictionary providing the 'storageDirectory',
            'storageHost' and 'dataDirectory' keys
        :returns: dictionary with the subset of filePathsDict entries that
            need processing; empty if pluginMustProcessFiles is False
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageDirectory = uploadInfo['storageDirectory']
        storageHost = uploadInfo['storageHost']
        dataDirectory = uploadInfo['dataDirectory']
        dryRunCommand = '%s --dry-run %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
        subprocess = DmSubprocess.getSubprocess(dryRunCommand)
        subprocess.run()
        lines = subprocess.getStdOut().split('\n')
        pluginFilePathsDict = {}
        pathBase = dataDirectory
        for line in lines:
            # Lines ending with the path separator are directories.
            if line.endswith(os.sep):
                continue
            # Output paths are joined with the data directory to form the
            # lookup key; any other output line (headers, statistics,
            # blanks) simply fails the lookup and is ignored.
            filePath = os.path.join(pathBase, line)
            filePathDict = filePathsDict.get(filePath)
            if filePathDict:
                pluginFilePathsDict[filePath] = filePathDict
        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
        return pluginFilePathsDict

    def processFile(self, fileInfo):
        """Transfer a single file and optionally verify and remove it.

        :param fileInfo: dictionary read for the 'filePath',
            'dataDirectory', 'experimentFilePath', 'experimentName',
            'storageHost' and 'storageDirectory' keys; it is also passed to
            the FileUtility helpers and, when only the remote checksum is
            enabled, gets its 'md5Sum' key set from the remote metadata
        :raises FileProcessingError: if remote checksum verification is
            enabled and the remote md5 sum differs from the local one
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)

        # Use relative path with respect to data directory as a source.
        # NOTE: this changes the working directory of the whole process.
        os.chdir(dataDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Stat the file once (the original repeated this call when the
        # local checksum was enabled), then calculate checksum if requested
        FileUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            FileUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s' % fileInfo)
        self.start(srcUrl, destUrl, fileInfo)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')

            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # Remove file
        if self.deleteOriginal:
            self.logger.debug('Deleting file %s' % filePath)
            # srcUrl is removed after the chdir() above; presumably it is
            # the path relative to the data directory -- verify against
            # getSrcUrl() in the base class.
            OsUtility.removeFile(srcUrl)


#######################################################################
# Testing.
if __name__ == '__main__':
    ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
    testFilePath = '/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py'
    # checkUploadFilesForProcessing() calls .get() on its first argument,
    # so it must be a dictionary keyed by file path (a list raises
    # AttributeError). The value only has to be truthy for this method;
    # its content here is a placeholder.
    testFilePathsDict = {testFilePath: {'filePath': testFilePath}}
    ft.checkUploadFilesForProcessing(testFilePathsDict, {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'})