From d3290ab08f3a4f29b31075480ccd630e794ddb12 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Fri, 24 Feb 2017 20:52:37 +0000 Subject: [PATCH] added ability to specify target directory for daqs/uploads; added max runtime option for daqs; added ability to specify upload of a directory after exit for daqs --- .../dm/common/constants/dmServiceConstants.py | 5 ++ src/python/dm/common/objects/observedFile.py | 7 ++- .../processing/plugins/fileTransferPlugin.py | 11 +++-- .../plugins/gridftpFileTransferPlugin.py | 22 ++++++--- .../plugins/mongoDbFileCatalogPlugin.py | 3 +- .../plugins/rsyncFileTransferPlugin.py | 24 ++++++++-- ...WithChecksumAndDeleteFileTransferPlugin.py | 3 +- .../dm/daq_web_service/cli/startDaqCli.py | 27 +++++++++++ .../dm/daq_web_service/cli/uploadCli.py | 6 ++- .../impl/experimentSessionControllerImpl.py | 48 ++++++++++++++++++- .../service/impl/fileSystemObserver.py | 3 +- 11 files changed, 139 insertions(+), 20 deletions(-) diff --git a/src/python/dm/common/constants/dmServiceConstants.py b/src/python/dm/common/constants/dmServiceConstants.py index 1a141935..b74e76ee 100755 --- a/src/python/dm/common/constants/dmServiceConstants.py +++ b/src/python/dm/common/constants/dmServiceConstants.py @@ -4,3 +4,8 @@ DM_SERVICE_PROTOCOL_HTTP = 'http' DM_SERVICE_PROTOCOL_HTTPS = 'https' + +DM_DAQ_MAX_RUN_TIME_IN_HOURS_KEY = 'maxRunTimeInHours' +DM_DAQ_TARGET_DIRECTORY_KEY = 'targetDirectory' +DM_DAQ_UPLOAD_DATA_DIRECTORY_ON_EXIT_KEY = 'uploadDataDirectoryOnExit' +DM_DAQ_UPLOAD_TARGET_DIRECTORY_ON_EXIT_KEY = 'uploadTargetDirectoryOnExit' diff --git a/src/python/dm/common/objects/observedFile.py b/src/python/dm/common/objects/observedFile.py index 2648d958..2ef2e31c 100755 --- a/src/python/dm/common/objects/observedFile.py +++ b/src/python/dm/common/objects/observedFile.py @@ -9,7 +9,7 @@ class ObservedFile(DmObject): DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdateTime' ] - def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None): + def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None, targetDirectory=None): DmObject.__init__(self, dict) if filePath: self['filePath'] = filePath @@ -18,11 +18,14 @@ class ObservedFile(DmObject): if filePath: parseResult = urlparse.urlparse(dataDirectory) self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path) + if targetDirectory: + self['targetDirectory'] = targetDirectory + self['experimentFilePath'] = '%s/%s' % (targetDirectory, self['experimentFilePath']) + if experiment: self['experimentName'] = experiment.get('name') self['storageHost'] = experiment.get('storageHost') self['storageDirectory'] = experiment.get('storageDirectory') - def setLastUpdateTimeToNow(self): self['lastUpdateTime'] = time.time() diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py index 6d22cc28..0b2548bd 100755 --- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py @@ -26,10 +26,11 @@ class FileTransferPlugin(FileProcessor): def processFile(self, fileInfo): filePath = fileInfo.get('filePath') dataDirectory = fileInfo.get('dataDirectory') + targetDirectory = fileInfo.get('targetDirectory') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo) @@ -37,21 +38,25 @@ class FileTransferPlugin(FileProcessor): srcUrl = '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory)) return srcUrl - def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory): + def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, targetDirectory=None): if self.dest: destUrl = '%s/' % (self.dest) else: destUrl = '%s:%s/' % (storageHost, storageDirectory) + if targetDirectory: + destUrl = '%s/%s/' % (destUrl, targetDirectory) return destUrl def getSrcDirUrl(self, dataDirectory): return '%s/' % dataDirectory - def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory): + def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, targetDirectory=None): if self.dest: destUrl = '%s/' % (self.dest) else: destUrl = '%s:%s/' % (storageHost, storageDirectory) + if targetDirectory: + destUrl = '%s/%s/' % (destUrl, targetDirectory) return destUrl def getFullCommand(self, src, dest, command=None): diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 03c56c2a..d42ec1f6 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -48,7 +48,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): srcUrl = '%s/%s' % (self.src,filePath) return self.replaceSpecialCharacters(srcUrl) - def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory): + def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, targetDirectory=None): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip() fileName = os.path.basename(filePath) @@ -56,6 +56,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin): destUrl = '%s/%s/%s' % (self.dest, dirName, fileName) else: destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) + if targetDirectory: + destUrl = '%s/%s/' % (destUrl, targetDirectory) return self.replaceSpecialCharacters(destUrl) def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): @@ -64,6 +66,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): storageHost = uploadInfo['storageHost'] storageDirectory = uploadInfo['storageDirectory'] dataDirectory = uploadInfo['dataDirectory'] + targetDirectory = uploadInfo.get('targetDirectory') self.logger.debug('Upload info: %s' % uploadInfo) # Original data directory may contain host/port (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory) @@ -71,8 +74,11 @@ class GridftpFileTransferPlugin(FileTransferPlugin): self.logger.debug('Number of original files: %s' % len(filePathsDict)) self.logger.debug('Looking for existing files in %s' % storageDirectory) ftpUtility = SftpUtility(storageHost) - storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath) - self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), storageDirectory)) + destDirectory = storageDirectory + if targetDirectory: + destDirectory = '%s/%s/' % (destDirectory, targetDirectory) + storageFilePathsDict = ftpUtility.getFiles(destDirectory, {}, replacementDirPath) + self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), destDirectory)) # Remove file from plugin dict if we do not need to transfer it for (filePath,storageFilePathDict) in storageFilePathsDict.items(): filePathDict = filePathsDict.get(filePath) @@ -108,8 +114,9 @@ class GridftpFileTransferPlugin(FileTransferPlugin): experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') + targetDirectory = fileInfo.get('targetDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) # Calculate checksum @@ -157,11 +164,13 @@ class GridftpFileTransferPlugin(FileTransferPlugin): srcUrl = '%s/%s/' % (self.src,dataDirectory) return srcUrl - def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory): + def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, targetDirectory=None): if self.dest: destUrl = '%s/' % (self.dest) else: destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory) + if targetDirectory: + destUrl = '%s/%s/' % (destUrl, targetDirectory) return destUrl def processDirectory(self, directoryInfo): @@ -170,8 +179,9 @@ class GridftpFileTransferPlugin(FileTransferPlugin): experimentName = uploadInfo.get('experimentName') storageHost = uploadInfo.get('storageHost') storageDirectory = uploadInfo.get('storageDirectory') + targetDirectory = uploadInfo.get('targetDirectory') - destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory) + destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, targetDirectory) srcUrl = self.getSrcDirUrl(dataDirectory) # Transfer directory diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py index 4c79ea8c..e7fe8d69 100755 --- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py +++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py @@ -115,11 +115,12 @@ class MongoDbFileCatalogPlugin(FileProcessor): uploadId = uploadInfo.get('id') dataDirectory = uploadInfo.get('dataDirectory') + targetDirectory = uploadInfo.get('targetDirectory') nProcessedFiles = 0 nFiles = len(filePathsDict) for (filePath,filePathDict) in filePathsDict.items(): - fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) + fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory) fileInfo.update(filePathDict) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index ae3ef7bc..dc211439 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -12,7 +12,9 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class RsyncFileTransferPlugin(FileTransferPlugin): DEFAULT_COMMAND = 'rsync -arvlPR --' + RSYNC_WITH_MKDIR_COMMAND = 'rsync -arvlPR --rsync-path="mkdir -p %s && rsync" --' DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --' + DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND = 'rsync -arvlP --rsync-path="mkdir -p %s && rsync" --' DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --' def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): @@ -29,7 +31,11 @@ class RsyncFileTransferPlugin(FileTransferPlugin): storageDirectory = uploadInfo['storageDirectory'] storageHost = uploadInfo['storageHost'] dataDirectory = uploadInfo['dataDirectory'] + targetDirectory = uploadInfo.get('targetDirectory') dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory) + if targetDirectory: + dryRunCommand = '%s/%s/' % (dryRunCommand, targetDirectory) + subprocess = DmSubprocess.getSubprocess(dryRunCommand) subprocess.run() lines = subprocess.getStdOut().split('\n') @@ -52,7 +58,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin): experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) + targetDirectory = fileInfo.get('targetDirectory') + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) # Use relative path with respect to data directory as a source srcUrl = self.getSrcUrl(filePath, dataDirectory) @@ -64,7 +71,11 @@ class RsyncFileTransferPlugin(FileTransferPlugin): # Transfer file self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) - self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory) + command = self.command + if targetDirectory: + destDirectory = '%s/%s' % (storageDirectory,targetDirectory) + command = self.RSYNC_WITH_MKDIR_COMMAND % destDirectory + self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory) # Get remote checksum if self.remoteMd5Sum: @@ -93,13 +104,18 @@ class RsyncFileTransferPlugin(FileTransferPlugin): experimentName = uploadInfo.get('experimentName') storageHost = uploadInfo.get('storageHost') storageDirectory = uploadInfo.get('storageDirectory') + targetDirectory = uploadInfo.get('targetDirectory') - destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory) + destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, targetDirectory) srcUrl = self.getSrcDirUrl(dataDirectory) # Transfer directory self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) - self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo, cwd=dataDirectory) + command = self.DIRECTORY_TRANSFER_COMMAND + if targetDirectory: + destDirectory = '%s/%s' % (storageDirectory,targetDirectory) + command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % destDirectory + self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory) ####################################################################### # Testing. diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py index 515bff88..e206f090 100755 --- a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py @@ -24,7 +24,8 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin): experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) + targetDirectory = fileInfo.get('targetDirectory') + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory) # Use relative path with respect to data directory as a source srcUrl = self.getSrcUrl(filePath, dataDirectory) diff --git a/src/python/dm/daq_web_service/cli/startDaqCli.py b/src/python/dm/daq_web_service/cli/startDaqCli.py index 9bbf662d..91248279 100755 --- a/src/python/dm/daq_web_service/cli/startDaqCli.py +++ b/src/python/dm/daq_web_service/cli/startDaqCli.py @@ -5,10 +5,17 @@ from dm.common.exceptions.invalidRequest import InvalidRequest from daqWebServiceSessionCli import DaqWebServiceSessionCli class StartDaqCli(DaqWebServiceSessionCli): + + HOURS_PER_DAY = 24 + def __init__(self): DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR) + self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.') + self.addOption('', '--run-time', dest='runTime', help='Maximum run time; it must be specified in hours (h) or days (d). Examples: "8h", "14d".') + self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after DAQ is stopped.') + self.addOption('', '--upload-target-directory-on-exit', dest='uploadTargetDirectoryOnExit', help='Target directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') def checkArgs(self): @@ -20,10 +27,30 @@ class StartDaqCli(DaqWebServiceSessionCli): def updateDaqInfoFromOptions(self, daqInfo): if self.options.processHidden: daqInfo['processHiddenFiles'] = True + if self.options.runTime: + runTime = self.options.runTime + if runTime.endswith('h'): + daqInfo['maxRunTimeInHours'] = int(runTime[0:-1]) + elif runTime.endswith('d'): + daqInfo['maxRunTimeInHours'] = int(runTime[0:-1])*self.HOURS_PER_DAY + else: + raise InvalidRequest('Maximum run time must contain valid unit specifier: "h" for hours or "d" for days.') + if self.options.targetDirectory: + daqInfo['targetDirectory'] = self.options.targetDirectory + if self.options.uploadDataDirectoryOnExit: + daqInfo['uploadDataDirectoryOnExit'] = self.options.uploadDataDirectoryOnExit + if self.options.uploadTargetDirectoryOnExit: + if not self.options.uploadDataDirectoryOnExit: + raise InvalidRequest('Upload target directory on exit requires that upload data directory is specified as well.') + daqInfo['uploadTargetDirectoryOnExit'] = self.options.uploadTargetDirectoryOnExit def runCommand(self): self.parseArgs(usage=""" dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY + [--run-time=RUNTIME] + [--target-directory=TARGETDIRECTORY] + [--upload-data-directory-on-exit=UPLOADDATADIRECTORYONEXIT] + [--upload-target-directory-on-exit=UPLOADTARGETDIRECTORYONEXIT] [--process-hidden] [key1:value1, key2:value2, ...] diff --git a/src/python/dm/daq_web_service/cli/uploadCli.py b/src/python/dm/daq_web_service/cli/uploadCli.py index 9185ff10..d690ed54 100755 --- a/src/python/dm/daq_web_service/cli/uploadCli.py +++ b/src/python/dm/daq_web_service/cli/uploadCli.py @@ -10,6 +10,7 @@ class UploadCli(DaqWebServiceSessionCli): DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR) + self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.') self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.') self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.') self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY)) @@ -32,10 +33,13 @@ class UploadCli(DaqWebServiceSessionCli): if self.options.skipPlugins: daqInfo['skipPlugins'] = self.options.skipPlugins daqInfo['processingMode'] = self.options.processingMode - + if self.options.targetDirectory: + daqInfo['targetDirectory'] = self.options.targetDirectory + def runCommand(self): self.parseArgs(usage=""" dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY + [--target-directory=TARGETDIRECTORY] [--reprocess] [--process-hidden] [--processing-mode=PROCESSINGMODE] diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index 2669c5ad..c0312f96 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -36,6 +36,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): UPLOAD_CHUNK_SIZE_IN_FILES = 100 UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0 DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0 + #SECONDS_PER_HOUR = 60*60 + SECONDS_PER_HOUR = 60 def __init__(self): DmObjectManager.__init__(self) @@ -49,13 +51,56 @@ class ExperimentSessionControllerImpl(DmObjectManager): raise InvalidRequest('Experiment %s has not been started.' % experimentName) daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo) FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment) + maxRunTimeInHours = daqInfo.get('maxRunTimeInHours') + if maxRunTimeInHours: + daqId = daqInfo['id'] + self.logger.debug('Starting timer to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours' % (daqId, experimentName, maxRunTimeInHours)) + maxRunTimeInSeconds = maxRunTimeInHours*self.SECONDS_PER_HOUR + timer = threading.Timer(maxRunTimeInSeconds, self.stopDaqTimer, args=[experimentName, dataDirectory, daqId]) + timer.start() + return daqInfo + def stopDaqTimer(self, experimentName, dataDirectory, daqId): + try: + daqInfo = DaqTracker.getInstance().getDaqInfo(daqId) + maxRunTimeInHours = daqInfo.get('maxRunTimeInHours') + self.logger.debug('Attempting to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours was exceeded' % (daqId, experimentName, maxRunTimeInHours)) + daqStatus = daqInfo.get('status') + if daqStatus != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING: + self.logger.debug('DAQ id %s has status of %s, will not stop it automatically' % (daqId, daqStatus)) + return + self.stopDaq(experimentName, dataDirectory) + except Exception, ex: + self.logger.error('Could not automatically stop DAQ id %s: %s' % (daqId, str(ex))) + + def stopDaq(self, experimentName, dataDirectory): experiment = self.dsExperimentApi.getExperimentByName(experimentName) daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory) FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment) daqInfo.updateStatus() + daqId = daqInfo.get('id') + self.logger.error('Stopped DAQ id %s for experiment %s' % (daqId, experimentName)) + + # Prepare upload on exit + uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit') + if uploadDataDirectoryOnExit: + self.logger.error('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName)) + daqInfo2 = {} + daqInfo2['originalDaqId'] = daqId + uploadTargetDirectoryOnExit = daqInfo.get('uploadTargetDirectoryOnExit') + if uploadTargetDirectoryOnExit: + self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using target directory: %s' % (daqId, experimentName, uploadTargetDirectoryOnExit)) + daqInfo2['targetDirectory'] = uploadTargetDirectoryOnExit + + try: + uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2) + daqInfo['uploadIdOnExit'] = uploadInfo.get('id') + except Exception, ex: + self.logger.error('Could not automatically upload DAQ id %s: %s' % (daqId, str(ex))) + daqInfo['uploadErrorOnExit'] = str(ex) + return daqInfo def getDaqInfo(self, id): @@ -132,6 +177,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): uploadId = uploadInfo.get('id') self.logger.debug('Preparing upload id: %s' % uploadId) dataDirectory = uploadInfo.get('dataDirectory') + targetDirectory = uploadInfo.get('targetDirectory') fileProcessingManager = FileProcessingManager.getInstance() try: @@ -178,7 +224,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)) time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS) - fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) + fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory) fileInfo.update(filePathDict) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 9a0a6d90..dce1dafa 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -105,8 +105,9 @@ class FileSystemObserver(threading.Thread,Singleton): daqId = daqInfo['id'] observedFile = self.observedFileMap.get(filePath) + targetDirectory = daqInfo.get('targetDirectory') if not observedFile: - observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) + observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory) observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() observedFile['statusMonitor'] = daqInfo self.observedFileMap[filePath] = observedFile -- GitLab