Skip to content
Snippets Groups Projects
Commit e370f2d4 authored by sveseli's avatar sveseli
Browse files

fixes for upload status updates

parent 687a24cc
No related branches found
No related tags found
No related merge requests found
...@@ -33,7 +33,7 @@ class DirectoryUploadInfo(DmObject): ...@@ -33,7 +33,7 @@ class DirectoryUploadInfo(DmObject):
if status in ['aborted', 'failed']: if status in ['aborted', 'failed']:
uploadStatus = status uploadStatus = status
if not processingEndTime: if not processingEndTime and status != 'skipped':
endTime = None endTime = None
break break
......
...@@ -36,19 +36,24 @@ class FileProcessingThread(threading.Thread): ...@@ -36,19 +36,24 @@ class FileProcessingThread(threading.Thread):
endProcessingTime = time.time() endProcessingTime = time.time()
statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
return return
self.logger.debug('Starting processing file %s' % filePath) self.logger.debug('Starting to process file %s' % filePath)
fileInfo['startProcessingTime'] = time.time() startProcessingTime = fileInfo.get('startProcessingTime', time.time())
fileInfo['startProcessingTime'] = startProcessingTime
nProcessors = len(self.fileProcessorKeyList) nProcessors = len(self.fileProcessorKeyList)
processedByDict = fileInfo.get('processedByDict', {}) processedByDict = fileInfo.get('processedByDict', {})
fileInfo['processedByDict'] = processedByDict fileInfo['processedByDict'] = processedByDict
skipPlugins = fileInfo.get('skipPlugins', [])
processorNumber = 0 processorNumber = 0
for processorKey in self.fileProcessorKeyList: for processorKey in self.fileProcessorKeyList:
processorNumber += 1 processorNumber += 1
processor = self.fileProcessorDict.get(processorKey) processor = self.fileProcessorDict.get(processorKey)
processorName = processor.name processorName = processor.name
if processorName in skipPlugins:
self.logger.debug('%s will be skipped by %s ' % (filePath, processorName))
continue
if processedByDict.has_key(processorName): if processedByDict.has_key(processorName):
self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName)) self.logger.debug('%s has already been processed by %s ' % (filePath, processorName))
continue continue
self.logger.debug('%s is about to process file %s ' % (processorName, filePath)) self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
from daqWebServiceSessionCli import DaqWebServiceSessionCli from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.constants import dmProcessingMode
class UploadCli(DaqWebServiceSessionCli): class UploadCli(DaqWebServiceSessionCli):
def __init__(self): def __init__(self):
...@@ -11,24 +12,34 @@ class UploadCli(DaqWebServiceSessionCli): ...@@ -11,24 +12,34 @@ class UploadCli(DaqWebServiceSessionCli):
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR) self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)
self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.') self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.')
self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY))
self.addOption('', '--skip-plugins', dest='skipPlugins', help='Comma-separated list of plugins which should not process the given directory.')
def checkArgs(self): def checkArgs(self):
if self.options.experimentName is None: if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.') raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None: if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.') raise InvalidRequest('Experiment data directory must be provided.')
if self.options.processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST:
raise InvalidRequest('Processing mode must be one of %s.' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST)
def updateDaqInfoFromOptions(self, daqInfo): def updateDaqInfoFromOptions(self, daqInfo):
if self.options.reprocess: if self.options.reprocess:
daqInfo['reprocessFiles'] = True daqInfo['reprocessFiles'] = True
if self.options.processHidden: if self.options.processHidden:
daqInfo['processHiddenFiles'] = True daqInfo['processHiddenFiles'] = True
if self.options.skipPlugins:
daqInfo['skipPlugins'] = self.options.skipPlugins
daqInfo['processingMode'] = self.options.processingMode
def runCommand(self): def runCommand(self):
self.parseArgs(usage=""" self.parseArgs(usage="""
dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[--reprocess] [--reprocess]
[--process-hidden] [--process-hidden]
[--processing-mode=PROCESSINGMODE]
[--skip-plugins=SKIPPLUGINS]
[key1:value1, key2:value2, ...] [key1:value1, key2:value2, ...]
Description: Description:
......
...@@ -94,6 +94,24 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -94,6 +94,24 @@ class ExperimentSessionControllerImpl(DmObjectManager):
daqInfo['dataDirectory'] = dataDirectory daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId daqInfo['uploadId'] = uploadId
skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
# Check that there is at least one processor that can process files
processorList = []
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
if processorName not in skipPlugins:
processorList.append(processor)
if not len(processorList):
raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)
# Remove hidden files # Remove hidden files
self.logger.debug('Checking %s processing candidates' % len(filePathsDict)) self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo) filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
...@@ -123,6 +141,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -123,6 +141,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
fileInfo['daqInfo'] = daqInfo fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId fileInfo['uploadId'] = uploadId
fileInfo['statusMonitor'] = uploadInfo fileInfo['statusMonitor'] = uploadInfo
fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
try: try:
if uploadInfo.get('status') != 'aborting': if uploadInfo.get('status') != 'aborting':
fileProcessingManager.processFile(fileInfo) fileProcessingManager.processFile(fileInfo)
...@@ -180,15 +199,19 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -180,15 +199,19 @@ class ExperimentSessionControllerImpl(DmObjectManager):
daqInfo['dataDirectory'] = dataDirectory daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId daqInfo['uploadId'] = uploadId
skipProcessing = DictUtility.getAndRemoveKey(daqInfo, 'skipProcessing', '') skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
skipProcessingList = skipProcessing.split(',') if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
fileProcessingManager = FileProcessingManager.getInstance() fileProcessingManager = FileProcessingManager.getInstance()
processingInfo = {} processingInfo = {}
uploadInfo['processingInfo'] = processingInfo uploadInfo['processingInfo'] = processingInfo
for processorKey in fileProcessingManager.fileProcessorKeyList: for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey) processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name processorName = processor.name
if processorName in skipProcessingList: if processorName in skipPlugins:
processingInfo[processorName] = {'status' : 'skipped'} processingInfo[processorName] = {'status' : 'skipped'}
else: else:
self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory)) self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
......
...@@ -23,6 +23,7 @@ class UploadTracker(ObjectTracker): ...@@ -23,6 +23,7 @@ class UploadTracker(ObjectTracker):
if uploadId: if uploadId:
uploadInfo = self.get(uploadId) uploadInfo = self.get(uploadId)
if uploadInfo is not None: if uploadInfo is not None:
uploadInfo.updateStatus()
if uploadInfo.get('status') == 'running': if uploadInfo.get('status') == 'running':
raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir)) raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir))
del self.activeUploadDict[activeUploadKey] del self.activeUploadDict[activeUploadKey]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment