#!/usr/bin/env python

import os
import copy

from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

class GridftpFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin that copies data with ``globus-url-copy``.

    Single files are copied with ``command``; whole upload directories are
    copied with ``directoryTransferCommand``. Destination URLs are built under
    ``self.dest`` when configured, otherwise under
    ``sshftp://<storageHost>/<storageDirectory>[/<destDirectory>]``.
    Optionally the transfer is verified by comparing a locally computed MD5
    sum with the one reported by the DS web service.
    """

    # Command lines passed to the base class; the directory variant only adds
    # '-cc 8' to the single-file command.
    DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8'
    DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8'

    # Port given to FtpUtility.parseFtpUrl() when a URL does not specify one.
    # self.DEFAULT_PORT was referenced but not defined in this class: reuse the
    # base-class value when it exists, otherwise fall back to 2811 (the
    # registered GridFTP port).
    DEFAULT_PORT = getattr(FileTransferPlugin, 'DEFAULT_PORT', 2811)

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=None, nRetries=None, skipOnFailure=False):
        """Configure the plugin.

        :param src: optional URL prefix for source paths (None -> file:// URLs)
        :param dest: optional URL prefix used instead of the sshftp storage URL
        :param command: globus-url-copy command used for single files
        :param directoryTransferCommand: command used for whole directories
        :param localMd5Sum: compute the source MD5 sum before transferring
        :param remoteMd5Sum: after transfer, fetch the stored MD5 sum and compare
        :param deleteOriginal: stored only; originals are currently never deleted
        :param pluginMustProcessFiles: if False, no files are selected for processing
        :param dependsOn: passed to FileTransferPlugin; a fresh list per instance
        :param nRetries: setNumberOfRetries() is only called when not None
        :param skipOnFailure: passed to setSkipOnFailure()
        """
        # Avoid sharing one mutable default list between all instances.
        if dependsOn is None:
            dependsOn = []
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileDsApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.directoryTransferCommand = directoryTransferCommand
        self.pluginMustProcessFiles = pluginMustProcessFiles
        # Keep the base-class retry setting unless a value was given.
        if nRetries is not None:
            self.setNumberOfRetries(nRetries)
        self.setSkipOnFailure(skipOnFailure)

    def replaceSpecialCharacters(self, url):
        """Return url with '#', ' ' and '~' percent-encoded.

        NOTE(review): '%' itself is not escaped, so a literal '%' in a path is
        passed through unchanged.
        """
        # No replacement contains a character that is itself replaced, so the
        # substitution order does not matter.
        for (original, replacement) in (('#', '%23'), (' ', '%20'), ('~', '%7E')):
            url = url.replace(original, replacement)
        return url

    def getSrcUrl(self, filePath, dataDirectory):
        """Build the percent-encoded source URL for a single file.

        If dataDirectory carries a URL scheme, the file is addressed on that
        scheme/host/port. Otherwise self.src is used as a prefix, or a
        file:// URL when no src was configured.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src, filePath)
        return self.replaceSpecialCharacters(srcUrl)

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Build the percent-encoded destination URL for a single file.

        The directory of filePath relative to the (parsed) data directory path
        is preserved under self.dest if configured, otherwise under
        sshftp://storageHost/storageDirectory[/destDirectory].
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if self.dest:
            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
        elif destDirectory:
            destUrl = 'sshftp://%s/%s/%s/%s/%s' % (storageHost, storageDirectory, destDirectory, dirName, fileName)
        else:
            destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return self.replaceSpecialCharacters(destUrl)

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Drop files that already exist, unchanged, in storage.

        A candidate is removed from filePathsDict (in place) when the storage
        listing has an entry for the same path with an equal 'fileSize' and a
        'fileModificationTime' that is not older than the source's.

        :param filePathsDict: {filePath: fileInfoDict} candidates; pruned in place
        :param uploadInfo: dict with 'storageHost', 'storageDirectory',
            'dataDirectory' and optional 'destDirectory'
        :returns: the pruned filePathsDict, or {} when pluginMustProcessFiles
            is False
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        destDirectory = uploadInfo.get('destDirectory')
        self.logger.debug('Upload info: %s' % uploadInfo)
        # Original data directory may contain host/port
        (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
        self.logger.debug('Replacement dir path: %s' % replacementDirPath)
        self.logger.debug('Number of original files: %s' % len(filePathsDict))

        ftpUtility = SftpUtility(storageHost)
        targetDirectory = storageDirectory
        if destDirectory:
            targetDirectory = '%s/%s/' % (storageDirectory, destDirectory)
        # Best effort: if the target cannot be listed, transfer everything.
        try:
            self.logger.debug('Looking for existing files in %s' % targetDirectory)
            storageFilePathsDict = ftpUtility.getFiles(targetDirectory, {}, replacementDirPath)
            self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), targetDirectory))
        except Exception as ex:
            self.logger.warn('Could not find existing files in %s, assuming there are none (got error: %s)' % (targetDirectory,ex))
            storageFilePathsDict = {}

        # Remove file from plugin dict if we do not need to transfer it
        for (filePath, storageFilePathDict) in storageFilePathsDict.items():
            filePathDict = filePathsDict.get(filePath)
            if filePathDict is None:
                # We are not attempting to transfer this file
                continue

            # Check size; unknown or different sizes require a transfer
            fSize = filePathDict.get('fileSize')
            sfSize = storageFilePathDict.get('fileSize')
            if fSize is None or sfSize is None or fSize != sfSize:
                continue

            # Sizes are the same; transfer if either time is unknown or the
            # source is newer than the stored copy
            mTime = filePathDict.get('fileModificationTime')
            smTime = storageFilePathDict.get('fileModificationTime')
            if not mTime or not smTime or mTime > smTime:
                continue

            # Stored copy is up to date: no need to transfer file
            del filePathsDict[filePath]
        self.logger.debug('Number of files that require processing: %s' % len(filePathsDict))
        return filePathsDict

    def processFile(self, fileInfo):
        """Transfer one file with the single-file command and optionally verify it.

        Reads 'filePath', 'dataDirectory', 'experimentFilePath',
        'experimentName', 'storageHost', 'storageDirectory' and optional
        'destDirectory' from fileInfo. fileInfo is passed to the stat/checksum
        helpers (which presumably fill it in) and to start() as templateInfo.

        :raises FileProcessingError: if the remote MD5 sum differs from fileInfo's
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destDirectory = fileInfo.get('destDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Stat the source and, when requested, compute its checksum locally.
        # NOTE(review): statUtility is not set in this class; assumed to be
        # provided by FileTransferPlugin (falsy when not configured).
        statUtility = getattr(self, 'statUtility', None)
        if not statUtility:
            (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(srcUrl, defaultPort=self.DEFAULT_PORT)
            statUtility = FtpUtility(host, port)
        statUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            statUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {'experimentFilePath': experimentFilePath, 'experimentName': experimentName}
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')

            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # NOTE: self.deleteOriginal is currently ignored; originals are kept.

    def getSrcDirUrl(self, dataDirectory):
        """Build the percent-encoded source URL (with trailing '/') for a directory."""
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s/' % (dataDirectory)
        elif self.src is None:
            srcUrl = 'file://%s/' % (dataDirectory)
        else:
            srcUrl = '%s/%s/' % (self.src, dataDirectory)
        # Encode like the per-file URLs so paths with spaces etc. work too.
        return self.replaceSpecialCharacters(srcUrl)

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Build the percent-encoded destination URL (trailing '/') for a directory.

        Uses self.dest when configured, otherwise
        sshftp://storageHost/storageDirectory/; destDirectory is appended in
        either case. dataDirectory is accepted for interface symmetry but unused.
        """
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
        if destDirectory:
            destUrl = '%s/%s/' % (destUrl, destDirectory)
        return self.replaceSpecialCharacters(destUrl)

    def processDirectory(self, directoryInfo):
        """Transfer a whole upload directory using directoryTransferCommand.

        :param directoryInfo: dict whose 'uploadInfo' entry supplies
            'dataDirectory', 'storageHost', 'storageDirectory' and optional
            'destDirectory'; uploadInfo is passed to start() as templateInfo
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        experimentName = uploadInfo.get('experimentName')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)

        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)

#######################################################################
# Testing.
if __name__ == '__main__':
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()