Skip to content
Snippets Groups Projects
Commit 1a120f91 authored by sveseli's avatar sveseli
Browse files

merge fixes from 0.10

parents 62761001 17dafaa1
No related branches found
No related tags found
No related merge requests found
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
DM_PROCESSING_STATUS_ANY = 'any' DM_PROCESSING_STATUS_ANY = 'any'
DM_PROCESSING_STATUS_PENDING = 'pending' DM_PROCESSING_STATUS_PENDING = 'pending'
DM_PROCESSING_STATUS_RUNNING = 'running' DM_PROCESSING_STATUS_RUNNING = 'running'
DM_PROCESSING_STATUS_FINALIZING = 'finalizing'
DM_PROCESSING_STATUS_DONE = 'done' DM_PROCESSING_STATUS_DONE = 'done'
DM_PROCESSING_STATUS_FAILED = 'failed' DM_PROCESSING_STATUS_FAILED = 'failed'
DM_PROCESSING_STATUS_SKIPPED = 'skipped' DM_PROCESSING_STATUS_SKIPPED = 'skipped'
...@@ -15,6 +16,7 @@ DM_ALLOWED_PROCESSING_STATUS_LIST = [ ...@@ -15,6 +16,7 @@ DM_ALLOWED_PROCESSING_STATUS_LIST = [
DM_PROCESSING_STATUS_ANY, DM_PROCESSING_STATUS_ANY,
DM_PROCESSING_STATUS_PENDING, DM_PROCESSING_STATUS_PENDING,
DM_PROCESSING_STATUS_RUNNING, DM_PROCESSING_STATUS_RUNNING,
DM_PROCESSING_STATUS_FINALIZING,
DM_PROCESSING_STATUS_DONE, DM_PROCESSING_STATUS_DONE,
DM_PROCESSING_STATUS_FAILED, DM_PROCESSING_STATUS_FAILED,
DM_PROCESSING_STATUS_SKIPPED, DM_PROCESSING_STATUS_SKIPPED,
......
...@@ -76,10 +76,12 @@ class DaqInfo(DmObject): ...@@ -76,10 +76,12 @@ class DaqInfo(DmObject):
self['percentageProcessed'] = '%.2f' % percentageProcessed self['percentageProcessed'] = '%.2f' % percentageProcessed
self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
if self.get('endTime') and nCompletedFiles == nFiles: if self.get('endTime'):
daqStatus = 'done' daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FINALIZING
if nProcessingErrors: if nCompletedFiles == nFiles:
daqStatus = 'failed' daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
if nProcessingErrors:
daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime') lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
lastFileProcessedTime = self.get('lastFileProcessedTime') lastFileProcessedTime = self.get('lastFileProcessedTime')
endTime = lastFileProcessedTime endTime = lastFileProcessedTime
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
import threading import threading
import time import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.loggingManager import LoggingManager
...@@ -31,7 +32,7 @@ class FileProcessingThread(threading.Thread): ...@@ -31,7 +32,7 @@ class FileProcessingThread(threading.Thread):
try: try:
statusMonitor = fileInfo.get('statusMonitor') statusMonitor = fileInfo.get('statusMonitor')
if statusMonitor and statusMonitor.get('status') == 'aborting': if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
self.logger.debug('File %s processing is cancelled' % (filePath)) self.logger.debug('File %s processing is cancelled' % (filePath))
endProcessingTime = time.time() endProcessingTime = time.time()
statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
...@@ -87,7 +88,7 @@ class FileProcessingThread(threading.Thread): ...@@ -87,7 +88,7 @@ class FileProcessingThread(threading.Thread):
return return
else: else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod))
self.fileProcessingQueue.push(fileInfo, retryWaitPeriod) self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
# Do not process this file further until # Do not process this file further until
# this plugin is done # this plugin is done
......
...@@ -34,15 +34,15 @@ class FileTransferPlugin(FileProcessor): ...@@ -34,15 +34,15 @@ class FileTransferPlugin(FileProcessor):
self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo) self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)
def getSrcUrl(self, filePath, dataDirectory): def getSrcUrl(self, filePath, dataDirectory):
# Use relative path with respect to data directory as a source return filePath
srcUrl = os.path.relpath(filePath, dataDirectory)
return srcUrl
def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory): def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
# Use relative path with respect to data directory as a source
srcUrl = os.path.relpath(filePath, dataDirectory)
if self.dest: if self.dest:
destUrl = '%s' % (self.dest) destUrl = '%s/%s' % (self.dest, srcUrl)
else: else:
destUrl = '%s:%s' % (storageHost, storageDirectory) destUrl = '%s:%s/%s' % (storageHost, storageDirectory, srcUrl)
return destUrl return destUrl
def getSrcDirUrl(self, dataDirectory): def getSrcDirUrl(self, dataDirectory):
......
...@@ -11,9 +11,9 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory ...@@ -11,9 +11,9 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class RsyncFileTransferPlugin(FileTransferPlugin): class RsyncFileTransferPlugin(FileTransferPlugin):
DEFAULT_COMMAND = 'rsync -arvlPR' DEFAULT_COMMAND = 'rsync -arvlPR --'
DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP' DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
DRY_RUN_COMMAND = 'rsync -arvlP' DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
...@@ -29,7 +29,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin): ...@@ -29,7 +29,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
storageDirectory = uploadInfo['storageDirectory'] storageDirectory = uploadInfo['storageDirectory']
storageHost = uploadInfo['storageHost'] storageHost = uploadInfo['storageHost']
dataDirectory = uploadInfo['dataDirectory'] dataDirectory = uploadInfo['dataDirectory']
dryRunCommand = '%s --dry-run %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory) dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
subprocess = DmSubprocess.getSubprocess(dryRunCommand) subprocess = DmSubprocess.getSubprocess(dryRunCommand)
subprocess.run() subprocess.run()
lines = subprocess.getStdOut().split('\n') lines = subprocess.getStdOut().split('\n')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment