Skip to content
Snippets Groups Projects
Commit 422489bc authored by sveseli's avatar sveseli
Browse files

fixed alternative destination issues for file transfer plugins

parent 7c357db6
No related branches found
No related tags found
No related merge requests found
...@@ -45,7 +45,7 @@ class FileProcessingThread(threading.Thread): ...@@ -45,7 +45,7 @@ class FileProcessingThread(threading.Thread):
for processorKey in self.fileProcessorKeyList: for processorKey in self.fileProcessorKeyList:
processorNumber += 1 processorNumber += 1
processor = self.fileProcessorDict.get(processorKey) processor = self.fileProcessorDict.get(processorKey)
processorName = processor.__class__.__name__ processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
fileProcessedByDict = fileInfo.get('processedByDict', {}) fileProcessedByDict = fileInfo.get('processedByDict', {})
fileInfo['processedByDict'] = fileProcessedByDict fileInfo['processedByDict'] = fileProcessedByDict
......
...@@ -30,7 +30,7 @@ class FileTransferPlugin(FileProcessor): ...@@ -30,7 +30,7 @@ class FileTransferPlugin(FileProcessor):
storageDirectory = fileInfo.get('storageDirectory') storageDirectory = fileInfo.get('storageDirectory')
destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
srcUrl = self.getSrcUrl(filePath, dataDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory)
self.start(srcUrl, destUrl) self.start(srcUrl, destUrl, fileInfo)
def getSrcUrl(self, filePath, dataDirectory): def getSrcUrl(self, filePath, dataDirectory):
# Use relative path with respect to data directory as a source # Use relative path with respect to data directory as a source
...@@ -51,17 +51,17 @@ class FileTransferPlugin(FileProcessor): ...@@ -51,17 +51,17 @@ class FileTransferPlugin(FileProcessor):
def setDest(self, dest): def setDest(self, dest):
self.dest = dest self.dest = dest
def start(self, src=None, dest=None): def start(self, src=None, dest=None, fileInfo={}):
# Use preconfigured source if provided source is None # Use preconfigured source if provided source is None
fileSrc = src fileSrc = src
if src is None: if src is None:
fileSrc = self.src fileSrc = self.src
# Use provided destination only if preconfigured destination is None fileDest = dest
# Plugins may have desired destination preconfigured for all files if dest is None:
fileDest = self.dest fileDest = self.dest
if self.dest is None:
fileDest = dest
fileSrc = self.replaceTemplates(fileSrc, fileInfo)
fileDest = self.replaceTemplates(fileDest, fileInfo)
if not fileSrc or not fileDest: if not fileSrc or not fileDest:
raise InvalidRequest('Both source and destination must be non-empty strings.') raise InvalidRequest('Both source and destination must be non-empty strings.')
self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest)) self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest))
......
...@@ -37,7 +37,10 @@ class GridftpFileTransferPlugin(FileTransferPlugin): ...@@ -37,7 +37,10 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip() dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
fileName = os.path.basename(filePath) fileName = os.path.basename(filePath)
destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) if self.dest:
destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
else:
destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
return destUrl return destUrl
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
...@@ -97,10 +100,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin): ...@@ -97,10 +100,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
ftpUtility.getMd5Sum(filePath, fileInfo) ftpUtility.getMd5Sum(filePath, fileInfo)
# Transfer file # Transfer file
srcUrl = self.replaceTemplates(srcUrl, fileInfo) self.logger.debug('Starting transfer: %s -> %s (fileInfo: %s)' % (srcUrl, destUrl, fileInfo))
destUrl = self.replaceTemplates(destUrl, fileInfo) self.start(srcUrl, destUrl, fileInfo)
self.logger.debug('Starting transfer: %s' % fileInfo)
self.start(srcUrl, destUrl)
# Get remote checksum # Get remote checksum
if self.remoteMd5Sum: if self.remoteMd5Sum:
......
...@@ -60,10 +60,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin): ...@@ -60,10 +60,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
FileUtility.getMd5Sum(filePath, fileInfo) FileUtility.getMd5Sum(filePath, fileInfo)
# Transfer file # Transfer file
srcUrl = self.replaceTemplates(srcUrl, fileInfo)
destUrl = self.replaceTemplates(destUrl, fileInfo)
self.logger.debug('Starting transfer: %s' % fileInfo) self.logger.debug('Starting transfer: %s' % fileInfo)
self.start(srcUrl, destUrl) self.start(srcUrl, destUrl, fileInfo)
# Get remote checksum # Get remote checksum
if self.remoteMd5Sum: if self.remoteMd5Sum:
......
...@@ -35,7 +35,7 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin): ...@@ -35,7 +35,7 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
self.logger.debug('File info before transfer: %s' % fileInfo) self.logger.debug('File info before transfer: %s' % fileInfo)
# Transfer file # Transfer file
self.start(srcUrl, destUrl) self.start(srcUrl, destUrl, fileInfo)
# Get remote checksum # Get remote checksum
fileInfo2 = {} fileInfo2 = {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment