diff --git a/src/python/dm/__init__.py b/src/python/dm/__init__.py index f1b10e83f9ba4b27bbb24396bc46ee49c75cea7f..3e7336477bd032807e0d036f7b1519735eccccfa 100644 --- a/src/python/dm/__init__.py +++ b/src/python/dm/__init__.py @@ -1 +1 @@ -__version__ = "1.1 (2017.03.11)" +__version__ = "1.1 (2017.03.13)" diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 9a16c6e1887d7b5ea3b7247d63842c6371ce3dda..8ca4c999f5988a9d6fe6feb6440c57dfa65d3004 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -94,12 +94,13 @@ class FileProcessingThread(threading.Thread): statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime) statusMonitor.updateStatus() self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath)) + return else: if statusMonitor: statusMonitor.fileProcessingSkipped(processorName, filePath, processingError, endProcessingTime) statusMonitor.updateStatus() self.logger.debug('No more %s retries left for file %s, skipping it' % (processorName, filePath)) - return + continue else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod)) diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py index 0a4f22d0ab30dfc4881e45e4add63adebbf5e1ea..2df3edebbcbf0f76ab51137c160006451a843a46 100755 --- a/src/python/dm/common/processing/plugins/fileProcessor.py +++ b/src/python/dm/common/processing/plugins/fileProcessor.py @@ -61,6 +61,6 @@ class FileProcessor: def setSkipOnFailure(self, skipOnFailure): self.configDict['skipOnFailure'] = skipOnFailure - def getSkipOnFailure(self, skipOnFailure): + def getSkipOnFailure(self): return self.configDict.get('skipOnFailure', False) diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index f480597c75e7af24f379251a548542219a68ca37..5dc6835267c65171f6a8a1383f5bb41ac0fc1f25 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -27,7 +27,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): self.deleteOriginal = deleteOriginal self.directoryTransferCommand = directoryTransferCommand self.pluginMustProcessFiles = pluginMustProcessFiles - if nRetries: + if nRetries is not None: self.setNumberOfRetries(nRetries) self.setSkipOnFailure(skipOnFailure) diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index 1b20993682b2e9e44c69664d00ac2ce7ea5c38ce..210cc79af9a1c92e858f2fbf68dbb8544dd728e6 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -5,6 +5,7 @@ import os from fileTransferPlugin import FileTransferPlugin from dm.common.utility.osUtility import OsUtility from dm.common.utility.fileUtility import FileUtility +from dm.common.utility.ftpUtility import FtpUtility from dm.common.exceptions.fileProcessingError import FileProcessingError from dm.common.utility.dmSubprocess import DmSubprocess from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory @@ -24,7 +25,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin): self.remoteMd5Sum = remoteMd5Sum self.deleteOriginal = deleteOriginal self.pluginMustProcessFiles = pluginMustProcessFiles - if nRetries: + if nRetries is not None: self.setNumberOfRetries(nRetries) self.setSkipOnFailure(skipOnFailure) @@ -76,8 +77,9 @@ class RsyncFileTransferPlugin(FileTransferPlugin): self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) command = self.command if destDirectory: - targetDirectory = '%s/%s' % (storageDirectory,destDirectory) + (scheme, host, port, targetDirectory) = FtpUtility.parseUrl(destUrl) command = self.RSYNC_WITH_MKDIR_COMMAND % targetDirectory + command = self.replaceTemplates(command, fileInfo) self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory) # Get remote checksum @@ -116,8 +118,9 @@ class RsyncFileTransferPlugin(FileTransferPlugin): self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) command = self.DIRECTORY_TRANSFER_COMMAND if destDirectory: - targetDirectory = '%s/%s' % (storageDirectory,destDirectory) + (scheme, host, port, targetDirectory) = FtpUtility.parseUrl(destUrl) command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % targetDirectory + command = self.replaceTemplates(command, uploadInfo) self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory) #######################################################################