Commit d3290ab0 authored by sveseli

Added ability to specify a target directory for DAQs/uploads; added a max runtime option for DAQs; added ability to upload a data directory automatically after a DAQ exits.
parent c658c1e4
Showing changed files with 139 additions and 20 deletions
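The options added by this commit travel through the system as plain entries in a daqInfo/uploadInfo dictionary, keyed by the constants introduced in the first hunk below. A rough sketch of a daqInfo payload carrying all of the new fields (values and paths here are hypothetical, not part of the commit):

    # Hypothetical daqInfo payload using the new keys; paths/values are illustrative only.
    daqInfo = {
        'maxRunTimeInHours': 8,                       # DAQ is stopped automatically after 8 hours
        'targetDirectory': 'run5',                    # files land under <storageDirectory>/run5
        'uploadDataDirectoryOnExit': '/data/exp1',    # directory uploaded automatically after the DAQ stops
        'uploadTargetDirectoryOnExit': 'run5/final',  # target directory for that automatic upload
    }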
@@ -4,3 +4,8 @@
 DM_SERVICE_PROTOCOL_HTTP = 'http'
 DM_SERVICE_PROTOCOL_HTTPS = 'https'
+DM_DAQ_MAX_RUN_TIME_IN_HOURS_KEY = 'maxRunTimeInHours'
+DM_DAQ_TARGET_DIRECTORY_KEY = 'targetDirectory'
+DM_DAQ_UPLOAD_DATA_DIRECTORY_ON_EXIT_KEY = 'uploadDataDirectoryOnExit'
+DM_DAQ_UPLOAD_TARGET_DIRECTORY_ON_EXIT_KEY = 'uploadTargetDirectoryOnExit'
@@ -9,7 +9,7 @@ class ObservedFile(DmObject):
     DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdateTime' ]
-    def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None):
+    def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None, targetDirectory=None):
         DmObject.__init__(self, dict)
         if filePath:
             self['filePath'] = filePath
@@ -18,11 +18,14 @@ class ObservedFile(DmObject):
             if filePath:
                 parseResult = urlparse.urlparse(dataDirectory)
                 self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path)
+                if targetDirectory:
+                    self['targetDirectory'] = targetDirectory
+                    self['experimentFilePath'] = '%s/%s' % (targetDirectory, self['experimentFilePath'])
         if experiment:
             self['experimentName'] = experiment.get('name')
             self['storageHost'] = experiment.get('storageHost')
             self['storageDirectory'] = experiment.get('storageDirectory')
     def setLastUpdateTimeToNow(self):
         self['lastUpdateTime'] = time.time()
...
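For illustration, the effect of the new targetDirectory argument on ObservedFile's experimentFilePath, ignoring the file-server URL parsing and using made-up paths:

    import os
    filePath = '/data/exp1/scan01/img001.tif'
    dataDirectory = '/data/exp1'
    experimentFilePath = os.path.relpath(filePath, dataDirectory)        # 'scan01/img001.tif'
    # With targetDirectory='run5' the relative path is prefixed, so the file
    # is cataloged and stored under <storageDirectory>/run5 instead of the root.
    targetDirectory = 'run5'
    experimentFilePath = '%s/%s' % (targetDirectory, experimentFilePath) # 'run5/scan01/img001.tif'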
@@ -26,10 +26,11 @@ class FileTransferPlugin(FileProcessor):
     def processFile(self, fileInfo):
         filePath = fileInfo.get('filePath')
         dataDirectory = fileInfo.get('dataDirectory')
+        targetDirectory = fileInfo.get('targetDirectory')
         storageHost = fileInfo.get('storageHost')
         storageDirectory = fileInfo.get('storageDirectory')
-        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
+        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory)
         srcUrl = self.getSrcUrl(filePath, dataDirectory)
         self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)
@@ -37,21 +38,25 @@ class FileTransferPlugin(FileProcessor):
         srcUrl = '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory))
         return srcUrl
-    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
+    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, targetDirectory=None):
         if self.dest:
             destUrl = '%s/' % (self.dest)
         else:
             destUrl = '%s:%s/' % (storageHost, storageDirectory)
+        if targetDirectory:
+            destUrl = '%s/%s/' % (destUrl, targetDirectory)
         return destUrl
     def getSrcDirUrl(self, dataDirectory):
         return '%s/' % dataDirectory
-    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
+    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, targetDirectory=None):
         if self.dest:
             destUrl = '%s/' % (self.dest)
         else:
             destUrl = '%s:%s/' % (storageHost, storageDirectory)
+        if targetDirectory:
+            destUrl = '%s/%s/' % (destUrl, targetDirectory)
         return destUrl
     def getFullCommand(self, src, dest, command=None):
...
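A minimal sketch of the destination URL the base plugin now produces when a target directory is set (host and directory names are hypothetical):

    storageHost = 'storage01'
    storageDirectory = '/storage/exp1'
    targetDirectory = 'run5'
    destUrl = '%s:%s/' % (storageHost, storageDirectory)   # 'storage01:/storage/exp1/'
    if targetDirectory:
        destUrl = '%s/%s/' % (destUrl, targetDirectory)    # 'storage01:/storage/exp1//run5/'
    # The doubled slash comes from appending to a URL that already ends in '/';
    # it is generally harmless for rsync/ssh-style destinations.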
@@ -48,7 +48,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
             srcUrl = '%s/%s' % (self.src,filePath)
         return self.replaceSpecialCharacters(srcUrl)
-    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
+    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, targetDirectory=None):
         (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
         dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
         fileName = os.path.basename(filePath)
@@ -56,6 +56,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
             destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
         else:
             destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
+        if targetDirectory:
+            destUrl = '%s/%s/' % (destUrl, targetDirectory)
         return self.replaceSpecialCharacters(destUrl)
     def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
@@ -64,6 +66,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         storageHost = uploadInfo['storageHost']
         storageDirectory = uploadInfo['storageDirectory']
         dataDirectory = uploadInfo['dataDirectory']
+        targetDirectory = uploadInfo.get('targetDirectory')
         self.logger.debug('Upload info: %s' % uploadInfo)
         # Original data directory may contain host/port
         (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
@@ -71,8 +74,11 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         self.logger.debug('Number of original files: %s' % len(filePathsDict))
         self.logger.debug('Looking for existing files in %s' % storageDirectory)
         ftpUtility = SftpUtility(storageHost)
-        storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
-        self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), storageDirectory))
+        destDirectory = storageDirectory
+        if targetDirectory:
+            destDirectory = '%s/%s/' % (destDirectory, targetDirectory)
+        storageFilePathsDict = ftpUtility.getFiles(destDirectory, {}, replacementDirPath)
+        self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), destDirectory))
         # Remove file from plugin dict if we do not need to transfer it
         for (filePath,storageFilePathDict) in storageFilePathsDict.items():
             filePathDict = filePathsDict.get(filePath)
@@ -108,8 +114,9 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         experimentName = fileInfo.get('experimentName')
         storageHost = fileInfo.get('storageHost')
         storageDirectory = fileInfo.get('storageDirectory')
-        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
+        targetDirectory = fileInfo.get('targetDirectory')
+        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory)
         srcUrl = self.getSrcUrl(filePath, dataDirectory)
         # Calculate checksum
@@ -157,11 +164,13 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
             srcUrl = '%s/%s/' % (self.src,dataDirectory)
         return srcUrl
-    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
+    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, targetDirectory=None):
         if self.dest:
             destUrl = '%s/' % (self.dest)
         else:
             destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
+        if targetDirectory:
+            destUrl = '%s/%s/' % (destUrl, targetDirectory)
         return destUrl
     def processDirectory(self, directoryInfo):
@@ -170,8 +179,9 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         experimentName = uploadInfo.get('experimentName')
         storageHost = uploadInfo.get('storageHost')
         storageDirectory = uploadInfo.get('storageDirectory')
-        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory)
+        targetDirectory = uploadInfo.get('targetDirectory')
+        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, targetDirectory)
         srcUrl = self.getSrcDirUrl(dataDirectory)
         # Transfer directory
...
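Sketch of the storage-side directory that checkUploadFilesForProcessing now lists when deciding which files can be skipped (values are hypothetical):

    storageDirectory = '/storage/exp1'
    targetDirectory = 'run5'
    destDirectory = storageDirectory
    if targetDirectory:
        destDirectory = '%s/%s/' % (destDirectory, targetDirectory)
    # destDirectory == '/storage/exp1/run5/'; files already present there are
    # dropped from the upload list so they are not transferred again.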
@@ -115,11 +115,12 @@ class MongoDbFileCatalogPlugin(FileProcessor):
         uploadId = uploadInfo.get('id')
         dataDirectory = uploadInfo.get('dataDirectory')
+        targetDirectory = uploadInfo.get('targetDirectory')
         nProcessedFiles = 0
         nFiles = len(filePathsDict)
         for (filePath,filePathDict) in filePathsDict.items():
-            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
+            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory)
             fileInfo.update(filePathDict)
             fileInfo['daqInfo'] = daqInfo
             fileInfo['uploadId'] = uploadId
...
@@ -12,7 +12,9 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
 class RsyncFileTransferPlugin(FileTransferPlugin):
     DEFAULT_COMMAND = 'rsync -arvlPR --'
+    RSYNC_WITH_MKDIR_COMMAND = 'rsync -arvlPR --rsync-path="mkdir -p %s && rsync" --'
     DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
+    DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND = 'rsync -arvlP --rsync-path="mkdir -p %s && rsync" --'
     DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'
     def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
@@ -29,7 +31,11 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
         storageDirectory = uploadInfo['storageDirectory']
         storageHost = uploadInfo['storageHost']
         dataDirectory = uploadInfo['dataDirectory']
+        targetDirectory = uploadInfo.get('targetDirectory')
         dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
+        if targetDirectory:
+            dryRunCommand = '%s/%s/' % (dryRunCommand, targetDirectory)
         subprocess = DmSubprocess.getSubprocess(dryRunCommand)
         subprocess.run()
         lines = subprocess.getStdOut().split('\n')
@@ -52,7 +58,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
         experimentName = fileInfo.get('experimentName')
         storageHost = fileInfo.get('storageHost')
         storageDirectory = fileInfo.get('storageDirectory')
-        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
+        targetDirectory = fileInfo.get('targetDirectory')
+        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory)
         # Use relative path with respect to data directory as a source
         srcUrl = self.getSrcUrl(filePath, dataDirectory)
@@ -64,7 +71,11 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
         # Transfer file
         self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
-        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory)
+        command = self.command
+        if targetDirectory:
+            destDirectory = '%s/%s' % (storageDirectory,targetDirectory)
+            command = self.RSYNC_WITH_MKDIR_COMMAND % destDirectory
+        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory)
         # Get remote checksum
         if self.remoteMd5Sum:
@@ -93,13 +104,18 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
         experimentName = uploadInfo.get('experimentName')
         storageHost = uploadInfo.get('storageHost')
         storageDirectory = uploadInfo.get('storageDirectory')
-        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory)
+        targetDirectory = uploadInfo.get('targetDirectory')
+        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, targetDirectory)
         srcUrl = self.getSrcDirUrl(dataDirectory)
         # Transfer directory
         self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
-        self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo, cwd=dataDirectory)
+        command = self.DIRECTORY_TRANSFER_COMMAND
+        if targetDirectory:
+            destDirectory = '%s/%s' % (storageDirectory,targetDirectory)
+            command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % destDirectory
+        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory)
 #######################################################################
 # Testing.
...
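The new *_WITH_MKDIR_COMMAND templates use rsync's --rsync-path option to run a command on the remote side before the remote rsync starts, creating the destination directory if it does not exist. Expanding the template with a hypothetical destination:

    RSYNC_WITH_MKDIR_COMMAND = 'rsync -arvlPR --rsync-path="mkdir -p %s && rsync" --'
    destDirectory = '/storage/exp1/run5'   # hypothetical <storageDirectory>/<targetDirectory>
    command = RSYNC_WITH_MKDIR_COMMAND % destDirectory
    # command == 'rsync -arvlPR --rsync-path="mkdir -p /storage/exp1/run5 && rsync" --'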
@@ -24,7 +24,8 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
         experimentName = fileInfo.get('experimentName')
         storageHost = fileInfo.get('storageHost')
         storageDirectory = fileInfo.get('storageDirectory')
-        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
+        targetDirectory = fileInfo.get('targetDirectory')
+        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory)
         # Use relative path with respect to data directory as a source
         srcUrl = self.getSrcUrl(filePath, dataDirectory)
...
@@ -5,10 +5,17 @@ from dm.common.exceptions.invalidRequest import InvalidRequest
 from daqWebServiceSessionCli import DaqWebServiceSessionCli
 class StartDaqCli(DaqWebServiceSessionCli):
+    HOURS_PER_DAY = 24
     def __init__(self):
         DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
         self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
         self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)
+        self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.')
+        self.addOption('', '--run-time', dest='runTime', help='Maximum run time; it must be specified in hours (h) or days (d). Examples: "8h", "14d".')
+        self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after DAQ is stopped.')
+        self.addOption('', '--upload-target-directory-on-exit', dest='uploadTargetDirectoryOnExit', help='Target directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.')
         self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
     def checkArgs(self):
@@ -20,10 +27,30 @@ class StartDaqCli(DaqWebServiceSessionCli):
     def updateDaqInfoFromOptions(self, daqInfo):
         if self.options.processHidden:
             daqInfo['processHiddenFiles'] = True
+        if self.options.runTime:
+            runTime = self.options.runTime
+            if runTime.endswith('h'):
+                daqInfo['maxRunTimeInHours'] = int(runTime[0:-1])
+            elif runTime.endswith('d'):
+                daqInfo['maxRunTimeInHours'] = int(runTime[0:-1])*self.HOURS_PER_DAY
+            else:
+                raise InvalidRequest('Maximum run time must contain valid unit specifier: "h" for hours or "d" for days.')
+        if self.options.targetDirectory:
+            daqInfo['targetDirectory'] = self.options.targetDirectory
+        if self.options.uploadDataDirectoryOnExit:
+            daqInfo['uploadDataDirectoryOnExit'] = self.options.uploadDataDirectoryOnExit
+        if self.options.uploadTargetDirectoryOnExit:
+            if not self.options.uploadDataDirectoryOnExit:
+                raise InvalidRequest('Upload target directory on exit requires that upload data directory is specified as well.')
+            daqInfo['uploadTargetDirectoryOnExit'] = self.options.uploadTargetDirectoryOnExit
     def runCommand(self):
         self.parseArgs(usage="""
 dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
+    [--run-time=RUNTIME]
+    [--target-directory=TARGETDIRECTORY]
+    [--upload-data-directory-on-exit=UPLOADDATADIRECTORYONEXIT]
+    [--upload-target-directory-on-exit=UPLOADTARGETDIRECTORYONEXIT]
     [--process-hidden]
     [key1:value1, key2:value2, ...]
...
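The --run-time value is converted to hours by its unit suffix; a standalone sketch of the same conversion used in updateDaqInfoFromOptions above:

    HOURS_PER_DAY = 24
    def runTimeToHours(runTime):
        # '8h' -> 8; '14d' -> 14*24 = 336; anything else is rejected.
        if runTime.endswith('h'):
            return int(runTime[0:-1])
        elif runTime.endswith('d'):
            return int(runTime[0:-1])*HOURS_PER_DAY
        raise ValueError('Run time must end with "h" (hours) or "d" (days).')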
@@ -10,6 +10,7 @@ class UploadCli(DaqWebServiceSessionCli):
         DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
         self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
         self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)
+        self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.')
         self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.')
         self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
         self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY))
@@ -32,10 +33,13 @@ class UploadCli(DaqWebServiceSessionCli):
         if self.options.skipPlugins:
             daqInfo['skipPlugins'] = self.options.skipPlugins
         daqInfo['processingMode'] = self.options.processingMode
+        if self.options.targetDirectory:
+            daqInfo['targetDirectory'] = self.options.targetDirectory
     def runCommand(self):
         self.parseArgs(usage="""
 dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
+    [--target-directory=TARGETDIRECTORY]
     [--reprocess]
     [--process-hidden]
     [--processing-mode=PROCESSINGMODE]
...
@@ -36,6 +36,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
     UPLOAD_CHUNK_SIZE_IN_FILES = 100
     UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
     DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
+    #SECONDS_PER_HOUR = 60*60
+    SECONDS_PER_HOUR = 60
     def __init__(self):
         DmObjectManager.__init__(self)
@@ -49,13 +51,56 @@ class ExperimentSessionControllerImpl(DmObjectManager):
             raise InvalidRequest('Experiment %s has not been started.' % experimentName)
         daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
         FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
+        maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
+        if maxRunTimeInHours:
+            daqId = daqInfo['id']
+            self.logger.debug('Starting timer to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours' % (daqId, experimentName, maxRunTimeInHours))
+            maxRunTimeInSeconds = maxRunTimeInHours*self.SECONDS_PER_HOUR
+            timer = threading.Timer(maxRunTimeInSeconds, self.stopDaqTimer, args=[experimentName, dataDirectory, daqId])
+            timer.start()
         return daqInfo
+    def stopDaqTimer(self, experimentName, dataDirectory, daqId):
+        try:
+            daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
+            maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
+            self.logger.debug('Attempting to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours was exceeded' % (daqId, experimentName, maxRunTimeInHours))
+            daqStatus = daqInfo.get('status')
+            if daqStatus != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
+                self.logger.debug('DAQ id %s has status of %s, will not stop it automatically' % (daqId, daqStatus))
+                return
+            self.stopDaq(experimentName, dataDirectory)
+        except Exception, ex:
+            self.logger.error('Could not automatically stop DAQ id %s: %s' % (daqId, str(ex)))
     def stopDaq(self, experimentName, dataDirectory):
         experiment = self.dsExperimentApi.getExperimentByName(experimentName)
         daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
         FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
         daqInfo.updateStatus()
+        daqId = daqInfo.get('id')
+        self.logger.error('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
+        # Prepare upload on exit
+        uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
+        if uploadDataDirectoryOnExit:
+            self.logger.error('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
+            daqInfo2 = {}
+            daqInfo2['originalDaqId'] = daqId
+            uploadTargetDirectoryOnExit = daqInfo.get('uploadTargetDirectoryOnExit')
+            if uploadTargetDirectoryOnExit:
+                self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using target directory: %s' % (daqId, experimentName, uploadTargetDirectoryOnExit))
+                daqInfo2['targetDirectory'] = uploadTargetDirectoryOnExit
+            try:
+                uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2)
+                daqInfo['uploadIdOnExit'] = uploadInfo.get('id')
+            except Exception, ex:
+                self.logger.error('Could not automatically upload DAQ id %s: %s' % (daqId, str(ex)))
+                daqInfo['uploadErrorOnExit'] = str(ex)
         return daqInfo
     def getDaqInfo(self, id):
@@ -132,6 +177,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         uploadId = uploadInfo.get('id')
         self.logger.debug('Preparing upload id: %s' % uploadId)
         dataDirectory = uploadInfo.get('dataDirectory')
+        targetDirectory = uploadInfo.get('targetDirectory')
         fileProcessingManager = FileProcessingManager.getInstance()
         try:
@@ -178,7 +224,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
                 self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS))
                 time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)
-            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
+            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory)
             fileInfo.update(filePathDict)
             fileInfo['daqInfo'] = daqInfo
             fileInfo['uploadId'] = uploadId
...
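The automatic stop relies on a one-shot threading.Timer that calls stopDaqTimer after the configured runtime. A self-contained sketch of the pattern (note that the commit ships SECONDS_PER_HOUR = 60 with the usual 60*60 commented out, apparently a test value; the sketch assumes the conventional 3600):

    import threading

    SECONDS_PER_HOUR = 60*60

    def scheduleDaqStop(maxRunTimeInHours, stopCallback, *args):
        # Fire stopCallback(*args) once maxRunTimeInHours have elapsed.
        timer = threading.Timer(maxRunTimeInHours*SECONDS_PER_HOUR, stopCallback, args=list(args))
        timer.daemon = True
        timer.start()
        return timer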
@@ -105,8 +105,9 @@ class FileSystemObserver(threading.Thread,Singleton):
         daqId = daqInfo['id']
         observedFile = self.observedFileMap.get(filePath)
+        targetDirectory = daqInfo.get('targetDirectory')
         if not observedFile:
-            observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
+            observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory)
             observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
             observedFile['statusMonitor'] = daqInfo
             self.observedFileMap[filePath] = observedFile
...