Skip to content
Snippets Groups Projects
Forked from DM / dm-docs
261 commits behind, 558 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gridftpFileTransferPlugin.py 5.83 KiB
#!/usr/bin/env python

import os
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

class GridftpFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin that runs a globus-url-copy command per file.

    Source URLs are derived from the upload's data directory (parsed with
    FtpUtility.parseFtpUrl); destination URLs are always ``sshftp://`` URLs
    on the storage host.  Optionally the source MD5 sum is computed before
    the transfer, and/or the storage-side MD5 sum is fetched through the DS
    file REST API afterwards and compared against it.
    """

    # Command line handed to FileTransferPlugin and run by start().
    DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
    # Passed as defaultPort to FtpUtility.parseFtpUrl when parsing data-directory URLs.
    DEFAULT_PORT = 2811


    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
        """Create the plugin.

        :param src: optional source prefix used by getSrcUrl when the data
            directory is not a URL (None means plain ``file://`` URLs);
            presumably stored as ``self.src`` by FileTransferPlugin.
        :param dest: destination passed through to FileTransferPlugin.
        :param command: transfer command line (defaults to DEFAULT_COMMAND).
        :param localMd5Sum: if True, compute the source MD5 sum before transfer.
        :param remoteMd5Sum: if True, fetch the storage-side MD5 sum via the
            DS file API after transfer and compare it with the source one.
        :param deleteOriginal: stored on the instance; not currently acted upon.
        """
        FileTransferPlugin.__init__(self, command, src, dest)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal

    def getSrcUrl(self, filePath, dataDirectory):
        """Return the globus-url-copy source URL for ``filePath``.

        If ``dataDirectory`` parses as a URL with a scheme, the file is
        addressed on that URL's host/port; otherwise a ``file://`` URL is
        used, or ``self.src`` is prepended when it is set.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            return '%s://%s:%s/%s' % (scheme, host, port, filePath)
        if self.src is None:
            return 'file://%s' % filePath
        return '%s/%s' % (self.src, filePath)

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        """Return the ``sshftp://`` destination URL for ``filePath``.

        The file's sub-directory relative to the data directory is mirrored
        under ``storageDirectory`` on ``storageHost``.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if dirName:
            destPath = '%s/%s/%s' % (storageDirectory, dirName, fileName)
        else:
            # File sits directly in the data directory: skip the empty
            # component instead of emitting '<storageDirectory>//<fileName>'.
            destPath = '%s/%s' % (storageDirectory, fileName)
        return 'sshftp://%s/%s' % (storageHost, destPath)

    @staticmethod
    def _needsTransfer(fileInfo, storageFileInfo):
        """Return True if a local file must be (re)transferred to storage.

        :param fileInfo: local file metadata dict (may be None).
        :param storageFileInfo: storage-side metadata dict, or a falsy value
            when storage has no entry for the file.
        """
        if not storageFileInfo:
            # Storage does not have the file yet.
            return True
        fileInfo = fileInfo or {}
        fSize = fileInfo.get('fileSize')
        sfSize = storageFileInfo.get('fileSize')
        # Only a *missing* size is unknown; 0 is a legitimate size and must
        # not force a re-transfer of empty files on every scan.
        if fSize is None or sfSize is None or fSize != sfSize:
            return True
        # Sizes match: transfer only if the local copy is newer (or a
        # timestamp is unavailable).
        mTime = fileInfo.get('fileModificationTime')
        sfTime = storageFileInfo.get('fileModificationTime')
        return mTime is None or sfTime is None or mTime > sfTime

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Return the subset of ``filePathsDict`` that still needs transfer.

        Storage contents are listed with SftpUtility.getFiles on
        ``uploadInfo['storageDirectory']`` and matched by joining that
        directory with each file's path relative to
        ``uploadInfo['dataDirectory']``.  A file is selected when storage has
        no entry for it, sizes differ or are unknown, or the local
        modification time is newer or unknown.

        :param filePathsDict: maps local file path -> metadata dict.
        :param uploadInfo: dict with 'storageHost', 'storageDirectory' and
            'dataDirectory' keys.
        :returns: new dict with the selected entries of ``filePathsDict``.
        """
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        self.logger.debug('Upload info: %s', uploadInfo)
        sftpUtility = SftpUtility(storageHost)
        storageFilePathsDict = sftpUtility.getFiles(storageDirectory, {})
        pluginFilePathsDict = {}
        for filePath, filePathDict in filePathsDict.items():
            experimentFilePath = os.path.relpath(filePath, dataDirectory)
            storageFilePath = os.path.join(storageDirectory, experimentFilePath)
            storageFilePathDict = storageFilePathsDict.get(storageFilePath)
            if self._needsTransfer(filePathDict, storageFilePathDict):
                pluginFilePathsDict[filePath] = filePathDict

        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
        return pluginFilePathsDict

    def processFile(self, fileInfo):
        """Transfer one file and optionally verify its MD5 sum.

        ``fileInfo`` is updated in place by FtpUtility.statFile and, when
        enabled, FtpUtility.getMd5Sum.  If remote checksumming is enabled
        but local checksumming is not, the storage-side MD5 sum is copied
        into ``fileInfo['md5Sum']`` (so the comparison cannot fail).

        :raises FileProcessingError: on an MD5 mismatch after transfer.
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')

        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Stat the source and (optionally) compute its checksum.
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        ftpUtility = FtpUtility(host, port)
        ftpUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            ftpUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s', fileInfo)
        self.start(srcUrl, destUrl)

        # Get remote checksum
        if self.remoteMd5Sum:
            statInfo = {
                'experimentFilePath': experimentFilePath,
                'experimentName': experimentName,
            }
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, statInfo)
            remoteMd5Sum = fileMetadata.get('md5Sum')
            # Without a local checksum there is nothing to compare against;
            # record the remote value instead.  (Previously this read the
            # non-existent attribute self.md5Sum and raised AttributeError.)
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = remoteMd5Sum

            # Verify checksum
            if remoteMd5Sum != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s', filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s', filePath)

        # NOTE: self.deleteOriginal is accepted by __init__ but removal of the
        # source file after a successful transfer is not implemented here.

#######################################################################
# Testing.
if __name__ == '__main__':
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()