Skip to content
Snippets Groups Projects
gridftpFileTransferPlugin.py 6.2 KiB
Newer Older
sveseli's avatar
sveseli committed
#!/usr/bin/env python

sveseli's avatar
sveseli committed
import os

from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
sveseli's avatar
sveseli committed
class GridftpFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin that copies files using ``globus-url-copy``.

    Source URLs are derived from the upload's data directory (see
    getSrcUrl); destination URLs always use the ``sshftp`` scheme against
    the storage host (see getDestUrl). Optionally computes an md5 checksum
    of the source before the transfer and/or verifies the stored file's
    checksum through the DS web service afterwards.
    """

    DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
    DEFAULT_PORT = 2811

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
        """
        :param src: source URL prefix, used by getSrcUrl when the data
            directory carries no URL scheme (None selects ``file://``)
        :param dest: destination passed through to FileTransferPlugin
        :param command: transfer command line passed to FileTransferPlugin
        :param localMd5Sum: if True, compute the source md5 before transfer
        :param remoteMd5Sum: if True, fetch the stored file's md5 from the
            DS web service after transfer and compare it
        :param deleteOriginal: stored only; not acted on by this class
        """
        FileTransferPlugin.__init__(self, command, src, dest)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal

    def getSrcUrl(self, filePath, dataDirectory):
        """Build the transfer source URL for filePath.

        If dataDirectory parses as a URL with a scheme, the file is addressed
        on that host/port; otherwise ``file://`` is used when no src prefix
        was configured, else the configured src prefix.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src, filePath)
        return srcUrl

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        """Build the ``sshftp://`` destination URL for filePath.

        The file keeps its directory layout relative to the data directory's
        path component, rooted at storageDirectory on storageHost.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return destUrl

    @staticmethod
    def _needsTransfer(filePathDict, storageFilePathDict):
        """Return True if a local file must be (re)transferred to storage.

        A transfer is needed when storage has no entry for the file, when
        either size is unknown or they differ, or when either modification
        time is unknown or the local file is newer.
        """
        if not storageFilePathDict:
            # Storage does not have the file yet.
            return True
        fSize = filePathDict.get('fileSize')
        sfSize = storageFilePathDict.get('fileSize')
        # Treat only a missing size as unknown: 0 is a legitimate size, and
        # a falsy check would re-transfer every empty file on every pass.
        if fSize is None or sfSize is None or fSize != sfSize:
            return True
        # Sizes match; fall back to comparing modification times.
        mTime = filePathDict.get('fileModificationTime')
        sfTime = storageFilePathDict.get('fileModificationTime')
        if not mTime or not sfTime:
            return True
        return mTime > sfTime

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Filter filePathsDict down to the files that still need transfer.

        :param filePathsDict: local file path -> per-file dict; the
            'fileSize' and 'fileModificationTime' entries are consulted
        :param uploadInfo: dict providing 'storageHost', 'storageDirectory'
            and 'dataDirectory'
        :returns: a new dict with the selected entries of filePathsDict
        """
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        self.logger.debug('Upload info: %s', uploadInfo)

        # List the files already present in storage, over SFTP.
        sftpUtility = SftpUtility(storageHost)
        self.logger.debug("STORAGE DIR: %s", storageDirectory)
        self.logger.debug("STORAGE HOST: %s", storageHost)
        storageFilePathsDict = sftpUtility.getFiles(storageDirectory, {}, dataDirectory)
        self.logger.debug("STORAGE PATHS: %s", storageFilePathsDict)
        self.logger.debug("ORIG PATHS: %s", filePathsDict)

        pluginFilePathsDict = {}
        for filePath, filePathDict in filePathsDict.items():
            # NOTE(review): relpath is taken against the raw dataDirectory,
            # whereas getDestUrl uses the path component parsed out of it;
            # these differ if dataDirectory is a URL -- confirm intended.
            experimentFilePath = os.path.relpath(filePath, dataDirectory)
            storageFilePath = os.path.join(storageDirectory, experimentFilePath)
            storageFilePathDict = storageFilePathsDict.get(storageFilePath)
            self.logger.debug("SRC: %s", filePathDict)
            self.logger.debug("DEST: %s", storageFilePathDict)
            if self._needsTransfer(filePathDict, storageFilePathDict):
                pluginFilePathsDict[filePath] = filePathDict

        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
        return pluginFilePathsDict

    def _verifyRemoteMd5Sum(self, fileInfo, filePath, experimentFilePath, experimentName):
        """Compare the stored file's md5 (from the DS web service) with fileInfo.

        When no local checksum was computed, the remote value is recorded in
        fileInfo['md5Sum'] instead, so the comparison then cannot fail.

        :raises FileProcessingError: if the checksums differ
        """
        fileInfo2 = {
            'experimentFilePath': experimentFilePath,
            'experimentName': experimentName,
        }
        fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
        remoteMd5Sum = fileMetadata.get('md5Sum')
        if not self.localMd5Sum:
            fileInfo['md5Sum'] = remoteMd5Sum

        if remoteMd5Sum != fileInfo.get('md5Sum'):
            self.logger.error('Checksum mismatch for file: %s' % filePath)
            raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
        self.logger.debug('Checksum test passed for file %s', filePath)

    def processFile(self, fileInfo):
        """Transfer one file described by fileInfo to storage.

        Reads 'filePath', 'dataDirectory', 'experimentFilePath',
        'experimentName', 'storageHost' and 'storageDirectory' from
        fileInfo; stat results (and the md5, if localMd5Sum) are written
        back into fileInfo by FtpUtility.

        :raises FileProcessingError: on remote checksum mismatch (only when
            remoteMd5Sum is enabled)
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')

        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Stat the source file and, if requested, checksum it locally.
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        ftpUtility = FtpUtility(host, port)
        ftpUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            ftpUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s', fileInfo)
        self.start(srcUrl, destUrl)

        if self.remoteMd5Sum:
            self._verifyRemoteMd5Sum(fileInfo, filePath, experimentFilePath, experimentName)

        # NOTE(review): deleteOriginal is stored but not honored; deletion
        # of the original file is currently disabled.
sveseli's avatar
sveseli committed
#######################################################################
# Testing.
# Ad-hoc manual driver (Python 2 print syntax): constructs the plugin with
# src='/tmp/xyz' and dest='/tmp/xyz2', calls start(), then prints the
# captured stdout, stderr and exit status.
# NOTE(review): the constructor calls DsRestApiFactory.getFileRestApi(), and
# start() is called here with no arguments while processFile() passes
# (srcUrl, destUrl) -- confirm FileTransferPlugin.start() accepts both forms.
if __name__ == '__main__':
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()