Skip to content
Snippets Groups Projects
Commit b1de9b3a authored by sveseli's avatar sveseli
Browse files

Merge branch 'master' into 'master'

add ability to include or exclude certain file extensions for DAQs and uploads

See merge request DM/dm!11
parents fd23a323 8d20b29a
No related branches found
No related tags found
No related merge requests found
Showing with 124 additions and 0 deletions
Release 3.2.0 (11/05/2019)
=============================
- Implemented ability to include or exclude file extensions for uploads and DAQs
- New API Options for starting DAQ/Upload
* includeFileExtensions
* excludeFileExtensions
- New CLI Options for starting DAQ/Upload
* include-extensions
* exclude-extensions
Release 3.1.0 (10/25/2019)
=============================
- Added new ESAF interfaces for support of extended data retrieval
......
......@@ -49,6 +49,8 @@ class DaqCli(ApsBeamlineCli):
self.addOptionToGroup(daqGroup, '', '--upload-dest-directory-on-exit', dest='uploadDestDirectoryOnExit', help='Destination directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.')
self.addOptionToGroup(daqGroup, '', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOptionToGroup(daqGroup, '', '--process-existing', dest='processExisting', action='store_true', default=False, help='Process existing source files.')
self.addOptionToGroup(daqGroup, '', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). If not specified, all files will be processed.')
self.addOptionToGroup(daqGroup, '', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.')
self.addOptionToGroup(daqGroup, '', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.')
self.addOptionToGroup(daqGroup, '', '--workflow-owner', dest='workflowOwner', help='This option must be together with --workflow-name in order to specify workflow to be used for processing files.')
self.addOptionToGroup(daqGroup, '', '--workflow-job-owner', dest='workflowJobOwner', help='User who owns processing job based on the specified workflow. By default, user who submits DAQ request will own the processing job.')
......@@ -80,6 +82,10 @@ class DaqCli(ApsBeamlineCli):
def updateDaqInfoFromOptions(self, daqInfo):
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
if self.options.includeExtensions:
daqInfo['includeFileExtensions'] = self.options.includeExtensions
if self.options.excludeExtensions:
daqInfo['excludeFileExtensions'] = self.options.excludeExtensions
if self.options.processExisting:
daqInfo['processExistingFiles'] = True
if self.options.skipPlugins:
......
......@@ -46,6 +46,8 @@ class UploadCli(ApsBeamlineCli):
self.addOptionToGroup(uploadGroup, '', '--dest-directory', dest='destDirectory', help='Destination directory relative to experiment root path.')
self.addOptionToGroup(uploadGroup, '', '--file-path', dest='experimentFilePath', help='This option may be used in case when a single file needs to be uploaded. It denotes experiment file path, and must be relative to the specified data directory.')
self.addOptionToGroup(uploadGroup, '', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOptionToGroup(uploadGroup, '', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). If not specified, all files will be processed.')
self.addOptionToGroup(uploadGroup, '', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.')
self.addOptionToGroup(uploadGroup, '', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.')
self.addOptionToGroup(uploadGroup, '', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY))
self.addOptionToGroup(uploadGroup, '', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.')
......@@ -86,6 +88,10 @@ class UploadCli(ApsBeamlineCli):
daqInfo['reprocessFiles'] = True
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
if self.options.includeExtensions:
daqInfo['includeFileExtensions'] = self.options.includeExtensions
if self.options.excludeExtensions:
daqInfo['excludeFileExtensions'] = self.options.excludeExtensions
if self.options.skipPlugins:
daqInfo['skipPlugins'] = self.options.skipPlugins
daqInfo['processingMode'] = self.options.processingMode
......
......@@ -20,6 +20,8 @@ DM_SERVICE_PROTOCOL_HTTPS = 'https'
DM_PROCESS_HIDDEN_FILES_KEY = 'processHiddenFiles'
DM_DEST_DIRECTORY_KEY = 'destDirectory'
DM_SKIP_PLUGINS_KEY = 'skipPlugins'
DM_INCLUDE_FILE_EXTENSIONS_KEY = 'includeFileExtensions'
DM_EXCLUDE_FILE_EXTENSIONS_KEY = 'excludeFileExtensions'
# DM Experiment File Constants
DM_MAX_FILE_RETRIEVAL_COUNT = 500000
......@@ -59,6 +61,8 @@ DM_SYSTEM_KEYS = [
DM_PROCESS_HIDDEN_FILES_KEY,
DM_DEST_DIRECTORY_KEY,
DM_SKIP_PLUGINS_KEY,
DM_INCLUDE_FILE_EXTENSIONS_KEY,
DM_EXCLUDE_FILE_EXTENSIONS_KEY,
DM_MAX_RUN_TIME_IN_HOURS_KEY,
DM_PROCESS_EXISTING_FILES_KEY,
......
......@@ -10,6 +10,7 @@ from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from dm.common.utility.regexUtility import RegexUtility
from .fileProcessingThread import FileProcessingThread
class FileProcessingManager(threading.Thread,Singleton):
......@@ -113,6 +114,42 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
return filePathsDict
# Check file extensions for processing
def checkFileExtensionsForProcessing(self, filePathsDict, uploadInfo):
includeFileExtensions = uploadInfo.get('includeFileExtensions', [])
includeRegex = None
if includeFileExtensions:
del uploadInfo['includeFileExtensions']
self.logger.debug('Will include the following extensions: %s' % (includeFileExtensions))
includeFileExtensions = includeFileExtensions.split(',')
includeRegex = RegexUtility.formRegexForListOfFileExtensions(includeFileExtensions)
excludeFileExtensions = uploadInfo.get('excludeFileExtensions', [])
excludeRegex = None
if excludeFileExtensions:
del uploadInfo['excludeFileExtensions']
self.logger.debug('Will exclude the following extensions: %s' % (excludeFileExtensions))
excludeFileExtensions = excludeFileExtensions.split(',')
excludeRegex = RegexUtility.formRegexForListOfFileExtensions(excludeFileExtensions)
if includeRegex or excludeRegex:
nRemoved = 0
for (filePath,filePathDict) in list(filePathsDict.items()):
fileName = os.path.basename(filePath)
if includeRegex:
# If file is to be included, skip exclude regex
if not includeRegex.findall(fileName):
del filePathsDict[filePath]
nRemoved += 1
else:
continue
if excludeRegex:
if excludeRegex.findall(fileName):
del filePathsDict[filePath]
nRemoved += 1
self.logger.debug('Removed %s files based on extension criteria, %s candidates remaining' % (nRemoved, len(filePathsDict)))
return filePathsDict
# Each plugin calculates list of files that need to be processed
# Final result is union of all plugins
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
......
#!/usr/bin/env python
import re
class RegexUtility:
@classmethod
def formRegexForListOfFileExtensions(cls, extList):
if not extList:
return None
p = '.*\.('
delimiter = ''
for ext in extList:
p = '%s%s%s' % (p, delimiter, ext)
delimiter = '|'
p = '%s)$' % (p)
return re.compile(p)
......@@ -52,6 +52,8 @@ class ExperimentDaqApi(DaqRestApi):
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *processExistingFiles* (bool): if set to True, existing files will be processed
- *includeFileExtensions* (str): comma-separated list of file extensions that should be processed (e.g., "hdf5,h5"); if not provided, all files will be processed
- *excludeFileExtensions* (str): comma-separated list of file extensions that should be not processed (e.g., "txt,log,doc"); if not provided, all files will be processed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours
- *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes
......@@ -214,6 +216,8 @@ class ExperimentDaqApi(DaqRestApi):
- *experimentFilePath* (str): specifies path relative to the given data directory; if set, only this file will be processed
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *includeFileExtensions* (str): comma-separated list of file extensions that should be processed (e.g., "hdf5,h5"); if not provided, all files will be processed
- *excludeFileExtensions* (str): comma-separated list of file extensions that should be not processed (e.g., "txt,log,doc"); if not provided, all files will be processed
- *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files)
......
......@@ -16,6 +16,8 @@ class StartDaqCli(DaqWebServiceSessionCli):
self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after DAQ is stopped.')
self.addOption('', '--upload-dest-directory-on-exit', dest='uploadDestDirectoryOnExit', help='Destination directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.')
self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOption('', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). If not specified, all files will be processed.')
self.addOption('', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.')
self.addOption('', '--process-existing', dest='processExisting', action='store_true', default=False, help='Process existing source files.')
self.addOption('', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.')
self.addOption('', '--workflow-owner', dest='workflowOwner', help='This option must be together with --workflow-name in order to specify workflow to be used for processing files.')
......@@ -44,6 +46,10 @@ class StartDaqCli(DaqWebServiceSessionCli):
def updateDaqInfoFromOptions(self, daqInfo):
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
if self.options.includeExtensions:
daqInfo['includeFileExtensions'] = self.options.includeExtensions
if self.options.excludeExtensions:
daqInfo['excludeFileExtensions'] = self.options.excludeExtensions
if self.options.processExisting:
daqInfo['processExistingFiles'] = True
if self.options.skipPlugins:
......
......@@ -14,6 +14,8 @@ class UploadCli(DaqWebServiceSessionCli):
self.addOption('', '--file-path', dest='experimentFilePath', help='This option may be used in case when a single file needs to be uploaded. It denotes experiment file path, and must be relative to the specified data directory.')
self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.')
self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOption('', '--include-extensions', dest='includeExtensions', help='Comma-separated list of file extensions that should be processed (e.g., "h5,hdf5"). If not specified, all files will be processed.')
self.addOption('', '--exclude-extensions', dest='excludeExtensions', help='Comma-separated list of file extensions that should not be processed (e.g., "txt,log"). If not specified, no files will be excluded from processing.')
self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY))
self.addOption('', '--workflow-name', dest='workflowName', help='This option must be together with --workflow-owner in order to specify workflow to be used for processing files.')
self.addOption('', '--workflow-owner', dest='workflowOwner', help='This option must be together with --workflow-name in order to specify workflow to be used for processing files.')
......@@ -50,6 +52,10 @@ class UploadCli(DaqWebServiceSessionCli):
daqInfo['reprocessFiles'] = True
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
if self.options.includeExtensions:
daqInfo['includeFileExtensions'] = self.options.includeExtensions
if self.options.excludeExtensions:
daqInfo['excludeFileExtensions'] = self.options.excludeExtensions
if self.options.skipPlugins:
daqInfo['skipPlugins'] = self.options.skipPlugins
daqInfo['processingMode'] = self.options.processingMode
......
......@@ -101,6 +101,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
self.logger.debug('Checking %s existing DAQ file candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, daqInfo)
# Filter files based on extensions
filePathsDict = fileProcessingManager.checkFileExtensionsForProcessing(filePathsDict, daqInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, daqInfo)
......@@ -275,6 +278,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Filter files based on extensions
filePathsDict = fileProcessingManager.checkFileExtensionsForProcessing(filePathsDict, daqInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
......
......@@ -3,6 +3,7 @@
import threading
import time
import os
import re
from watchdog.observers.polling import PollingObserver
from collections import OrderedDict
......@@ -14,6 +15,7 @@ from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.utility.regexUtility import RegexUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager
from .dmFileSystemEventHandler import DmFileSystemEventHandler
......@@ -110,6 +112,24 @@ class FileSystemObserver(threading.Thread,Singleton):
self.logger.debug('File path %s is hidden file, will not process it' % filePath)
return
# Check file extension
includeFileExtensions = daqInfo.get('includeFileExtensions', [])
excludeFileExtensions = uploadInfo.get('excludeFileExtensions', [])
if includeFileExtensions:
includeFileExtensions = includeFileExtensions.split(',')
includeRegex = RegexUtility.formRegexForListOfFileExtensions(includeFileExtensions)
if not includeRegex.findall(fileName):
self.logger.debug('File path %s will not be processed as it does not match include extension specification: %s' % (filePath, includeFileExtensions))
return
elif excludeFileExtensions:
excludeFileExtensions = excludeFileExtensions.split(',')
excludeRegex = RegexUtility.formRegexForListOfFileExtensions(excludeFileExtensions)
if excludeRegex.findall(fileName):
self.logger.debug('File path %s will not be processed as it matches exclude extension specification: %s' % (filePath, excludeFileExtensions))
return
# Process file
daqId = daqInfo['id']
observedFile = self.observedFileMap.get(filePath)
destDirectory = daqInfo.get('destDirectory')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment