diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 9a50ae9e108fb639249a302a5e61ea8f1ddd173d..f3320c20fa61ba0b0197cdd650323cb8dd94871c 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -45,7 +45,7 @@ class FileProcessingThread(threading.Thread): for processorKey in self.fileProcessorKeyList: processorNumber += 1 processor = self.fileProcessorDict.get(processorKey) - processorName = processor.__class__.__name__ + processorName = '%s-%s' % (processor.__class__.__name__,processorNumber) fileProcessedByDict = fileInfo.get('processedByDict', {}) fileInfo['processedByDict'] = fileProcessedByDict diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py index ea5e579a138b339a081c57811c1685f56f0047e6..e1a9b54924d19d081e8b29343c064d55f4dab510 100755 --- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py @@ -30,7 +30,7 @@ class FileTransferPlugin(FileProcessor): storageDirectory = fileInfo.get('storageDirectory') destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) - self.start(srcUrl, destUrl) + self.start(srcUrl, destUrl, fileInfo) def getSrcUrl(self, filePath, dataDirectory): # Use relative path with respect to data directory as a source @@ -51,17 +51,17 @@ class FileTransferPlugin(FileProcessor): def setDest(self, dest): self.dest = dest - def start(self, src=None, dest=None): + def start(self, src=None, dest=None, fileInfo={}): # Use preconfigured source if provided source is None fileSrc = src if src is None: fileSrc = self.src - # Use provided destination only if preconfigured destination is None - # Plugins may have desired destination preconfigured for all files - fileDest = self.dest - if self.dest is None: - fileDest = dest + fileDest = dest + if dest is None: + fileDest = self.dest + fileSrc = self.replaceTemplates(fileSrc, fileInfo) + fileDest = self.replaceTemplates(fileDest, fileInfo) if not fileSrc or not fileDest: raise InvalidRequest('Both source and destination must be non-empty strings.') self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest)) diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 929dc77ba3803eb212a5b0d8aa5ed80ae31e63be..7993a59bd67d9e28df391bddc48358d8a15a9ba2 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -37,7 +37,10 @@ class GridftpFileTransferPlugin(FileTransferPlugin): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip() fileName = os.path.basename(filePath) - destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) + if self.dest: + destUrl = '%s/%s/%s' % (self.dest, dirName, fileName) + else: + destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) return destUrl def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): @@ -97,10 +100,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin): ftpUtility.getMd5Sum(filePath, fileInfo) # Transfer file - srcUrl = self.replaceTemplates(srcUrl, fileInfo) - destUrl = self.replaceTemplates(destUrl, fileInfo) - self.logger.debug('Starting transfer: %s' % fileInfo) - self.start(srcUrl, destUrl) + self.logger.debug('Starting transfer: %s -> %s (fileInfo: %s)' % (srcUrl, destUrl, fileInfo)) + self.start(srcUrl, destUrl, fileInfo) # Get remote checksum if self.remoteMd5Sum: diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index 38ec8ec1d307eecb4b271d8016111d8f3e23c04b..659e9849db79902eebf873ddb07f69528b237e78 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -60,10 +60,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin): FileUtility.getMd5Sum(filePath, fileInfo) # Transfer file - srcUrl = self.replaceTemplates(srcUrl, fileInfo) - destUrl = self.replaceTemplates(destUrl, fileInfo) self.logger.debug('Starting transfer: %s' % fileInfo) - self.start(srcUrl, destUrl) + self.start(srcUrl, destUrl, fileInfo) # Get remote checksum if self.remoteMd5Sum: diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py index f821f5e742ce21b4b41aa390d00d7e844b59e6f5..55cfef1b72aa2725e4d06bf1d2a60621498083e9 100755 --- a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py @@ -35,7 +35,7 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin): self.logger.debug('File info before transfer: %s' % fileInfo) # Transfer file - self.start(srcUrl, destUrl) + self.start(srcUrl, destUrl, fileInfo) # Get remote checksum fileInfo2 = {}