#!/usr/bin/env python

import copy
import os

from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

class GridftpFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin that drives the ``globus-url-copy`` command.

    Source and destination URLs are built from the file path, the data
    directory (which may itself be an FTP-style URL) and the storage host
    and directory supplied with each file.  An MD5 checksum can optionally
    be computed before the transfer and compared afterwards with the value
    reported by the DS file REST API.
    """

    # Command line handed to FileTransferPlugin.__init__ unless overridden.
    DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
    # Passed to FtpUtility.parseFtpUrl() as its defaultPort argument.
    DEFAULT_PORT = 2811

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True,
                 remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
        """Create the plugin.

        :param src: optional source URL prefix, forwarded to the base class
        :param dest: optional destination URL prefix, forwarded to the
            base class
        :param command: transfer command, forwarded to the base class
        :param localMd5Sum: when true, processFile() asks FtpUtility for the
            file's MD5 sum before starting the transfer
        :param remoteMd5Sum: when true, processFile() fetches the MD5 sum
            from the DS file REST API after the transfer and raises
            FileProcessingError if it does not match
        :param deleteOriginal: stored on the instance; the code that would
            act on it in processFile() is currently commented out
        :param pluginMustProcessFiles: when false,
            checkUploadFilesForProcessing() reports that no file needs
            processing
        """
        FileTransferPlugin.__init__(self, command, src, dest)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles

    def getSrcUrl(self, filePath, dataDirectory):
        """Return the source URL for ``filePath``.

        When ``dataDirectory`` parses to a URL with a scheme, that scheme,
        host and port are reused.  Otherwise ``self.src`` is used as a
        prefix if it is set, and a ``file://`` URL is returned if it is
        None.
        """
        (scheme, host, port, _dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src, filePath)
        return srcUrl

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        """Return the destination URL for ``filePath``.

        The file keeps its directory relative to the path component of
        ``dataDirectory``.  The URL is rooted at ``self.dest`` when that is
        set, otherwise at ``sshftp://<storageHost>/<storageDirectory>``.
        """
        (_scheme, _host, _port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        # Directory of the file relative to the data directory path.
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if self.dest:
            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
        else:
            destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return destUrl

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Drop files that are already up to date on the storage host.

        A file is removed from ``filePathsDict`` when the storage listing
        has an entry under the same path with an identical, non-empty
        ``fileSize`` and a ``fileModificationTime`` that is not older than
        the source's.  Files with missing or zero size/time values are
        always kept.

        :param filePathsDict: maps file path to a dict of file attributes;
            modified in place
        :param uploadInfo: dict providing ``storageHost``,
            ``storageDirectory`` and ``dataDirectory``
        :returns: ``{}`` when the plugin is configured not to process files,
            otherwise ``filePathsDict`` reduced to the files that still
            need to be transferred
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        self.logger.debug('Upload info: %s', uploadInfo)
        # Original data directory may contain host/port
        (_scheme, _host, _port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
        self.logger.debug('Number of original files: %s', len(filePathsDict))
        self.logger.debug('Looking for existing files in %s', storageDirectory)
        ftpUtility = SftpUtility(storageHost)
        # NOTE(review): replacementDirPath presumably makes the returned keys
        # comparable with the keys of filePathsDict -- confirm in SftpUtility.
        storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
        # Arguments are passed separately; a single tuple would not satisfy
        # the two placeholders in the format string.
        self.logger.debug('There are %s files in %s', len(storageFilePathsDict), storageDirectory)
        # Remove file from plugin dict if we do not need to transfer it
        for (filePath, storageFilePathDict) in storageFilePathsDict.items():
            filePathDict = filePathsDict.get(filePath)
            if filePathDict is None:
                # We are not attempting to transfer this file
                # No need to change plugin file dict
                continue

            # Check size
            fSize = filePathDict.get('fileSize')
            sfSize = storageFilePathDict.get('fileSize')
            if not fSize or not sfSize or fSize != sfSize:
                # Sizes differ (or are unknown), need to transfer file
                continue

            # Sizes are the same, check modify time
            mTime = filePathDict.get('fileModificationTime')
            smTime = storageFilePathDict.get('fileModificationTime')
            if not mTime or not smTime or mTime > smTime:
                # Source time is later than storage time (or unknown),
                # need to transfer file
                continue

            # No need to transfer file
            del filePathsDict[filePath]

        self.logger.debug('Number of files that require processing: %s', len(filePathsDict))
        return filePathsDict

    def processFile(self, fileInfo):
        """Transfer one file and optionally verify its checksum.

        :param fileInfo: dict read for ``filePath``, ``dataDirectory``,
            ``experimentFilePath``, ``experimentName``, ``storageHost`` and
            ``storageDirectory``; it is also handed to the FtpUtility calls
            and to ``self.start()``, and ``md5Sum`` is set on it from the
            remote metadata when no local checksum was requested
        :raises FileProcessingError: if remote checksum verification is
            enabled and the checksums differ
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')

        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Stat the file and, if requested, calculate its checksum.
        # NOTE(review): both calls receive fileInfo and presumably store
        # their results in it -- confirm in FtpUtility.
        (_scheme, host, port, _dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        ftpUtility = FtpUtility(host, port)
        ftpUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            ftpUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('Starting transfer: %s -> %s (fileInfo: %s)' % (srcUrl, destUrl, fileInfo))
        self.start(srcUrl, destUrl, fileInfo)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')

            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # NOTE: removal of the original file is disabled; self.deleteOriginal
        # is currently not acted upon.
        #if self.deleteOriginal:
        #    self.logger.debug('Deleting file %s' % filePath)
        #    OsUtility.removeFile(srcUrl)


#######################################################################
# Testing.
if __name__ == '__main__':
    # Ad-hoc manual check: build a plugin for two local paths, run the
    # transfer command and show what it produced.
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    # Single-argument print(...) parses under both Python 2 and Python 3 and
    # emits the same text as the former "print 'label: ', value" statements
    # (label, separator space, value).
    print('StdOut:  %s' % (ft.getStdOut(),))
    print('StdErr:  %s' % (ft.getStdErr(),))
    print('Exit Status:  %s' % (ft.getExitStatus(),))