#!/usr/bin/env python import os from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.dmSubprocess import DmSubprocess from dm.common.exceptions.invalidArgument import InvalidArgument from dm.common.exceptions.invalidRequest import InvalidRequest from fileProcessor import FileProcessor class FileTransferPlugin(FileProcessor): def __init__(self, command, src=None, dest=None): FileProcessor.__init__(self) self.src = src self.dest = dest self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) if command is None or not len(command): raise InvalidArgument('File transfer command must be non-empty string.') self.command = command self.subprocess = None def processFile(self, fileInfo): filePath = fileInfo.get('filePath') dataDirectory = fileInfo.get('dataDirectory') experiment = fileInfo.get('experiment') storageHost = experiment.get('storageHost') storageDirectory = experiment.get('storageDirectory') destUrl = self.getDestUrl(storageHost, storageDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) self.start(srcUrl, destUrl) def getSrcUrl(self, filePath, dataDirectory): # Use relative path with respect to data directory as a source os.chdir(dataDirectory) srcUrl = os.path.relpath(filePath, dataDirectory) return srcUrl def getDestUrl(self, storageHost, storageDirectory): destUrl = '%s:%s' % (storageHost, storageDirectory) return destUrl def getFullCommand(self, src, dest): return '%s %s %s' % (self.command, src, dest) def setSrc(self, src): self.src = src def setDest(self, dest): self.dest = dest def start(self, src=None, dest=None): # Use preconfigured source if provided source is None fileSrc = src if src is None: fileSrc = self.src # Use provided destination only if preconfigured destination is None # Plugins may have desired destination preconfigured for all files fileDest = self.dest if self.dest is None: fileDest = dest if not fileSrc or not fileDest: raise InvalidRequest('Both source and destination must be non-empty strings.') self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest)) return self.subprocess.run() def wait(self): if self.subprocess: return self.subprocess.wait() return None def poll(self): if self.subprocess: return self.subprocess.poll() return None def getStdOut(self): if self.subprocess: return self.subprocess.getStdOut() return None def getStdErr(self): if self.subprocess: return self.subprocess.getStdErr() return None def getExitStatus(self): if self.subprocess: return self.subprocess.getExitStatus() return None def reset(self): self.subprocess = None ####################################################################### # Testing. if __name__ == '__main__': ft = FileTransfer('rsync -arv', '/tmp/xyz', '/tmp/xyz2') ft.start() print 'StdOut: ', ft.getStdOut() print 'StdErr: ', ft.getStdErr() print 'Exit Status: ', ft.getExitStatus()