From a06e25902e3496a392885a2a425ca4a90f753178 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Fri, 3 Mar 2017 23:41:21 +0000 Subject: [PATCH] renaming several command line options and DAQ keys --- src/python/dm/aps_bss/cli/getProposalCli.py | 6 +- src/python/dm/aps_bss/cli/listProposalsCli.py | 6 +- src/python/dm/aps_bss/cli/listRunsCli.py | 6 +- src/python/dm/common/cli/dmCli.py | 7 ++- src/python/dm/common/cli/dmRestSessionCli.py | 2 +- .../dm/common/constants/dmServiceConstants.py | 4 +- src/python/dm/common/objects/observedFile.py | 8 +-- .../processing/plugins/fileTransferPlugin.py | 16 ++--- .../plugins/gridftpFileTransferPlugin.py | 26 ++++---- .../plugins/mongoDbFileCatalogPlugin.py | 4 +- .../plugins/rsyncFileTransferPlugin.py | 22 +++---- ...WithChecksumAndDeleteFileTransferPlugin.py | 62 ------------------- .../daq_web_service/api/experimentDaqApi.py | 6 +- .../cli/daqWebServiceSessionCli.py | 19 ++++-- .../dm/daq_web_service/cli/startDaqCli.py | 37 +++++------ .../dm/daq_web_service/cli/stopDaqCli.py | 2 +- .../dm/daq_web_service/cli/uploadCli.py | 10 +-- .../impl/experimentSessionControllerImpl.py | 12 ++-- .../service/impl/fileSystemObserver.py | 4 +- .../dm/ds_web_service/cli/addExperimentCli.py | 13 ++-- 20 files changed, 114 insertions(+), 158 deletions(-) delete mode 100755 src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py diff --git a/src/python/dm/aps_bss/cli/getProposalCli.py b/src/python/dm/aps_bss/cli/getProposalCli.py index b2f215c4..0f9e2871 100755 --- a/src/python/dm/aps_bss/cli/getProposalCli.py +++ b/src/python/dm/aps_bss/cli/getProposalCli.py @@ -9,7 +9,7 @@ class GetProposalCli(DmCli): DmCli.__init__(self) self.addOption('', '--id', dest='proposalId', help='Proposal id.') self.addOption('', '--run', dest='runName', help='Run name. If not provided, current run will be used.') - self.addOption('', '--login-file', dest='loginFile', help='BSS login file. Login file may also be specified via environment variable DM_BSS_LOGIN_FILE.') + self.addOption('', '--bss-login-file', dest='bssLoginFile', help='BSS login file. Login file may also be specified via environment variable DM_BSS_LOGIN_FILE.') def checkArgs(self): if not self.options.proposalId: @@ -19,14 +19,14 @@ class GetProposalCli(DmCli): self.parseArgs(usage=""" dm-get-proposal --id=PROPOSALID [--run=RUNNAME] - [--login-file=LOGINFILE] + [--bss-login-file=BSSLOGINFILE] Description: Retrieves beamline proposal for the given id. """) self.checkArgs() proposalId = int(self.options.proposalId) - api = ApsBssApi(loginFile=self.options.loginFile) + api = ApsBssApi(loginFile=self.options.bssLoginFile) proposal = api.getBeamlineProposal(proposalId=proposalId, runName=self.options.runName) print proposal.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) diff --git a/src/python/dm/aps_bss/cli/listProposalsCli.py b/src/python/dm/aps_bss/cli/listProposalsCli.py index 05b1f430..ae5c032f 100755 --- a/src/python/dm/aps_bss/cli/listProposalsCli.py +++ b/src/python/dm/aps_bss/cli/listProposalsCli.py @@ -7,7 +7,7 @@ class ListProposalsCli(DmCli): def __init__(self): DmCli.__init__(self) self.addOption('', '--run', dest='runName', help='Run name. If not provided, current run will be used.') - self.addOption('', '--login-file', dest='loginFile', help='BSS login file. It may be provided via environment variable DM_BSS_LOGIN_FILE.') + self.addOption('', '--bss-login-file', dest='bssLoginFile', help='BSS login file. It may be provided via environment variable DM_BSS_LOGIN_FILE.') def checkArgs(self): pass @@ -16,13 +16,13 @@ class ListProposalsCli(DmCli): self.parseArgs(usage=""" dm-list-proposals [--run=RUNNAME] - [--login-file=LOGINFILE] + [--bss-login-file=BSSLOGINFILE] Description: Retrieves list of beamline proposals for the given run. """) self.checkArgs() - api = ApsBssApi(loginFile=self.options.loginFile) + api = ApsBssApi(loginFile=self.options.bssLoginFile) proposals = api.listBeamlineProposals(runName=self.options.runName) for proposal in proposals: print proposal.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) diff --git a/src/python/dm/aps_bss/cli/listRunsCli.py b/src/python/dm/aps_bss/cli/listRunsCli.py index 2bdcf21c..68202309 100755 --- a/src/python/dm/aps_bss/cli/listRunsCli.py +++ b/src/python/dm/aps_bss/cli/listRunsCli.py @@ -6,20 +6,20 @@ from dm.aps_bss.api.apsBssApi import ApsBssApi class ListRunsCli(DmCli): def __init__(self): DmCli.__init__(self) - self.addOption('', '--login-file', dest='loginFile', help='BSS login file. It may be provided via environment variable DM_BSS_LOGIN_FILE.') + self.addOption('', '--bss-login-file', dest='bssLoginFile', help='BSS login file. It may be provided via environment variable DM_BSS_LOGIN_FILE.') def checkArgs(self): pass def runCommand(self): self.parseArgs(usage=""" - dm-list-runs [--login-file=LOGINFILE] + dm-list-runs [--bss-login-file=BSSLOGINFILE] Description: Retrieves list of available runs. """) self.checkArgs() - api = ApsBssApi(loginFile=self.options.loginFile) + api = ApsBssApi(loginFile=self.options.bssLoginFile) runs = api.listRuns() for run in runs: print run.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) diff --git a/src/python/dm/common/cli/dmCli.py b/src/python/dm/common/cli/dmCli.py index aeaec308..3be3e201 100755 --- a/src/python/dm/common/cli/dmCli.py +++ b/src/python/dm/common/cli/dmCli.py @@ -78,9 +78,12 @@ class DmCli(object): group = self.optionGroupDict.get(groupName) group.add_option(*args, **kwargs) - def addOptionGroup(self, groupName, desc): + def addOptionGroup(self, groupName, desc=None, prepend=False): group = OptionGroup(self.parser, groupName, desc) - self.parser.add_option_group(group) + if not prepend: + self.parser.add_option_group(group) + else: + self.parser.option_groups.insert(0,group) self.optionGroupDict[groupName] = group def processArgs(self): diff --git a/src/python/dm/common/cli/dmRestSessionCli.py b/src/python/dm/common/cli/dmRestSessionCli.py index 2c8cfe09..9b48e040 100755 --- a/src/python/dm/common/cli/dmRestSessionCli.py +++ b/src/python/dm/common/cli/dmRestSessionCli.py @@ -56,4 +56,4 @@ class DmRestSessionCli(DmRestCli): # Testing if __name__ == '__main__': - pass + pass diff --git a/src/python/dm/common/constants/dmServiceConstants.py b/src/python/dm/common/constants/dmServiceConstants.py index b74e76ee..95b98742 100755 --- a/src/python/dm/common/constants/dmServiceConstants.py +++ b/src/python/dm/common/constants/dmServiceConstants.py @@ -6,6 +6,6 @@ DM_SERVICE_PROTOCOL_HTTP = 'http' DM_SERVICE_PROTOCOL_HTTPS = 'https' DM_DAQ_MAX_RUN_TIME_IN_HOURS_KEY = 'maxRunTimeInHours' -DM_DAQ_TARGET_DIRECTORY_KEY = 'targetDirectory' +DM_DAQ_DEST_DIRECTORY_KEY = 'destDirectory' DM_DAQ_UPLOAD_DATA_DIRECTORY_ON_EXIT_KEY = 'uploadDataDirectoryOnExit' -DM_DAQ_UPLOAD_TARGET_DIRECTORY_ON_EXIT_KEY = 'uploadTargetDirectoryOnExit' +DM_DAQ_UPLOAD_DEST_DIRECTORY_ON_EXIT_KEY = 'uploadDestDirectoryOnExit' diff --git a/src/python/dm/common/objects/observedFile.py b/src/python/dm/common/objects/observedFile.py index 2ef2e31c..ffde9efc 100755 --- a/src/python/dm/common/objects/observedFile.py +++ b/src/python/dm/common/objects/observedFile.py @@ -9,7 +9,7 @@ class ObservedFile(DmObject): DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdateTime' ] - def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None, targetDirectory=None): + def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None, destDirectory=None): DmObject.__init__(self, dict) if filePath: self['filePath'] = filePath @@ -18,9 +18,9 @@ class ObservedFile(DmObject): if filePath: parseResult = urlparse.urlparse(dataDirectory) self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path) - if targetDirectory: - self['targetDirectory'] = targetDirectory - self['experimentFilePath'] = '%s/%s' % (targetDirectory, self['experimentFilePath']) + if destDirectory: + self['destDirectory'] = destDirectory + self['experimentFilePath'] = '%s/%s' % (destDirectory, self['experimentFilePath']) if experiment: self['experimentName'] = experiment.get('name') diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py index 310962f6..a0c79eaf 100755 --- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py @@ -28,11 +28,11 @@ class FileTransferPlugin(FileProcessor): def processFile(self, fileInfo): filePath = fileInfo.get('filePath') dataDirectory = fileInfo.get('dataDirectory') - targetDirectory = fileInfo.get('targetDirectory') + destDirectory = fileInfo.get('destDirectory') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo) @@ -40,25 +40,25 @@ class FileTransferPlugin(FileProcessor): srcUrl = '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory)) return srcUrl - def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, targetDirectory=None): + def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, destDirectory=None): if self.dest: destUrl = '%s/' % (self.dest) else: destUrl = '%s:%s/' % (storageHost, storageDirectory) - if targetDirectory: - destUrl = '%s/%s/' % (destUrl, targetDirectory) + if destDirectory: + destUrl = '%s/%s/' % (destUrl, destDirectory) return destUrl def getSrcDirUrl(self, dataDirectory): return '%s/' % dataDirectory - def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, targetDirectory=None): + def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, destDirectory=None): if self.dest: destUrl = '%s/' % (self.dest) else: destUrl = '%s:%s/' % (storageHost, storageDirectory) - if targetDirectory: - destUrl = '%s/%s/' % (destUrl, targetDirectory) + if destDirectory: + destUrl = '%s/%s/' % (destUrl, destDirectory) return destUrl def getFullCommand(self, src, dest, command=None): diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 41957c3f..1b8544d4 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -51,15 +51,15 @@ class GridftpFileTransferPlugin(FileTransferPlugin): srcUrl = '%s/%s' % (self.src,filePath) return self.replaceSpecialCharacters(srcUrl) - def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, targetDirectory=None): + def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, destDirectory=None): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip() fileName = os.path.basename(filePath) if self.dest: destUrl = '%s/%s/%s' % (self.dest, dirName, fileName) else: - if targetDirectory: - destUrl = 'sshftp://%s/%s/%s/%s/%s' % (storageHost, storageDirectory, targetDirectory, dirName, fileName) + if destDirectory: + destUrl = 'sshftp://%s/%s/%s/%s/%s' % (storageHost, storageDirectory, destDirectory, dirName, fileName) else: destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) return self.replaceSpecialCharacters(destUrl) @@ -70,7 +70,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): storageHost = uploadInfo['storageHost'] storageDirectory = uploadInfo['storageDirectory'] dataDirectory = uploadInfo['dataDirectory'] - targetDirectory = uploadInfo.get('targetDirectory') + destDirectory = uploadInfo.get('destDirectory') self.logger.debug('Upload info: %s' % uploadInfo) # Original data directory may contain host/port (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory) @@ -78,8 +78,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin): self.logger.debug('Number of original files: %s' % len(filePathsDict)) ftpUtility = SftpUtility(storageHost) destDirectory = storageDirectory - if targetDirectory: - destDirectory = '%s/%s/' % (destDirectory, targetDirectory) + if destDirectory: + destDirectory = '%s/%s/' % (destDirectory, destDirectory) storageFilePathsDict = {} try: self.logger.debug('Looking for existing files in %s' % destDirectory) @@ -123,9 +123,9 @@ class GridftpFileTransferPlugin(FileTransferPlugin): experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') - targetDirectory = fileInfo.get('targetDirectory') + destDirectory = fileInfo.get('destDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) # Calculate checksum @@ -173,13 +173,13 @@ class GridftpFileTransferPlugin(FileTransferPlugin): srcUrl = '%s/%s/' % (self.src,dataDirectory) return srcUrl - def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, targetDirectory=None): + def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, destDirectory=None): if self.dest: destUrl = '%s/' % (self.dest) else: destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory) - if targetDirectory: - destUrl = '%s/%s/' % (destUrl, targetDirectory) + if destDirectory: + destUrl = '%s/%s/' % (destUrl, destDirectory) return destUrl def processDirectory(self, directoryInfo): @@ -188,9 +188,9 @@ class GridftpFileTransferPlugin(FileTransferPlugin): experimentName = uploadInfo.get('experimentName') storageHost = uploadInfo.get('storageHost') storageDirectory = uploadInfo.get('storageDirectory') - targetDirectory = uploadInfo.get('targetDirectory') + destDirectory = uploadInfo.get('destDirectory') - destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, targetDirectory) + destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory) srcUrl = self.getSrcDirUrl(dataDirectory) # Transfer directory diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py index e7fe8d69..b8801fe4 100755 --- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py +++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py @@ -115,12 +115,12 @@ class MongoDbFileCatalogPlugin(FileProcessor): uploadId = uploadInfo.get('id') dataDirectory = uploadInfo.get('dataDirectory') - targetDirectory = uploadInfo.get('targetDirectory') + destDirectory = uploadInfo.get('destDirectory') nProcessedFiles = 0 nFiles = len(filePathsDict) for (filePath,filePathDict) in filePathsDict.items(): - fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory) + fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory) fileInfo.update(filePathDict) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index a85bfaf7..66c3806b 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -34,10 +34,10 @@ class RsyncFileTransferPlugin(FileTransferPlugin): storageDirectory = uploadInfo['storageDirectory'] storageHost = uploadInfo['storageHost'] dataDirectory = uploadInfo['dataDirectory'] - targetDirectory = uploadInfo.get('targetDirectory') + destDirectory = uploadInfo.get('destDirectory') dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory) - if targetDirectory: - dryRunCommand = '%s/%s/' % (dryRunCommand, targetDirectory) + if destDirectory: + dryRunCommand = '%s/%s/' % (dryRunCommand, destDirectory) subprocess = DmSubprocess.getSubprocess(dryRunCommand) subprocess.run() @@ -61,8 +61,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin): experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') - targetDirectory = fileInfo.get('targetDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) + destDirectory = fileInfo.get('destDirectory') + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory) # Use relative path with respect to data directory as a source srcUrl = self.getSrcUrl(filePath, dataDirectory) @@ -75,8 +75,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin): # Transfer file self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) command = self.command - if targetDirectory: - destDirectory = '%s/%s' % (storageDirectory,targetDirectory) + if destDirectory: + destDirectory = '%s/%s' % (storageDirectory,destDirectory) command = self.RSYNC_WITH_MKDIR_COMMAND % destDirectory self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory) @@ -107,16 +107,16 @@ class RsyncFileTransferPlugin(FileTransferPlugin): experimentName = uploadInfo.get('experimentName') storageHost = uploadInfo.get('storageHost') storageDirectory = uploadInfo.get('storageDirectory') - targetDirectory = uploadInfo.get('targetDirectory') + destDirectory = uploadInfo.get('destDirectory') - destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, targetDirectory) + destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory) srcUrl = self.getSrcDirUrl(dataDirectory) # Transfer directory self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) command = self.DIRECTORY_TRANSFER_COMMAND - if targetDirectory: - destDirectory = '%s/%s' % (storageDirectory,targetDirectory) + if destDirectory: + destDirectory = '%s/%s' % (storageDirectory,destDirectory) command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % destDirectory self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory) diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py deleted file mode 100755 index 049ddf47..00000000 --- a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python - -import os -from dm.common.utility.loggingManager import LoggingManager -from dm.common.utility.osUtility import OsUtility -from dm.common.utility.fileUtility import FileUtility -from dm.common.exceptions.fileProcessingError import FileProcessingError -from dm.common.processing.plugins.fileTransferPlugin import FileTransferPlugin -from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory - -class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin): - - COMMAND = 'rsync -arvlPR' - - def __init__(self, src=None, dest=None): - FileTransferPlugin.__init__(self, self.COMMAND, src, dest) - self.dsFileApi = DsRestApiFactory.getFileDsApi() - self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) - - def processFile(self, fileInfo): - filePath = fileInfo.get('filePath') - dataDirectory = fileInfo.get('dataDirectory') - experimentFilePath = fileInfo.get('experimentFilePath') - experimentName = fileInfo.get('experimentName') - storageHost = fileInfo.get('storageHost') - storageDirectory = fileInfo.get('storageDirectory') - targetDirectory = fileInfo.get('targetDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) - # Use relative path with respect to data directory as a source - srcUrl = self.getSrcUrl(filePath, dataDirectory) - - # Calculate checksum - FileUtility.statFile(filePath, fileInfo) - FileUtility.getMd5Sum(filePath, fileInfo) - self.logger.debug('File info before transfer: %s' % fileInfo) - - # Transfer file - self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory) - # Get remote checksum - fileInfo2 = {} - fileInfo2['experimentFilePath'] = experimentFilePath - fileInfo2['experimentName'] = experimentName - fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2) - self.logger.debug('File stat after transfer: %s' % fileMetadata) - - # Verify checksum - if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'): - self.logger.error('Checksum mismatch for file: %s' % filePath) - raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath) - - # Remove file - self.logger.debug('Checksum test passed, deleting file %s' % filePath) - OsUtility.removeFile(srcUrl) - -####################################################################### -# Testing. -if __name__ == '__main__': - ft = RsyncWithChecksumAndDeleteFileTransferPlugin('/tmp/xyz', '/tmp/xyz2') - ft.start() - print 'StdOut: ', ft.getStdOut() - print 'StdErr: ', ft.getStdErr() - print 'Exit Status: ', ft.getExitStatus() diff --git a/src/python/dm/daq_web_service/api/experimentDaqApi.py b/src/python/dm/daq_web_service/api/experimentDaqApi.py index fc21edbf..cb68f542 100755 --- a/src/python/dm/daq_web_service/api/experimentDaqApi.py +++ b/src/python/dm/daq_web_service/api/experimentDaqApi.py @@ -59,10 +59,10 @@ class ExperimentDaqApi(DaqRestApi): :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; several keys have special meaning: - *processHiddenFiles* (bool): if set to True, hidden files will be processed - - *targetDirectory* (str): specifies directory path relative to experiment root directory where files will be stored + - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored - *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours - *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes - - *uploadTargetDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored + - *uploadDestDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored - *skipPlugins* (str): comma-separated list of plugins which should not process files :type daqInfo: dict @@ -193,7 +193,7 @@ class ExperimentDaqApi(DaqRestApi): - *processHiddenFiles* (bool): if set to True, hidden files will be processed - *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed - - *targetDirectory* (str): specifies directory path relative to experiment root directory where files will be stored + - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored - *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files) - *skipPlugins* (str): comma-separated list of plugins which should not process files :type daqInfo: dict diff --git a/src/python/dm/daq_web_service/cli/daqWebServiceSessionCli.py b/src/python/dm/daq_web_service/cli/daqWebServiceSessionCli.py index e2e74172..48b090e6 100755 --- a/src/python/dm/daq_web_service/cli/daqWebServiceSessionCli.py +++ b/src/python/dm/daq_web_service/cli/daqWebServiceSessionCli.py @@ -8,7 +8,6 @@ from dm.common.utility.configurationManager import ConfigurationManager class DaqWebServiceSessionCli(DmRestSessionCli): """ DM DAQ web service session cli class. """ - DM_FILE_SERVER_URL_ENV_VAR = 'DM_FILE_SERVER_URL' DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.daq.session.cache' def __init__(self, validArgCount=0): @@ -29,9 +28,19 @@ class DaqWebServiceSessionCli(DmRestSessionCli): def getDataDirectory(self): dataDirectory = self.options.dataDirectory - # Make sure data directory already does not have url scheme - if dataDirectory and dataDirectory.find('://') < 0: - fileServerUrl = os.environ.get(self.DM_FILE_SERVER_URL_ENV_VAR, '') + if not dataDirectory: + return None + if dataDirectory.find('://') < 0: + fileServerUrl = os.environ.get('DM_FILE_SERVER_URL') dataDirectory = '%s%s' % (fileServerUrl, dataDirectory) - return dataDirectory + replacementMap = os.environ.get('DM_DATA_DIRECTORY_MAP', '') + (scheme, host, port, dirPath) = FtpUtility.parseUrl(dataDirectory) + if dirPath and replacementMap: + # Map entries are expected to be in the form + # <original>|<replacement>;<original>|<replacement>;... + for entry in replacementMap.split(';'): + original = entry.split('|')[0] + replacement = entry.split('|')[1] + dirPath = dataDirectory.replace(original,replacement) + return FtpUtility.assembleUrl(scheme, host, port, dirPath) diff --git a/src/python/dm/daq_web_service/cli/startDaqCli.py b/src/python/dm/daq_web_service/cli/startDaqCli.py index 91248279..00c3c991 100755 --- a/src/python/dm/daq_web_service/cli/startDaqCli.py +++ b/src/python/dm/daq_web_service/cli/startDaqCli.py @@ -11,11 +11,12 @@ class StartDaqCli(DaqWebServiceSessionCli): def __init__(self): DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') - self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR) - self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.') - self.addOption('', '--run-time', dest='runTime', help='Maximum run time; it must be specified in hours (h) or days (d). Examples: "8h", "14d".') + self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory URL.') + self.addOption('', '--dest-directory', dest='destDirectory', + help='Destination directory relative to experiment root path.') + self.addOption('', '--duration', dest='duration', help='DAQ duration; it must be specified in hours (h) or days (d). Examples: "8h", "14d".') self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after DAQ is stopped.') - self.addOption('', '--upload-target-directory-on-exit', dest='uploadTargetDirectoryOnExit', help='Target directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.') + self.addOption('', '--upload-dest-directory-on-exit', dest='uploadDestDirectoryOnExit', help='Destination directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') def checkArgs(self): @@ -27,30 +28,30 @@ class StartDaqCli(DaqWebServiceSessionCli): def updateDaqInfoFromOptions(self, daqInfo): if self.options.processHidden: daqInfo['processHiddenFiles'] = True - if self.options.runTime: - runTime = self.options.runTime - if runTime.endswith('h'): - daqInfo['maxRunTimeInHours'] = int(runTime[0:-1]) - elif runTime.endswith('d'): - daqInfo['maxRunTimeInHours'] = int(runTime[0:-1])*self.HOURS_PER_DAY + if self.options.duration: + duration = self.options.duration + if duration.endswith('h'): + daqInfo['maxRunTimeInHours'] = int(duration[0:-1]) + elif duration.endswith('d'): + daqInfo['maxRunTimeInHours'] = int(duration[0:-1])*self.HOURS_PER_DAY else: raise InvalidRequest('Maximum run time must contain valid unit specifier: "h" for hours or "d" for days.') - if self.options.targetDirectory: - daqInfo['targetDirectory'] = self.options.targetDirectory + if self.options.destDirectory: + daqInfo['destDirectory'] = self.options.destDirectory if self.options.uploadDataDirectoryOnExit: daqInfo['uploadDataDirectoryOnExit'] = self.options.uploadDataDirectoryOnExit - if self.options.uploadTargetDirectoryOnExit: + if self.options.uploadDestDirectoryOnExit: if not self.options.uploadDataDirectoryOnExit: - raise InvalidRequest('Upload target directory on exit requires that upload data directory is specified as well.') - daqInfo['uploadTargetDirectoryOnExit'] = self.options.uploadTargetDirectoryOnExit + raise InvalidRequest('Upload destination directory on exit requires that upload data directory is specified as well.') + daqInfo['uploadDestDirectoryOnExit'] = self.options.uploadDestDirectoryOnExit def runCommand(self): self.parseArgs(usage=""" dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY - [--run-time=RUNTIME] - [--target-directory=TARGETDIRECTORY] + [--duration=DURATION] + [--dest-directory=DESTDIRECTORY] [--upload-data-directory-on-exit=UPLOADDATADIRECTORYONEXIT] - [--upload-target-directory-on-exit=UPLOADTARGETDIRECTORYONEXIT] + [--upload-dest-directory-on-exit=UPLOADDESTDIRECTORYONEXIT] [--process-hidden] [key1:value1, key2:value2, ...] diff --git a/src/python/dm/daq_web_service/cli/stopDaqCli.py b/src/python/dm/daq_web_service/cli/stopDaqCli.py index 0c956a05..66d934c0 100755 --- a/src/python/dm/daq_web_service/cli/stopDaqCli.py +++ b/src/python/dm/daq_web_service/cli/stopDaqCli.py @@ -8,7 +8,7 @@ class StopDaqCli(DaqWebServiceSessionCli): def __init__(self): DaqWebServiceSessionCli.__init__(self) self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') - self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR) + self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory URL.') def checkArgs(self): if self.options.experimentName is None: diff --git a/src/python/dm/daq_web_service/cli/uploadCli.py b/src/python/dm/daq_web_service/cli/uploadCli.py index d690ed54..89e0b3e6 100755 --- a/src/python/dm/daq_web_service/cli/uploadCli.py +++ b/src/python/dm/daq_web_service/cli/uploadCli.py @@ -9,8 +9,8 @@ class UploadCli(DaqWebServiceSessionCli): def __init__(self): DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') - self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR) - self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.') + self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory URL.') + self.addOption('', '--dest-directory', dest='destDirectory', help='Destination directory relative to experiment root path.') self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY)) @@ -33,13 +33,13 @@ class UploadCli(DaqWebServiceSessionCli): if self.options.skipPlugins: daqInfo['skipPlugins'] = self.options.skipPlugins daqInfo['processingMode'] = self.options.processingMode - if self.options.targetDirectory: - daqInfo['targetDirectory'] = self.options.targetDirectory + if self.options.destDirectory: + daqInfo['destDirectory'] = self.options.destDirectory def runCommand(self): self.parseArgs(usage=""" dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY - [--target-directory=TARGETDIRECTORY] + [--dest-directory=DESTDIRECTORY] [--reprocess] [--process-hidden] [--processing-mode=PROCESSINGMODE] diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index e8e60c8a..4c9d1db6 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -88,10 +88,10 @@ class ExperimentSessionControllerImpl(DmObjectManager): self.logger.debug('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName)) daqInfo2 = {} daqInfo2['originalDaqId'] = daqId - uploadTargetDirectoryOnExit = daqInfo.get('uploadTargetDirectoryOnExit') - if uploadTargetDirectoryOnExit: - self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using target directory: %s' % (daqId, experimentName, uploadTargetDirectoryOnExit)) - daqInfo2['targetDirectory'] = uploadTargetDirectoryOnExit + uploadDestDirectoryOnExit = daqInfo.get('uploadDestDirectoryOnExit') + if uploadDestDirectoryOnExit: + self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using dest directory: %s' % (daqId, experimentName, uploadDestDirectoryOnExit)) + daqInfo2['destDirectory'] = uploadDestDirectoryOnExit try: uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2) @@ -176,7 +176,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): uploadId = uploadInfo.get('id') self.logger.debug('Preparing upload id: %s' % uploadId) dataDirectory = uploadInfo.get('dataDirectory') - targetDirectory = uploadInfo.get('targetDirectory') + destDirectory = uploadInfo.get('destDirectory') fileProcessingManager = FileProcessingManager.getInstance() try: @@ -223,7 +223,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)) time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS) - fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory) + fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory) fileInfo.update(filePathDict) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index dce1dafa..75e71bf6 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -105,9 +105,9 @@ class FileSystemObserver(threading.Thread,Singleton): daqId = daqInfo['id'] observedFile = self.observedFileMap.get(filePath) - targetDirectory = daqInfo.get('targetDirectory') + destDirectory = daqInfo.get('destDirectory') if not observedFile: - observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory) + observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory) observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() observedFile['statusMonitor'] = daqInfo self.observedFileMap[filePath] = observedFile diff --git a/src/python/dm/ds_web_service/cli/addExperimentCli.py b/src/python/dm/ds_web_service/cli/addExperimentCli.py index 70c41059..8d4270e5 100755 --- a/src/python/dm/ds_web_service/cli/addExperimentCli.py +++ b/src/python/dm/ds_web_service/cli/addExperimentCli.py @@ -78,14 +78,19 @@ class AddExperimentCli(DsWebServiceSessionCli): return proposalId def getUsers(self): + # Return list of users and beamline managers that can access data users = self.options.users if users: - # Remove duplicates by converting into set - users = list(set(users.split(','))) - + users = users.split(',') else: users = [] - return users + beamlineManagers = ConfigurationManager.getInstance().getBeamlineManagers() + if beamlineManagers: + beamlineManagers = beamlineManagers.split(',') + else: + beamlineManagers = [] + # Remove duplicates by converting into set + return list(set(users+beamlineManagers)) def runCommand(self): self.parseArgs(usage=""" -- GitLab