#!/usr/bin/env python

import os

from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

class RsyncFileTransferPlugin(FileTransferPlugin):
    """Transfer plugin that uploads files or whole directories with rsync.

    processFile() sends a single file using a source path relative to the
    data directory, processDirectory() syncs a directory in one rsync
    invocation, and checkUploadFilesForProcessing() uses an rsync dry run
    to select the candidate files that rsync reports it would transfer.
    """

    # Per-file transfer; -R (--relative) makes rsync keep the relative
    # source path on the destination side.
    DEFAULT_COMMAND = 'rsync -arvlPR --'
    # Per-file transfer that first creates the remote directory substituted
    # for %s (via --rsync-path) before running rsync on the remote host.
    RSYNC_WITH_MKDIR_COMMAND = 'rsync -arvlPR --rsync-path="mkdir -p %s && rsync" --'
    # Directory transfer variants (same options, without -R).
    DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
    DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND = 'rsync -arvlP --rsync-path="mkdir -p %s && rsync" --'
    # Dry run: used only to list what rsync would transfer.
    DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=None, nRetries=None, skipOnFailure=False):
        """Configure the plugin.

        :param src: source passed through to FileTransferPlugin
        :param dest: destination passed through to FileTransferPlugin
        :param command: transfer command passed through to FileTransferPlugin
        :param localMd5Sum: compute the md5 checksum locally before transfer
        :param remoteMd5Sum: fetch the checksum from the DS file API after
            transfer and compare it against the file info
        :param deleteOriginal: remove the local file after processing
        :param pluginMustProcessFiles: when False,
            checkUploadFilesForProcessing() returns an empty dict
        :param dependsOn: passed through to FileTransferPlugin; None is
            treated as an empty list
        :param nRetries: retry count; only applied when not None
        :param skipOnFailure: forwarded to setSkipOnFailure()
        """
        # Build a fresh list per instance rather than sharing one mutable
        # default object between all plugin instances.
        if dependsOn is None:
            dependsOn = []
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileDsApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles
        # Only override the retry count when one was explicitly supplied.
        if nRetries is not None:
            self.setNumberOfRetries(nRetries)
        self.setSkipOnFailure(skipOnFailure)

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Return the subset of filePathsDict that the rsync dry run lists.

        :param filePathsDict: dict keyed by absolute file path
        :param uploadInfo: dict with 'storageDirectory', 'storageHost' and
            'dataDirectory' keys, and an optional 'destDirectory' key
        :returns: dict with the entries of filePathsDict whose path appears
            in the dry run output (empty if pluginMustProcessFiles is False)
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageDirectory = uploadInfo['storageDirectory']
        storageHost = uploadInfo['storageHost']
        dataDirectory = uploadInfo['dataDirectory']
        destDirectory = uploadInfo.get('destDirectory')
        dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
        if destDirectory:
            dryRunCommand = '%s/%s/' % (dryRunCommand, destDirectory)
        subprocess = DmSubprocess.getSubprocess(dryRunCommand)
        subprocess.run()
        lines = subprocess.getStdOut().split('\n')
        pluginFilePathsDict = {}
        pathBase = dataDirectory
        for line in lines:
            # Lines ending with a path separator are skipped
            # (presumably directory entries in the rsync listing).
            if line.endswith(os.sep):
                continue
            # Output paths are joined with the data directory so they can
            # be matched against the keys of filePathsDict.
            filePath = os.path.join(pathBase, line)
            filePathDict = filePathsDict.get(filePath)
            if filePathDict:
                pluginFilePathsDict[filePath] = filePathDict
        self.logger.debug('Number of original files: %s, number of plugin files: %s' % (len(filePathsDict), len(pluginFilePathsDict)))
        return pluginFilePathsDict

    def processFile(self, fileInfo):
        """Transfer one file and optionally verify and delete it.

        :param fileInfo: dict describing the file; the keys read here are
            'filePath', 'dataDirectory', 'experimentFilePath',
            'experimentName', 'storageHost', 'storageDirectory',
            'destDirectory', 'fileSize' and 'md5Sum'
        :raises FileProcessingError: if remoteMd5Sum is enabled and the
            remote checksum differs from the one in fileInfo
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destDirectory = fileInfo.get('destDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory)
        # Use relative path with respect to data directory as a source
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        if not fileInfo.get('fileSize'):
            FileUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            FileUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        # Start from the configured command so that `command` is always
        # bound, including when no destination directory is given.
        # NOTE(review): assumes FileTransferPlugin.__init__ stores the
        # configured command as self.command -- confirm against base class.
        command = self.command
        if destDirectory:
            # The remote target directory may not exist yet; create it
            # as part of the rsync invocation.
            (_scheme, _host, _port, targetDirectory) = FtpUtility.parseUrl(destUrl)
            command = self.RSYNC_WITH_MKDIR_COMMAND % targetDirectory
            command = self.replaceTemplates(command, fileInfo)
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')

            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # Remove file
        if self.deleteOriginal:
            self.logger.debug('Deleting file %s' % filePath)
            OsUtility.removeFile(filePath)

    def processDirectory(self, directoryInfo):
        """Transfer a whole data directory with a single rsync invocation.

        :param directoryInfo: dict with an 'uploadInfo' entry; the keys read
            from it are 'dataDirectory', 'storageHost', 'storageDirectory'
            and 'destDirectory'
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)

        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        command = self.DIRECTORY_TRANSFER_COMMAND
        if destDirectory:
            # The remote target directory may not exist yet; create it
            # as part of the rsync invocation.
            (_scheme, _host, _port, targetDirectory) = FtpUtility.parseUrl(destUrl)
            command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % targetDirectory
            command = self.replaceTemplates(command, uploadInfo)
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory)
#######################################################################
# Testing.
if __name__ == '__main__':
    # Manual smoke test: runs an rsync dry run against the storage host.
    ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
    testFilePath = '/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py'
    # checkUploadFilesForProcessing() looks files up with dict.get(), so the
    # candidates must be a dict keyed by file path (a list would fail).
    filePathsDict = {testFilePath : {'filePath' : testFilePath}}
    uploadInfo = {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'}
    print(ft.checkUploadFilesForProcessing(filePathsDict, uploadInfo))