diff --git a/src/python/dm/common/objects/directoryUploadInfo.py b/src/python/dm/common/objects/directoryUploadInfo.py index b39e551d8b62a47bae12bc27c9ba91d6cc79c337..6c8f16a8ab2fdb990e3676d5ce07bf86a5e0a01d 100755 --- a/src/python/dm/common/objects/directoryUploadInfo.py +++ b/src/python/dm/common/objects/directoryUploadInfo.py @@ -33,7 +33,7 @@ class DirectoryUploadInfo(DmObject): if status in ['aborted', 'failed']: uploadStatus = status - if not processingEndTime: + if not processingEndTime and status != 'skipped': endTime = None break diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 9c58a16aa0be52bccada0c796d44f407792664c3..76bcdd79d9b62b9187f7ffd2fac309c8dc149b83 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -36,19 +36,24 @@ class FileProcessingThread(threading.Thread): endProcessingTime = time.time() statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) return - self.logger.debug('Starting processing file %s' % filePath) - fileInfo['startProcessingTime'] = time.time() + self.logger.debug('Starting to process file %s' % filePath) + startProcessingTime = fileInfo.get('startProcessingTime', time.time()) + fileInfo['startProcessingTime'] = startProcessingTime nProcessors = len(self.fileProcessorKeyList) processedByDict = fileInfo.get('processedByDict', {}) fileInfo['processedByDict'] = processedByDict + skipPlugins = fileInfo.get('skipPlugins', []) processorNumber = 0 for processorKey in self.fileProcessorKeyList: processorNumber += 1 processor = self.fileProcessorDict.get(processorKey) processorName = processor.name + if processorName in skipPlugins: + self.logger.debug('%s will be skipped by %s ' % (filePath, processorName)) + continue if processedByDict.has_key(processorName): - self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName)) + self.logger.debug('%s has already been processed by %s ' % (filePath, processorName)) continue self.logger.debug('%s is about to process file %s ' % (processorName, filePath)) diff --git a/src/python/dm/daq_web_service/cli/uploadCli.py b/src/python/dm/daq_web_service/cli/uploadCli.py index dae08440cc84b062fdacf857b8ce1a7f8158d8d2..f578309d336ff912763306e85b519b0c33bf5872 100755 --- a/src/python/dm/daq_web_service/cli/uploadCli.py +++ b/src/python/dm/daq_web_service/cli/uploadCli.py @@ -3,6 +3,7 @@ from daqWebServiceSessionCli import DaqWebServiceSessionCli from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi from dm.common.exceptions.invalidRequest import InvalidRequest +from dm.common.constants import dmProcessingMode class UploadCli(DaqWebServiceSessionCli): def __init__(self): @@ -11,24 +12,34 @@ class UploadCli(DaqWebServiceSessionCli): self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR) self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') + self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY)) + self.addOption('', '--skip-plugins', dest='skipPlugins', help='Comma-separated list of plugins which should not process the given directory.') def checkArgs(self): if self.options.experimentName is None: raise InvalidRequest('Experiment name must be provided.') if self.options.dataDirectory is None: raise InvalidRequest('Experiment data directory must be provided.') + if self.options.processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST: + raise InvalidRequest('Processing mode must be one of %s.' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST) + def updateDaqInfoFromOptions(self, daqInfo): if self.options.reprocess: daqInfo['reprocessFiles'] = True if self.options.processHidden: daqInfo['processHiddenFiles'] = True + if self.options.skipPlugins: + daqInfo['skipPlugins'] = self.options.skipPlugins + daqInfo['processingMode'] = self.options.processingMode def runCommand(self): self.parseArgs(usage=""" dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY [--reprocess] [--process-hidden] + [--processing-mode=PROCESSINGMODE] + [--skip-plugins=SKIPPLUGINS] [key1:value1, key2:value2, ...] Description: diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index c05d1f5d37cb9ec5420c683e55bd6a2820e6e242..edeeaaa54824272bf64caec6ffaa071309a05d96 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -94,6 +94,24 @@ class ExperimentSessionControllerImpl(DmObjectManager): daqInfo['dataDirectory'] = dataDirectory daqInfo['uploadId'] = uploadId + skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '') + if len(skipPlugins): + skipPlugins = skipPlugins.split(',') + uploadInfo['skipPlugins'] = skipPlugins + else: + skipPlugins = [] + + # Check that there is at least one processor that can process files + processorList = [] + for processorKey in fileProcessingManager.fileProcessorKeyList: + processor = fileProcessingManager.fileProcessorDict.get(processorKey) + processorName = processor.name + if processorName not in skipPlugins: + processorList.append(processor) + + if not len(processorList): + raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory) + # Remove hidden files self.logger.debug('Checking %s processing candidates' % len(filePathsDict)) filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo) @@ -123,6 +141,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId fileInfo['statusMonitor'] = uploadInfo + fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', []) try: if uploadInfo.get('status') != 'aborting': fileProcessingManager.processFile(fileInfo) @@ -180,15 +199,19 @@ class ExperimentSessionControllerImpl(DmObjectManager): daqInfo['dataDirectory'] = dataDirectory daqInfo['uploadId'] = uploadId - skipProcessing = DictUtility.getAndRemoveKey(daqInfo, 'skipProcessing', '') - skipProcessingList = skipProcessing.split(',') + skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '') + if len(skipPlugins): + skipPlugins = skipPlugins.split(',') + uploadInfo['skipPlugins'] = skipPlugins + else: + skipPlugins = [] fileProcessingManager = FileProcessingManager.getInstance() processingInfo = {} uploadInfo['processingInfo'] = processingInfo for processorKey in fileProcessingManager.fileProcessorKeyList: processor = fileProcessingManager.fileProcessorDict.get(processorKey) processorName = processor.name - if processorName in skipProcessingList: + if processorName in skipPlugins: processingInfo[processorName] = {'status' : 'skipped'} else: self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory)) diff --git a/src/python/dm/daq_web_service/service/impl/uploadTracker.py b/src/python/dm/daq_web_service/service/impl/uploadTracker.py index 7c751eac6a388ca46ec32684353f27595592d836..8639f59d8246104e6adcf3dca5abe48cee4ca5fd 100755 --- a/src/python/dm/daq_web_service/service/impl/uploadTracker.py +++ b/src/python/dm/daq_web_service/service/impl/uploadTracker.py @@ -23,6 +23,7 @@ class UploadTracker(ObjectTracker): if uploadId: uploadInfo = self.get(uploadId) if uploadInfo is not None: + uploadInfo.updateStatus() if uploadInfo.get('status') == 'running': raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir)) del self.activeUploadDict[activeUploadKey]