#!/usr/bin/env python

import os

from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory


class RsyncFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin that copies files/directories to a storage host with rsync.

    Besides the transfer itself, the plugin can optionally compute a local
    md5 sum, compare it with the md5 sum reported by the DS web service for
    the transferred file, and delete the original file afterwards.
    """

    # -R (--relative) keeps the source path relative to the working directory
    # (processFile runs the command with cwd=dataDirectory).
    DEFAULT_COMMAND = 'rsync -arvlPR --'
    # The *_WITH_MKDIR variants set --rsync-path so the remote side runs
    # "mkdir -p <target> && rsync"; %s is filled with the target directory.
    RSYNC_WITH_MKDIR_COMMAND = 'rsync -arvlPR --rsync-path="mkdir -p %s && rsync" --'
    DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
    DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND = 'rsync -arvlP --rsync-path="mkdir -p %s && rsync" --'
    # Only lists what would be transferred; nothing is copied.
    DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=None, nRetries=None, skipOnFailure=False):
        """Configure the plugin.

        :param src: source, passed through to FileTransferPlugin.
        :param dest: destination, passed through to FileTransferPlugin.
        :param command: transfer command used by processFile when no
            destDirectory is given.
        :param localMd5Sum: compute the md5 sum of the local file before transfer.
        :param remoteMd5Sum: after transfer, fetch the md5 sum from the DS web
            service and compare it with the local one.
        :param deleteOriginal: remove the local file after a successful transfer.
        :param pluginMustProcessFiles: if False, checkUploadFilesForProcessing
            selects no files.
        :param dependsOn: plugin dependencies passed to FileTransferPlugin;
            None means no dependencies.
        :param nRetries: if not None, passed to setNumberOfRetries().
        :param skipOnFailure: passed to setSkipOnFailure().
        """
        # Use a fresh list per instance; a shared mutable default ([]) would be
        # the same object for every instance created without dependsOn.
        if dependsOn is None:
            dependsOn = []
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileDsApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles
        if nRetries is not None:
            self.setNumberOfRetries(nRetries)
        self.setSkipOnFailure(skipOnFailure)

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Return the subset of filePathsDict that rsync would actually transfer.

        Runs an rsync dry run from uploadInfo['dataDirectory'] to
        uploadInfo['storageHost']:uploadInfo['storageDirectory'] (with
        uploadInfo['destDirectory'] appended when set). It keeps only the
        entries of filePathsDict whose path appears in the dry-run listing.

        :param filePathsDict: dict keyed by file path (dataDirectory joined with
            the path rsync reports); values are copied through unchanged.
        :param uploadInfo: dict with 'storageDirectory', 'storageHost',
            'dataDirectory' and optionally 'destDirectory'.
        :returns: dict mapping selected file path -> its original value; an
            empty dict when pluginMustProcessFiles is False.
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageDirectory = uploadInfo['storageDirectory']
        storageHost = uploadInfo['storageHost']
        dataDirectory = uploadInfo['dataDirectory']
        destDirectory = uploadInfo.get('destDirectory')

        # Trailing '/' on the source makes rsync list the directory contents
        # relative to dataDirectory.
        dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
        if destDirectory:
            dryRunCommand = '%s/%s/' % (dryRunCommand, destDirectory)
        # Named so it does not shadow the stdlib 'subprocess' module.
        dryRunProcess = DmSubprocess.getSubprocess(dryRunCommand)
        dryRunProcess.run()
        lines = dryRunProcess.getStdOut().split('\n')

        pluginFilePathsDict = {}
        pathBase = dataDirectory
        for line in lines:
            # Skip blank lines and directory entries (listed with a trailing
            # separator). Non-path lines of rsync's verbose output (header,
            # summary) simply will not match any key below.
            if not line or line.endswith(os.sep):
                continue
            filePath = os.path.join(pathBase, line)
            filePathDict = filePathsDict.get(filePath)
            # Compare against None so a file whose info value is an empty
            # (falsy) dict is still selected.
            if filePathDict is not None:
                pluginFilePathsDict[filePath] = filePathDict
        self.logger.debug('Number of original files: %s, number of plugin files: %s' % (len(filePathsDict), len(pluginFilePathsDict)))
        return pluginFilePathsDict

    def processFile(self, fileInfo):
        """Transfer a single file with rsync, optionally verifying and deleting it.

        Steps, in order:
        1. Stat the file if 'fileSize' is missing or falsy.
        2. If localMd5Sum, compute the local md5 sum.
        3. Run the transfer.
        4. If remoteMd5Sum, fetch the stored file's metadata from the DS web
           service and compare md5 sums.
        5. If deleteOriginal, remove the local file.

        :param fileInfo: dict describing the file. Reads 'filePath',
            'dataDirectory', 'experimentFilePath', 'experimentName',
            'storageHost', 'storageDirectory', 'destDirectory' and 'fileSize'.
            It is passed to the FileUtility helpers, which presumably fill in
            stat/md5 fields (TODO confirm). It receives 'md5Sum' from the
            remote metadata when localMd5Sum is False.
        :raises FileProcessingError: if the remote md5 sum differs from fileInfo['md5Sum'].
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destDirectory = fileInfo.get('destDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory)
        # Use relative path with respect to data directory as a source
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Calculate checksum
        if not fileInfo.get('fileSize'):
            FileUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            FileUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        command = self.command
        if destDirectory:
            # NOTE(review): this replaces self.command entirely, so any custom
            # options given to the constructor (e.g. --exclude) are not used
            # when destDirectory is set.
            (scheme, host, port, targetDirectory) = FtpUtility.parseUrl(destUrl)
            command = self.RSYNC_WITH_MKDIR_COMMAND % targetDirectory
        command = self.replaceTemplates(command, fileInfo)
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info. Note that the
            # comparison below then trivially passes (nothing to verify against).
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')

            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # Remove file
        if self.deleteOriginal:
            self.logger.debug('Deleting file %s' % filePath)
            OsUtility.removeFile(filePath)

    def processDirectory(self, directoryInfo):
        """Transfer the whole upload data directory with a single rsync run.

        :param directoryInfo: dict whose 'uploadInfo' entry supplies
            'dataDirectory', 'experimentName', 'storageHost',
            'storageDirectory' and optionally 'destDirectory'.
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        experimentName = uploadInfo.get('experimentName')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destDirectory = uploadInfo.get('destDirectory')

        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)

        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        command = self.DIRECTORY_TRANSFER_COMMAND
        if destDirectory:
            # Have the remote side create the target directory first.
            (scheme, host, port, targetDirectory) = FtpUtility.parseUrl(destUrl)
            command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % targetDirectory
        command = self.replaceTemplates(command, uploadInfo)
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory)

#######################################################################
# Testing.
if __name__ == '__main__':
    ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
    # checkUploadFilesForProcessing expects a dict keyed by file path (it calls
    # .get() on it), not a list.
    testFilePath = '/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py'
    ft.checkUploadFilesForProcessing({testFilePath: {'filePath': testFilePath}}, {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'})