diff --git a/bin/dm-get-processing-plugins b/bin/dm-list-processing-plugins similarity index 77% rename from bin/dm-get-processing-plugins rename to bin/dm-list-processing-plugins index e3a0e6af615e036088c7175973726ab4c83aaf5b..369ed9c29af64b55b8bbd1c68aaa83b7dc6cdd76 100755 --- a/bin/dm-get-processing-plugins +++ b/bin/dm-list-processing-plugins @@ -12,6 +12,6 @@ if [ -z $DM_ROOT_DIR ]; then source $setupFile > /dev/null fi -$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py $@ +$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listProcessingPluginsCli.py $@ diff --git a/doc/sphinx/source/dm.daq_web_service.api.rst b/doc/sphinx/source/dm.daq_web_service.api.rst index 1aac5ccb79e60be2d4172146df3a1703f7d41668..c19a4347cc865e46bd2ac8a0f059c2700c8ad045 100644 --- a/doc/sphinx/source/dm.daq_web_service.api.rst +++ b/doc/sphinx/source/dm.daq_web_service.api.rst @@ -7,6 +7,6 @@ ExperimentDaqApi .. autoclass:: dm.daq_web_service.api.experimentDaqApi.ExperimentDaqApi() :show-inheritance: - :members: __init__, startDaq, stopDaq + :members: __init__, startDaq, stopDaq, listDaqs, getDaqInfo, upload, stopUpload, listUploads, getUploadInfo, listProcessingPlugins diff --git a/src/python/dm/daq_web_service/api/experimentDaqApi.py b/src/python/dm/daq_web_service/api/experimentDaqApi.py index b9b9f9ccb30d6cb43de77d66baa7c405e8a023f9..f0e0b00d36ce3d295031fecde34bb1064c3e200e 100755 --- a/src/python/dm/daq_web_service/api/experimentDaqApi.py +++ b/src/python/dm/daq_web_service/api/experimentDaqApi.py @@ -40,12 +40,15 @@ class ExperimentDaqApi(DaqRestApi): >>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=22236, protocol='https') ''' + DaqRestApi.__init__(self, username, password, host, port, protocol) @DaqRestApi.execute2 def startDaq(self, experimentName, dataDirectory, daqInfo={}): ''' Start data acquisition (real-time directory monitoring and file upload). + Only files created or modified after this command was issued will be + uploaded to storage. :param experimentName: experiment name :type experimentName: str @@ -53,10 +56,17 @@ class ExperimentDaqApi(DaqRestApi): :param dataDirectory: data directory URL :type dataDirectory: str - :param daqInfo: DAQ info - :type dataDirectory: dict + :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; several keys have special meaning: + + - *processHiddenFiles* (bool): if set to True, hidden files will be processed + - *targetDirectory* (str): specifies directory path relative to experiment root directory where files will be stored + - *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours + - *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes + - *uploadTargetDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored - :returns: DAQ id + :type daqInfo: dict + + :returns: DaqInfo object :raises InvalidRequest: in case of empty experiment name or data directory @@ -66,8 +76,11 @@ class ExperimentDaqApi(DaqRestApi): :raises DmException: in case of any other errors - >>> daqId = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test') + >>> daqInfo = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test') + >>> daqId = daqInfo.get('id') + >>> daqStatus = daqInfo.get('status') ''' + if not experimentName: raise InvalidRequest('Experiment name must be provided.') if not dataDirectory: @@ -88,6 +101,8 @@ class ExperimentDaqApi(DaqRestApi): :param dataDirectory: data directory URL :type dataDirectory: str + :returns: DaqInfo object + :raises InvalidRequest: in case of empty experiment name or data directory :raises AuthorizationError: in case user is not authorized to manage DM station @@ -96,8 +111,10 @@ class ExperimentDaqApi(DaqRestApi): :raises DmException: in case of any other errors - >>> api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test') + >>> daqInfo = api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test') + >>> daqStatus = daqInfo.get('status') ''' + if not experimentName: raise InvalidRequest('Experiment name must be provided.') if not dataDirectory: @@ -106,24 +123,94 @@ class ExperimentDaqApi(DaqRestApi): responseDict = self.sendSessionRequest(url=url, method='POST') return DaqInfo(responseDict) - @DaqRestApi.execute + @DaqRestApi.execute2 def getDaqInfo(self, id): + ''' + Get data acquisition details. + + :param id: data acquisition id + :type id: str + + :returns: DaqInfo object + + :raises InvalidRequest: in case of invalid (empty or None) id + + :raises AuthorizationError: in case user is not authorized to manage DM station + + :raises ObjectNotFound: if there is no known DAQ with a given id + + :raises DmException: in case of any other errors + + >>> daqInfo = api.getDaqInfo('84311a75-c88b-4605-8948-08257eae6f5c') + >>> daqStatus = daqInfo.get('status') + ''' + if not id: raise InvalidRequest('Daq id must be provided.') url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id) responseDict = self.sendSessionRequest(url=url, method='GET') return DaqInfo(responseDict) - @DaqRestApi.execute + @DaqRestApi.execute2 def listDaqs(self, status=None): + ''' + Get list of known DAQs. + + :param status: status string; if not supplied, all DAQs will be included in the returned list + :type status: str + + :returns: list of DaqInfo objects + + :raises AuthorizationError: in case user is not authorized to manage DM station + + :raises DmException: in case of any other errors + + >>> daqInfoList = api.listDaqs() + >>> for daqInfo in daqInfoList: + >>> print daqInfo['id'], daqInfo['status'] + ''' + if not status: status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status) responseData = self.sendSessionRequest(url=url, method='GET') return self.toDmObjectList(responseData, DaqInfo) - @DaqRestApi.execute + @DaqRestApi.execute2 def upload(self, experimentName, dataDirectory, daqInfo={}): + ''' + Upload files from the given directory. Only files found at the time + when command was issued will be uploaded to storage. + + :param experimentName: experiment name + :type experimentName: str + + :param dataDirectory: data directory URL + :type dataDirectory: str + + :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition or processing that generated files that will be uploaded; several keys have special meaning: + + - *processHiddenFiles* (bool): if set to True, hidden files will be processed + - *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed + - *targetDirectory* (str): specifies directory path relative to experiment root directory where files will be stored + - *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files) + - *skipPlugins* (str): relevant for the "directory" processing mode; comma-separated list of plugins which should not process the given directory + :type daqInfo: dict + + :returns: UploadInfo object + + :raises InvalidRequest: in case of invalid input arguments + + :raises AuthorizationError: in case user is not authorized to manage DM station + + :raises ObjectNotFound: in case experiment does not exist + + :raises DmException: in case of any other errors + + >>> uploadInfo = api.upload(experimentName='test01', dataDirectory='/home/dm/test', daqInfo={'sample' : 'A1'}) + >>> uploadId = uploadInfo.get('id') + ''' + if not experimentName: raise InvalidRequest('Experiment name must be provided.') if not dataDirectory: @@ -135,30 +222,100 @@ class ExperimentDaqApi(DaqRestApi): @DaqRestApi.execute def getUploadInfo(self, id): + ''' + Get upload details. + + :param id: upload id + :type id: str + + :returns: UploadInfo object + + :raises InvalidRequest: in case of invalid (empty or None) id + + :raises AuthorizationError: in case user is not authorized to manage DM station + + :raises ObjectNotFound: if there is no known upload with a given id + + :raises DmException: in case of any other errors + + >>> uplaodInfo = api.getUploadInfo('84311a75-c88b-4605-8948-08257eae6f5c') + >>> uploadStatus = uploadInfo.get('status') + ''' + url = '%s/experimentUploads/%s' % (self.getContextRoot(),id) if not id: raise InvalidRequest('Upload id must be provided.') responseDict = self.sendSessionRequest(url=url, method='GET') return UploadInfo(responseDict) - @DaqRestApi.execute + @DaqRestApi.execute2 def listUploads(self, status=None): + ''' + Get list of known uploads. + + :param status: status string; if not supplied, all uploads will be included in the returned list + :type status: str + + :returns: list of UploadInfo objects + + :raises AuthorizationError: in case user is not authorized to manage DM station + + :raises DmException: in case of any other errors + + >>> uploadInfoList = api.listUploads(status='running') + >>> for uploadInfo in uploadInfoList: + >>> print uploadInfo['id'] + ''' + if not status: status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status) responseData = self.sendSessionRequest(url=url, method='GET') return self.toDmObjectList(responseData, UploadInfo) - @DaqRestApi.execute + @DaqRestApi.execute2 def stopUpload(self, id): + ''' + Abort upload. + + :param id: upload id + :type id: str + + :returns: UploadInfo object + + :raises InvalidRequest: in case of invalid (empty or None) id + + :raises AuthorizationError: in case user is not authorized to manage DM station + + :raises ObjectNotFound: if there is no known upload with a given id + + :raises DmException: in case of any other errors + + >>> uplaodInfo = api.stopUpload('84311a75-c88b-4605-8948-08257eae6f5c') + >>> print uploadInfo.get('nCompletedFiles') + ''' url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id) if not id: raise InvalidRequest('Upload id must be provided.') responseDict = self.sendSessionRequest(url=url, method='POST') return UploadInfo(responseDict) - @DaqRestApi.execute - def getProcessingPlugins(self): + @DaqRestApi.execute2 + def listProcessingPlugins(self): + ''' + Get list of DAQ service processing plugins. + + :returns: list of PluginInfo objects + + :raises AuthorizationError: in case user is not authorized to manage DM station + + :raises DmException: in case of any other errors + + >>> pluginInfoList = api.getProcessingPlugins() + >>> for pluginInfo in pluginInfoList: + >>> print pluginInfo['name'] + ''' + url = '%s/processingPlugins' % (self.getContextRoot()) responseData = self.sendSessionRequest(url=url, method='GET') return self.toDmObjectList(responseData, PluginInfo) diff --git a/src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py b/src/python/dm/daq_web_service/cli/listProcessingPluginsCli.py similarity index 81% rename from src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py rename to src/python/dm/daq_web_service/cli/listProcessingPluginsCli.py index 42b41ab41e7b3509c1a900187c17af77be6aa616..329f39c0da20e9c75014b219f0964c077af01617 100755 --- a/src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py +++ b/src/python/dm/daq_web_service/cli/listProcessingPluginsCli.py @@ -3,19 +3,19 @@ from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi from daqWebServiceSessionCli import DaqWebServiceSessionCli -class GetProcessingPluginsCli(DaqWebServiceSessionCli): +class ListProcessingPluginsCli(DaqWebServiceSessionCli): def __init__(self): DaqWebServiceSessionCli.__init__(self) def runCommand(self): self.parseArgs(usage=""" - dm-get-processing-plugins + dm-list-processing-plugins Description: Retrieves list of known processing plugins. """) api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) - plugins = api.getProcessingPlugins() + plugins = api.listProcessingPlugins() for plugin in plugins: print plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) @@ -23,5 +23,5 @@ Description: ####################################################################### # Run command. if __name__ == '__main__': - cli = GetProcessingPluginsCli() + cli = ListProcessingPluginsCli() cli.run() diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index c0312f962e2f3c606b9ced8812ebe8873a0aa5d9..c97159188edc908a918026d96e948278330c65e3 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -36,8 +36,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): UPLOAD_CHUNK_SIZE_IN_FILES = 100 UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0 DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0 - #SECONDS_PER_HOUR = 60*60 - SECONDS_PER_HOUR = 60 + SECONDS_PER_HOUR = 60*60 def __init__(self): DmObjectManager.__init__(self)