Forked from
DM / dm-docs
261 commits behind, 664 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
fileTransferPlugin.py 4.44 KiB
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.utility.osUtility import OsUtility
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessor import FileProcessor
class FileTransferPlugin(FileProcessor):
    """File processor that transfers files by running an external command.

    The command line has the form ``<command> "<src>" "<dest>"`` and is
    executed through ``DmSubprocess``. Source and destination may be
    preconfigured on the instance or supplied per call to ``start()``.
    """

    def __init__(self, command, src=None, dest=None, dependsOn=None):
        """Create a transfer plugin.

        :param command: transfer command prefix (e.g. ``'rsync -arv'``);
            must be a non-empty string
        :param src: default source used by ``start()`` when none is given
        :param dest: default destination used by ``start()`` when none is
            given; also used as the destination prefix by ``getDestUrl()``
        :param dependsOn: forwarded to ``FileProcessor.__init__``; a new
            empty list is used when omitted
        :raises InvalidArgument: if ``command`` is None or empty
        """
        # A fresh list per instance avoids sharing one mutable default
        # object between every plugin created without dependsOn.
        FileProcessor.__init__(self, [] if dependsOn is None else dependsOn)
        self.src = src
        self.dest = dest
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        if not command:
            raise InvalidArgument('File transfer command must be non-empty string.')
        self.command = command
        self.subprocess = None

    def checkUploadFilesForProcessing(self, filePaths, uploadInfo):
        """No-op hook; this plugin performs no upload pre-check."""
        pass

    def processFile(self, fileInfo):
        """Transfer the single file described by ``fileInfo``.

        Reads the ``filePath``, ``dataDirectory``, ``storageHost`` and
        ``storageDirectory`` keys from ``fileInfo``, builds the source and
        destination URLs and starts the transfer. ``fileInfo`` is also
        passed to ``start()`` as the template substitution dictionary.

        :param fileInfo: dictionary describing the file to transfer
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

    def getSrcUrl(self, filePath, dataDirectory):
        """Return ``<dataDirectory>/./<filePath relative to dataDirectory>``.

        NOTE(review): the ``/./`` separator presumably marks the start of
        the relative part for rsync's --relative mode -- confirm.
        """
        return '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory))

    def _buildDestUrl(self, storageHost, storageDirectory):
        """Return ``<self.dest>/`` if a destination is configured, otherwise
        ``<storageHost>:<storageDirectory>/``."""
        if self.dest:
            return '%s/' % (self.dest)
        return '%s:%s/' % (storageHost, storageDirectory)

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        """Return the destination URL for a single file transfer.

        ``filePath`` and ``dataDirectory`` are accepted for interface
        compatibility but are not used here.
        """
        return self._buildDestUrl(storageHost, storageDirectory)

    def getSrcDirUrl(self, dataDirectory):
        """Return the source directory URL: ``dataDirectory`` plus a trailing slash."""
        return '%s/' % dataDirectory

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
        """Return the destination URL for a directory transfer.

        ``dataDirectory`` is accepted for interface compatibility but is
        not used here.
        """
        return self._buildDestUrl(storageHost, storageDirectory)

    def getFullCommand(self, src, dest, command=None):
        """Return the command line ``<command> "<src>" "<dest>"``.

        :param command: overrides the command given at construction time
            when it is non-empty
        """
        return '%s "%s" "%s"' % (command or self.command, src, dest)

    def setSrc(self, src):
        """Set the default transfer source."""
        self.src = src

    def setDest(self, dest):
        """Set the default transfer destination."""
        self.dest = dest

    def start(self, src=None, dest=None, command=None, templateInfo=None, cwd=None):
        """Build the transfer command and run it.

        :param src: transfer source; falls back to ``self.src`` when None
        :param dest: transfer destination; falls back to ``self.dest`` when None
        :param command: optional command overriding the configured one
        :param templateInfo: dictionary passed to ``replaceTemplates`` for
            source and destination; an empty dictionary is used when omitted
        :param cwd: working directory forwarded to ``DmSubprocess.getSubprocess``
        :returns: the result of the subprocess ``run()`` call
        :raises InvalidRequest: if source or destination is empty after
            template replacement
        """
        # Avoid a shared mutable default dictionary.
        if templateInfo is None:
            templateInfo = {}

        # Use preconfigured source/destination if none was provided.
        fileSrc = self.src if src is None else src
        fileDest = self.dest if dest is None else dest

        # If the configured destination is local (no "host:" part), attempt
        # to create it. Failure is only logged: the transfer command itself
        # may still succeed or will report the real error.
        # NOTE(review): this inspects self.dest, not the dest argument, so an
        # explicitly passed local destination is not created -- confirm intent.
        if self.dest is not None and ':' not in self.dest:
            destDir = self.replaceTemplates(self.dest, templateInfo)
            try:
                OsUtility.createDir(destDir)
            except Exception as ex:
                self.logger.warn('Transfer may fail due to failure to create destination directory %s: %s' % (destDir, str(ex)))

        # replaceTemplates is not defined in this class (presumably inherited
        # from FileProcessor).
        fileSrc = self.replaceTemplates(fileSrc, templateInfo)
        fileDest = self.replaceTemplates(fileDest, templateInfo)
        if not fileSrc or not fileDest:
            raise InvalidRequest('Both source and destination must be non-empty strings.')
        self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest, command), cwd=cwd)
        return self.subprocess.run()

    def wait(self):
        """Return the subprocess ``wait()`` result, or None if no transfer was started."""
        if self.subprocess:
            return self.subprocess.wait()
        return None

    def poll(self):
        """Return the subprocess ``poll()`` result, or None if no transfer was started."""
        if self.subprocess:
            return self.subprocess.poll()
        return None

    def getStdOut(self):
        """Return the subprocess standard output, or None if no transfer was started."""
        if self.subprocess:
            return self.subprocess.getStdOut()
        return None

    def getStdErr(self):
        """Return the subprocess standard error, or None if no transfer was started."""
        if self.subprocess:
            return self.subprocess.getStdErr()
        return None

    def getExitStatus(self):
        """Return the subprocess exit status, or None if no transfer was started."""
        if self.subprocess:
            return self.subprocess.getExitStatus()
        return None

    def reset(self):
        """Forget the last subprocess so the accessors return None again."""
        self.subprocess = None
#######################################################################
# Testing.
if __name__ == '__main__':
    # Manual smoke test: runs a local rsync from /tmp/xyz to /tmp/xyz2 and
    # prints the captured output and exit status.
    # The class defined in this module is FileTransferPlugin; the previous
    # name FileTransfer does not exist and raised NameError.
    ft = FileTransferPlugin('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
    ft.start()
    # Single-argument print(...) behaves the same on Python 2 and 3. The two
    # spaces reproduce the output of the former "print 'StdOut: ', value".
    print('StdOut:  %s' % (ft.getStdOut(),))
    print('StdErr:  %s' % (ft.getStdErr(),))
    print('Exit Status:  %s' % (ft.getExitStatus(),))