diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt index 48c9dcfafb4f9e4157a39dbf2a0e7dbc00617761..755082cd549c9c32980fbaede5cbb893f2cd188f 100644 --- a/doc/RELEASE_NOTES.txt +++ b/doc/RELEASE_NOTES.txt @@ -1,3 +1,13 @@ +Release 3.2.0 (11/05/2019) +============================= +- Implemented ability to include or exclude file extensions for uploads and DAQs + - New API Options for starting DAQ/Upload + * includeFileExtensions + * excludeFileExtensions + - New CLI Options for starting DAQ/Upload + * include-extensions + * exclude-extensions + Release 3.1.0 (10/25/2019) ============================= - Added new ESAF interfaces for support of extended data retrieval diff --git a/src/python/dm/aps_beamline_tools/cli/daqCli.py b/src/python/dm/aps_beamline_tools/cli/daqCli.py index 216e270ff2d9690b4bd8dc15c4b652dd49f16c6d..46e4861c91c24a43aa2d86360c1041ec021ee73c 100755 --- a/src/python/dm/aps_beamline_tools/cli/daqCli.py +++ b/src/python/dm/aps_beamline_tools/cli/daqCli.py @@ -49,6 +49,8 @@ class DaqCli(ApsBeamlineCli): self.addOptionToGroup(daqGroup, '', '--upload-dest-directory-on-exit', dest='uploadDestDirectoryOnExit', help='Destination directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.') self.addOptionToGroup(daqGroup, '', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') self.addOptionToGroup(daqGroup, '', '--process-existing', dest='processExisting', action='store_true', default=False, help='Process existing source files.') + self.addOptionToGroup(daqGroup, '', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). 
If not specified, all files will be processed.') + self.addOptionToGroup(daqGroup, '', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.') self.addOptionToGroup(daqGroup, '', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.') self.addOptionToGroup(daqGroup, '', '--workflow-owner', dest='workflowOwner', help='This option must be together with --workflow-name in order to specify workflow to be used for processing files.') self.addOptionToGroup(daqGroup, '', '--workflow-job-owner', dest='workflowJobOwner', help='User who owns processing job based on the specified workflow. By default, user who submits DAQ request will own the processing job.') @@ -80,6 +82,10 @@ class DaqCli(ApsBeamlineCli): def updateDaqInfoFromOptions(self, daqInfo): if self.options.processHidden: daqInfo['processHiddenFiles'] = True + if self.options.includeExtensions: + daqInfo['includeFileExtensions'] = self.options.includeExtensions + if self.options.excludeExtensions: + daqInfo['excludeFileExtensions'] = self.options.excludeExtensions if self.options.processExisting: daqInfo['processExistingFiles'] = True if self.options.skipPlugins: diff --git a/src/python/dm/aps_beamline_tools/cli/uploadCli.py b/src/python/dm/aps_beamline_tools/cli/uploadCli.py index 57514737a6290a6c7d731ce8bc56b846356b2afe..feeae5f3f212147b35a19d6d8fbfbeea0050bd1e 100755 --- a/src/python/dm/aps_beamline_tools/cli/uploadCli.py +++ b/src/python/dm/aps_beamline_tools/cli/uploadCli.py @@ -46,6 +46,8 @@ class UploadCli(ApsBeamlineCli): self.addOptionToGroup(uploadGroup, '', '--dest-directory', dest='destDirectory', help='Destination directory relative to experiment root path.') self.addOptionToGroup(uploadGroup, '', '--file-path', dest='experimentFilePath', help='This 
option may be used in case when a single file needs to be uploaded. It denotes experiment file path, and must be relative to the specified data directory.') self.addOptionToGroup(uploadGroup, '', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') + self.addOptionToGroup(uploadGroup, '', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). If not specified, all files will be processed.') + self.addOptionToGroup(uploadGroup, '', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.') self.addOptionToGroup(uploadGroup, '', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.') self.addOptionToGroup(uploadGroup, '', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' 
% (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY)) self.addOptionToGroup(uploadGroup, '', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.') @@ -86,6 +88,10 @@ class UploadCli(ApsBeamlineCli): daqInfo['reprocessFiles'] = True if self.options.processHidden: daqInfo['processHiddenFiles'] = True + if self.options.includeExtensions: + daqInfo['includeFileExtensions'] = self.options.includeExtensions + if self.options.excludeExtensions: + daqInfo['excludeFileExtensions'] = self.options.excludeExtensions if self.options.skipPlugins: daqInfo['skipPlugins'] = self.options.skipPlugins daqInfo['processingMode'] = self.options.processingMode diff --git a/src/python/dm/common/constants/dmServiceConstants.py b/src/python/dm/common/constants/dmServiceConstants.py index 4b94cf54b5ba5de6897993c60ca53344c07fbfa1..32573f490d3e80d12fcad3067e40e399006fcd5c 100755 --- a/src/python/dm/common/constants/dmServiceConstants.py +++ b/src/python/dm/common/constants/dmServiceConstants.py @@ -20,6 +20,8 @@ DM_SERVICE_PROTOCOL_HTTPS = 'https' DM_PROCESS_HIDDEN_FILES_KEY = 'processHiddenFiles' DM_DEST_DIRECTORY_KEY = 'destDirectory' DM_SKIP_PLUGINS_KEY = 'skipPlugins' +DM_INCLUDE_FILE_EXTENSIONS_KEY = 'includeFileExtensions' +DM_EXCLUDE_FILE_EXTENSIONS_KEY = 'excludeFileExtensions' # DM Experiment File Constants DM_MAX_FILE_RETRIEVAL_COUNT = 500000 @@ -59,6 +61,8 @@ DM_SYSTEM_KEYS = [ DM_PROCESS_HIDDEN_FILES_KEY, DM_DEST_DIRECTORY_KEY, DM_SKIP_PLUGINS_KEY, + DM_INCLUDE_FILE_EXTENSIONS_KEY, + DM_EXCLUDE_FILE_EXTENSIONS_KEY, DM_MAX_RUN_TIME_IN_HOURS_KEY, DM_PROCESS_EXISTING_FILES_KEY, diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index 
# Check file extensions for processing
def checkFileExtensionsForProcessing(self, filePathsDict, uploadInfo):
    """
    Remove entries from filePathsDict based on file extension filters.

    The filters are read from the 'includeFileExtensions' and
    'excludeFileExtensions' entries of uploadInfo, each a comma-separated
    list of extensions (e.g., "h5,hdf5"); an already-split list is
    accepted as well. If an include filter is given, only files matching
    it are kept and the exclude filter is not consulted. Otherwise files
    matching the exclude filter are removed. With no filter, nothing is
    removed.

    :param filePathsDict: dictionary keyed by file path; filtered in place
    :param uploadInfo: dictionary holding the filter entries; both filter
        keys are deleted from it once they have been read
    :returns: the same filePathsDict object, after filtering
    """

    def formRegex(key, action):
        # Returns compiled regex for the filter stored under key, or None
        extensions = uploadInfo.get(key)
        if not extensions:
            return None
        # NOTE(review): deleting the key mutates the caller's dictionary,
        # so any later reader of the same object will not see the filter.
        # Kept because the original code did so -- confirm it is intended.
        del uploadInfo[key]
        self.logger.debug('Will %s the following extensions: %s' % (action, extensions))
        if hasattr(extensions, 'split'):
            extensions = extensions.split(',')
        return RegexUtility.formRegexForListOfFileExtensions(extensions)

    includeRegex = formRegex('includeFileExtensions', 'include')
    excludeRegex = formRegex('excludeFileExtensions', 'exclude')
    if not includeRegex and not excludeRegex:
        return filePathsDict

    nRemoved = 0
    # Iterate over a snapshot of the keys, since entries are deleted below
    for filePath in list(filePathsDict):
        fileName = os.path.basename(filePath)
        if includeRegex:
            # Include filter takes precedence over exclude filter
            rejected = not includeRegex.search(fileName)
        else:
            rejected = bool(excludeRegex.search(fileName))
        # Single removal point: a file is deleted (and counted) at most
        # once, so a file failing the include filter that also matches
        # the exclude filter cannot trigger a second del and a KeyError
        if rejected:
            del filePathsDict[filePath]
            nRemoved += 1
    self.logger.debug('Removed %s files based on extension criteria, %s candidates remaining' % (nRemoved, len(filePathsDict)))
    return filePathsDict
import re

class RegexUtility:
    """ Helper methods for building regular expressions. """

    @classmethod
    def formRegexForListOfFileExtensions(cls, extList):
        """
        Build regex matching file names that end with one of the given
        extensions.

        Each extension is stripped of surrounding whitespace and of
        leading dots, and is escaped so that it is matched literally
        (e.g., "tar.gz" does not match "tarXgz", and "c++" is a valid
        extension rather than a malformed pattern).

        :param extList: iterable of extension strings, e.g. ['h5', 'hdf5']
        :returns: compiled pattern anchored at the end of the file name,
            with the extensions as alternatives of one capturing group;
            None if extList is empty or None
        """
        if not extList:
            return None
        alternatives = '|'.join(re.escape(ext.strip().lstrip('.')) for ext in extList)
        # Raw string keeps the literal-dot escape intact
        return re.compile(r'.*\.(%s)$' % alternatives)
files will be processed - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored - *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours - *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes @@ -214,6 +216,8 @@ class ExperimentDaqApi(DaqRestApi): - *experimentFilePath* (str): specifies path relative to the given data directory; if set, only this file will be processed - *processHiddenFiles* (bool): if set to True, hidden files will be processed + - *includeFileExtensions* (str): comma-separated list of file extensions that should be processed (e.g., "hdf5,h5"); if not provided, all files will be processed + - *excludeFileExtensions* (str): comma-separated list of file extensions that should be not processed (e.g., "txt,log,doc"); if not provided, all files will be processed - *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored - *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files) diff --git a/src/python/dm/daq_web_service/cli/startDaqCli.py b/src/python/dm/daq_web_service/cli/startDaqCli.py index 77e3ce966a98d505797a7488e25a9b214f61777c..04e8dbfce90bdd08d3d87955edcfe6bab7b7288b 100755 --- a/src/python/dm/daq_web_service/cli/startDaqCli.py +++ b/src/python/dm/daq_web_service/cli/startDaqCli.py @@ -16,6 +16,8 @@ class StartDaqCli(DaqWebServiceSessionCli): self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after 
DAQ is stopped.') self.addOption('', '--upload-dest-directory-on-exit', dest='uploadDestDirectoryOnExit', help='Destination directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') + self.addOption('', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). If not specified, all files will be processed.') + self.addOption('', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.') self.addOption('', '--process-existing', dest='processExisting', action='store_true', default=False, help='Process existing source files.') self.addOption('', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.') self.addOption('', '--workflow-owner', dest='workflowOwner', help='This option must be together with --workflow-name in order to specify workflow to be used for processing files.') @@ -44,6 +46,10 @@ class StartDaqCli(DaqWebServiceSessionCli): def updateDaqInfoFromOptions(self, daqInfo): if self.options.processHidden: daqInfo['processHiddenFiles'] = True + if self.options.includeExtensions: + daqInfo['includeFileExtensions'] = self.options.includeExtensions + if self.options.excludeExtensions: + daqInfo['excludeFileExtensions'] = self.options.excludeExtensions if self.options.processExisting: daqInfo['processExistingFiles'] = True if self.options.skipPlugins: diff --git a/src/python/dm/daq_web_service/cli/uploadCli.py b/src/python/dm/daq_web_service/cli/uploadCli.py index 
5584c3a019c69b020a897e38b3af1804fd65668e..9bc8be17dcb43e7f65fcf782f4f9953052a86de7 100755 --- a/src/python/dm/daq_web_service/cli/uploadCli.py +++ b/src/python/dm/daq_web_service/cli/uploadCli.py @@ -14,6 +14,8 @@ class UploadCli(DaqWebServiceSessionCli): self.addOption('', '--file-path', dest='experimentFilePath', help='This option may be used in case when a single file needs to be uploaded. It denotes experiment file path, and must be relative to the specified data directory.') self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') + self.addOption('', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). If not specified, all files will be processed.') + self.addOption('', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.') self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' 
% (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY)) self.addOption('', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.') self.addOption('', '--workflow-owner', dest='workflowOwner', help='This option must be together with --workflow-name in order to specify workflow to be used for processing files.') @@ -50,6 +52,10 @@ class UploadCli(DaqWebServiceSessionCli): daqInfo['reprocessFiles'] = True if self.options.processHidden: daqInfo['processHiddenFiles'] = True + if self.options.includeExtensions: + daqInfo['includeFileExtensions'] = self.options.includeExtensions + if self.options.excludeExtensions: + daqInfo['excludeFileExtensions'] = self.options.excludeExtensions if self.options.skipPlugins: daqInfo['skipPlugins'] = self.options.skipPlugins daqInfo['processingMode'] = self.options.processingMode diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index 154a7ebf2764a3edf5c23f11959787a283a0fe93..db158b80d992f22107ba115e7716bf6e0ba085fe 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -101,6 +101,9 @@ class ExperimentSessionControllerImpl(DmObjectManager): self.logger.debug('Checking %s existing DAQ file candidates' % len(filePathsDict)) filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, daqInfo) + # Filter files based on extensions + filePathsDict = fileProcessingManager.checkFileExtensionsForProcessing(filePathsDict, daqInfo) + # Check which files need to be processed filePathsDict = 
fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, daqInfo) @@ -275,6 +278,9 @@ class ExperimentSessionControllerImpl(DmObjectManager): self.logger.debug('Checking %s processing candidates' % len(filePathsDict)) filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo) + # Filter files based on extensions + filePathsDict = fileProcessingManager.checkFileExtensionsForProcessing(filePathsDict, daqInfo) + # Check which files need to be processed filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo) uploadInfo['nFiles'] = len(filePathsDict) diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index b3ae06c8f64d939619396094e8af5054ce556a6e..08877e43983c1c36843e42f309b3eac69cfc88d6 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -3,6 +3,7 @@ import threading import time import os +import re from watchdog.observers.polling import PollingObserver from collections import OrderedDict @@ -14,6 +15,7 @@ from dm.common.utility.valueUtility import ValueUtility from dm.common.utility.objectUtility import ObjectUtility from dm.common.utility.singleton import Singleton from dm.common.utility.threadingUtility import ThreadingUtility +from dm.common.utility.regexUtility import RegexUtility from dm.common.processing.fileProcessingManager import FileProcessingManager from .dmFileSystemEventHandler import DmFileSystemEventHandler @@ -110,6 +112,24 @@ class FileSystemObserver(threading.Thread,Singleton): self.logger.debug('File path %s is hidden file, will not process it' % filePath) return + # Check file extension + includeFileExtensions = daqInfo.get('includeFileExtensions', []) + excludeFileExtensions = uploadInfo.get('excludeFileExtensions', []) + if includeFileExtensions: + includeFileExtensions 
= includeFileExtensions.split(',') + includeRegex = RegexUtility.formRegexForListOfFileExtensions(includeFileExtensions) + if not includeRegex.findall(fileName): + self.logger.debug('File path %s will not be processed as it does not match include extension specification: %s' % (filePath, includeFileExtensions)) + return + + elif excludeFileExtensions: + excludeFileExtensions = excludeFileExtensions.split(',') + excludeRegex = RegexUtility.formRegexForListOfFileExtensions(excludeFileExtensions) + if excludeRegex.findall(fileName): + self.logger.debug('File path %s will not be processed as it matches exclude extension specification: %s' % (filePath, excludeFileExtensions)) + return + + # Process file daqId = daqInfo['id'] observedFile = self.observedFileMap.get(filePath) destDirectory = daqInfo.get('destDirectory')