diff --git a/src/python/dm/common/constants/dmProcessingStatus.py b/src/python/dm/common/constants/dmProcessingStatus.py index 523bd70eee72161981db13dd919b83eed5e2123e..3cc2825741ee4ac3ef890bda6b6656cf30867c81 100755 --- a/src/python/dm/common/constants/dmProcessingStatus.py +++ b/src/python/dm/common/constants/dmProcessingStatus.py @@ -5,6 +5,7 @@ DM_PROCESSING_STATUS_ANY = 'any' DM_PROCESSING_STATUS_PENDING = 'pending' DM_PROCESSING_STATUS_RUNNING = 'running' +DM_PROCESSING_STATUS_FINALIZING = 'finalizing' DM_PROCESSING_STATUS_DONE = 'done' DM_PROCESSING_STATUS_FAILED = 'failed' DM_PROCESSING_STATUS_SKIPPED = 'skipped' @@ -15,6 +16,7 @@ DM_ALLOWED_PROCESSING_STATUS_LIST = [ DM_PROCESSING_STATUS_ANY, DM_PROCESSING_STATUS_PENDING, DM_PROCESSING_STATUS_RUNNING, + DM_PROCESSING_STATUS_FINALIZING, DM_PROCESSING_STATUS_DONE, DM_PROCESSING_STATUS_FAILED, DM_PROCESSING_STATUS_SKIPPED, diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index 2368786103a89957ce43aa937e779967c18abe68..468f4ea7eac00bb402a18407a3cdc3f423274d45 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -76,10 +76,12 @@ class DaqInfo(DmObject): self['percentageProcessed'] = '%.2f' % percentageProcessed self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors - if self.get('endTime') and nCompletedFiles == nFiles: - daqStatus = 'done' - if nProcessingErrors: - daqStatus = 'failed' + if self.get('endTime'): + daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FINALIZING + if nCompletedFiles == nFiles: + daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE + if nProcessingErrors: + daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime') lastFileProcessedTime = self.get('lastFileProcessedTime') endTime = lastFileProcessedTime diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index f83762d32fb35737d7e7051b543b0734ae8a6f40..f535b56fafc795645219726c86d236db36821778 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -2,6 +2,7 @@ import threading import time +from dm.common.constants import dmProcessingStatus from dm.common.utility.loggingManager import LoggingManager @@ -31,7 +32,7 @@ class FileProcessingThread(threading.Thread): try: statusMonitor = fileInfo.get('statusMonitor') - if statusMonitor and statusMonitor.get('status') == 'aborting': + if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING: self.logger.debug('File %s processing is cancelled' % (filePath)) endProcessingTime = time.time() statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) @@ -87,7 +88,7 @@ class FileProcessingThread(threading.Thread): return else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() - self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) + self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod)) self.fileProcessingQueue.push(fileInfo, retryWaitPeriod) # Do not process this file further until # this plugin is done diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py index 25158a41fc9b58f44908c6e73b058646677cd87c..2c2228ac20383ffa7429d7ded98183860d44aa0e 100755 --- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py @@ -34,15 +34,15 @@ class FileTransferPlugin(FileProcessor): self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo) def getSrcUrl(self, filePath, dataDirectory): - # Use relative path with respect to data directory as a source - srcUrl = os.path.relpath(filePath, dataDirectory) - return srcUrl + return filePath def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory): + # Use relative path with respect to data directory as a source + srcUrl = os.path.relpath(filePath, dataDirectory) if self.dest: - destUrl = '%s' % (self.dest) + destUrl = '%s/%s' % (self.dest, srcUrl) else: - destUrl = '%s:%s' % (storageHost, storageDirectory) + destUrl = '%s:%s/%s' % (storageHost, storageDirectory, srcUrl) return destUrl def getSrcDirUrl(self, dataDirectory): diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index 4c0bac6c123ee335b944567f4ac401144defbb9d..0b44c3db33f4981db5e357d5f9ae75a4cb68151c 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -11,9 +11,9 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class RsyncFileTransferPlugin(FileTransferPlugin): - DEFAULT_COMMAND = 'rsync -arvlPR' - DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP' - DRY_RUN_COMMAND = 'rsync -arvlP' + DEFAULT_COMMAND = 'rsync -arvlPR --' + DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --' + DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --' def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) @@ -29,7 +29,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin): storageDirectory = uploadInfo['storageDirectory'] storageHost = uploadInfo['storageHost'] dataDirectory = uploadInfo['dataDirectory'] - dryRunCommand = '%s --dry-run %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory) + dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory) subprocess = DmSubprocess.getSubprocess(dryRunCommand) subprocess.run() lines = subprocess.getStdOut().split('\n')