Skip to content
Snippets Groups Projects
Forked from DM / dm-docs
261 commits behind, 691 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
fileTransferPlugin.py 4.44 KiB
#!/usr/bin/env python

import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.utility.osUtility import OsUtility
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessor import FileProcessor

class FileTransferPlugin(FileProcessor):

    def __init__(self, command, src=None, dest=None, dependsOn=[]):
        FileProcessor.__init__(self, dependsOn)
        self.src = src
        self.dest = dest
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        if command is None or not len(command):
            raise InvalidArgument('File transfer command must be non-empty string.')
        self.command = command
        self.subprocess = None

    def checkUploadFilesForProcessing(self, filePaths, uploadInfo):
        pass

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')

        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

    def getSrcUrl(self, filePath, dataDirectory):
        srcUrl = '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory))
        return srcUrl
    
    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = '%s:%s/' % (storageHost, storageDirectory)
        return destUrl
    
    def getSrcDirUrl(self, dataDirectory):
        return '%s/' % dataDirectory
    
    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = '%s:%s/' % (storageHost, storageDirectory)
        return destUrl
    
    def getFullCommand(self, src, dest, command=None):
        if command:
            return '%s "%s" "%s"' % (command, src, dest)
        else:
            return '%s "%s" "%s"' % (self.command, src, dest)

    def setSrc(self, src):
        self.src = src

    def setDest(self, dest):
        self.dest = dest

    def start(self, src=None, dest=None, command=None, templateInfo={}, cwd=None):
        # Use preconfigured source if provided source is None
        fileSrc = src
        if src is None:
            fileSrc = self.src
        fileDest = dest
        if dest is None:
            fileDest = self.dest

        # If destination is local, attempt to create it
        if self.dest is not None and self.dest.find(':') < 0:
            destDir = self.replaceTemplates(self.dest, templateInfo)
            try:
                OsUtility.createDir(destDir)
            except Exception, ex:
                self.logger.warn('Transfer may fail due to failure to create destination directory %s: %s' % (destDir, str(ex))) 

        fileSrc = self.replaceTemplates(fileSrc, templateInfo)
        fileDest = self.replaceTemplates(fileDest, templateInfo)
        if not fileSrc or not fileDest:
            raise InvalidRequest('Both source and destination must be non-empty strings.')
        self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest, command), cwd=cwd)
        return self.subprocess.run()

    def wait(self):
        if self.subprocess:
            return self.subprocess.wait()
        return None

    def poll(self):
        if self.subprocess:
            return self.subprocess.poll()
        return None

    def getStdOut(self):
        if self.subprocess:
            return self.subprocess.getStdOut()
        return None


    def getStdErr(self):
        if self.subprocess:
            return self.subprocess.getStdErr()
        return None

    def getExitStatus(self):
        if self.subprocess:
            return self.subprocess.getExitStatus()
        return None

    def reset(self):
        self.subprocess = None

#######################################################################
# Testing.
if __name__ == '__main__':
    ft = FileTransfer('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()