Skip to content
Snippets Groups Projects
Commit 696b1894 authored by sveseli's avatar sveseli
Browse files

rename getProcessingPlugins to listProcessingPlugins; completed initial...

rename getProcessingPlugins to listProcessingPlugins; completed initial documentation for experiment daq api; add target directory functionality for uploads
parent d3290ab0
No related branches found
No related tags found
No related merge requests found
...@@ -12,6 +12,6 @@ if [ -z $DM_ROOT_DIR ]; then ...@@ -12,6 +12,6 @@ if [ -z $DM_ROOT_DIR ]; then
source $setupFile > /dev/null source $setupFile > /dev/null
fi fi
$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py $@ $DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listProcessingPluginsCli.py $@
...@@ -7,6 +7,6 @@ ExperimentDaqApi ...@@ -7,6 +7,6 @@ ExperimentDaqApi
.. autoclass:: dm.daq_web_service.api.experimentDaqApi.ExperimentDaqApi() .. autoclass:: dm.daq_web_service.api.experimentDaqApi.ExperimentDaqApi()
:show-inheritance: :show-inheritance:
:members: __init__, startDaq, stopDaq :members: __init__, startDaq, stopDaq, listDaqs, getDaqInfo, upload, stopUpload, listUploads, getUploadInfo, listProcessingPlugins
...@@ -40,12 +40,15 @@ class ExperimentDaqApi(DaqRestApi): ...@@ -40,12 +40,15 @@ class ExperimentDaqApi(DaqRestApi):
>>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=22236, protocol='https') >>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=22236, protocol='https')
''' '''
DaqRestApi.__init__(self, username, password, host, port, protocol) DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute2 @DaqRestApi.execute2
def startDaq(self, experimentName, dataDirectory, daqInfo={}): def startDaq(self, experimentName, dataDirectory, daqInfo={}):
''' '''
Start data acquisition (real-time directory monitoring and file upload). Start data acquisition (real-time directory monitoring and file upload).
Only files created or modified after this command was issued will be
uploaded to storage.
:param experimentName: experiment name :param experimentName: experiment name
:type experimentName: str :type experimentName: str
...@@ -53,10 +56,17 @@ class ExperimentDaqApi(DaqRestApi): ...@@ -53,10 +56,17 @@ class ExperimentDaqApi(DaqRestApi):
:param dataDirectory: data directory URL :param dataDirectory: data directory URL
:type dataDirectory: str :type dataDirectory: str
:param daqInfo: DAQ info :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; several keys have special meaning:
:type dataDirectory: dict
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *targetDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours
- *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes
- *uploadTargetDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored
:returns: DAQ id :type daqInfo: dict
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory :raises InvalidRequest: in case of empty experiment name or data directory
...@@ -66,8 +76,11 @@ class ExperimentDaqApi(DaqRestApi): ...@@ -66,8 +76,11 @@ class ExperimentDaqApi(DaqRestApi):
:raises DmException: in case of any other errors :raises DmException: in case of any other errors
>>> daqId = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test') >>> daqInfo = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqId = daqInfo.get('id')
>>> daqStatus = daqInfo.get('status')
''' '''
if not experimentName: if not experimentName:
raise InvalidRequest('Experiment name must be provided.') raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory: if not dataDirectory:
...@@ -88,6 +101,8 @@ class ExperimentDaqApi(DaqRestApi): ...@@ -88,6 +101,8 @@ class ExperimentDaqApi(DaqRestApi):
:param dataDirectory: data directory URL :param dataDirectory: data directory URL
:type dataDirectory: str :type dataDirectory: str
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory :raises InvalidRequest: in case of empty experiment name or data directory
:raises AuthorizationError: in case user is not authorized to manage DM station :raises AuthorizationError: in case user is not authorized to manage DM station
...@@ -96,8 +111,10 @@ class ExperimentDaqApi(DaqRestApi): ...@@ -96,8 +111,10 @@ class ExperimentDaqApi(DaqRestApi):
:raises DmException: in case of any other errors :raises DmException: in case of any other errors
>>> api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test') >>> daqInfo = api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqStatus = daqInfo.get('status')
''' '''
if not experimentName: if not experimentName:
raise InvalidRequest('Experiment name must be provided.') raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory: if not dataDirectory:
...@@ -106,24 +123,94 @@ class ExperimentDaqApi(DaqRestApi): ...@@ -106,24 +123,94 @@ class ExperimentDaqApi(DaqRestApi):
responseDict = self.sendSessionRequest(url=url, method='POST') responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict) return DaqInfo(responseDict)
@DaqRestApi.execute @DaqRestApi.execute2
def getDaqInfo(self, id): def getDaqInfo(self, id):
'''
Get data acquisition details.
:param id: data acquisition id
:type id: str
:returns: DaqInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known DAQ with a given id
:raises DmException: in case of any other errors
>>> daqInfo = api.getDaqInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> daqStatus = daqInfo.get('status')
'''
if not id: if not id:
raise InvalidRequest('Daq id must be provided.') raise InvalidRequest('Daq id must be provided.')
url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id) url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id)
responseDict = self.sendSessionRequest(url=url, method='GET') responseDict = self.sendSessionRequest(url=url, method='GET')
return DaqInfo(responseDict) return DaqInfo(responseDict)
@DaqRestApi.execute @DaqRestApi.execute2
def listDaqs(self, status=None): def listDaqs(self, status=None):
'''
Get list of known DAQs.
:param status: status string; if not supplied, all DAQs will be included in the returned list
:type status: str
:returns: list of DaqInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> daqInfoList = api.listDaqs()
>>> for daqInfo in daqInfoList:
>>> print daqInfo['id'], daqInfo['status']
'''
if not status: if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status) url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET') responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, DaqInfo) return self.toDmObjectList(responseData, DaqInfo)
@DaqRestApi.execute @DaqRestApi.execute2
def upload(self, experimentName, dataDirectory, daqInfo={}): def upload(self, experimentName, dataDirectory, daqInfo={}):
'''
Upload files from the given directory. Only files found at the time
when command was issued will be uploaded to storage.
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition or processing that generated files that will be uploaded; several keys have special meaning:
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed
- *targetDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files)
- *skipPlugins* (str): relevant for the "directory" processing mode; comma-separated list of plugins which should not process the given directory
:type daqInfo: dict
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid input arguments
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist
:raises DmException: in case of any other errors
>>> uploadInfo = api.upload(experimentName='test01', dataDirectory='/home/dm/test', daqInfo={'sample' : 'A1'})
>>> uploadId = uploadInfo.get('id')
'''
if not experimentName: if not experimentName:
raise InvalidRequest('Experiment name must be provided.') raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory: if not dataDirectory:
...@@ -135,30 +222,100 @@ class ExperimentDaqApi(DaqRestApi): ...@@ -135,30 +222,100 @@ class ExperimentDaqApi(DaqRestApi):
@DaqRestApi.execute @DaqRestApi.execute
def getUploadInfo(self, id): def getUploadInfo(self, id):
'''
Get upload details.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uplaodInfo = api.getUploadInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> uploadStatus = uploadInfo.get('status')
'''
url = '%s/experimentUploads/%s' % (self.getContextRoot(),id) url = '%s/experimentUploads/%s' % (self.getContextRoot(),id)
if not id: if not id:
raise InvalidRequest('Upload id must be provided.') raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='GET') responseDict = self.sendSessionRequest(url=url, method='GET')
return UploadInfo(responseDict) return UploadInfo(responseDict)
@DaqRestApi.execute @DaqRestApi.execute2
def listUploads(self, status=None): def listUploads(self, status=None):
'''
Get list of known uploads.
:param status: status string; if not supplied, all uploads will be included in the returned list
:type status: str
:returns: list of UploadInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> uploadInfoList = api.listUploads(status='running')
>>> for uploadInfo in uploadInfoList:
>>> print uploadInfo['id']
'''
if not status: if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status) url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET') responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UploadInfo) return self.toDmObjectList(responseData, UploadInfo)
@DaqRestApi.execute @DaqRestApi.execute2
def stopUpload(self, id): def stopUpload(self, id):
'''
Abort upload.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uplaodInfo = api.stopUpload('84311a75-c88b-4605-8948-08257eae6f5c')
>>> print uploadInfo.get('nCompletedFiles')
'''
url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id) url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
if not id: if not id:
raise InvalidRequest('Upload id must be provided.') raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='POST') responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict) return UploadInfo(responseDict)
@DaqRestApi.execute @DaqRestApi.execute2
def getProcessingPlugins(self): def listProcessingPlugins(self):
'''
Get list of DAQ service processing plugins.
:returns: list of PluginInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> pluginInfoList = api.getProcessingPlugins()
>>> for pluginInfo in pluginInfoList:
>>> print pluginInfo['name']
'''
url = '%s/processingPlugins' % (self.getContextRoot()) url = '%s/processingPlugins' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET') responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, PluginInfo) return self.toDmObjectList(responseData, PluginInfo)
......
...@@ -3,19 +3,19 @@ ...@@ -3,19 +3,19 @@
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from daqWebServiceSessionCli import DaqWebServiceSessionCli from daqWebServiceSessionCli import DaqWebServiceSessionCli
class GetProcessingPluginsCli(DaqWebServiceSessionCli): class ListProcessingPluginsCli(DaqWebServiceSessionCli):
def __init__(self): def __init__(self):
DaqWebServiceSessionCli.__init__(self) DaqWebServiceSessionCli.__init__(self)
def runCommand(self): def runCommand(self):
self.parseArgs(usage=""" self.parseArgs(usage="""
dm-get-processing-plugins dm-list-processing-plugins
Description: Description:
Retrieves list of known processing plugins. Retrieves list of known processing plugins.
""") """)
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
plugins = api.getProcessingPlugins() plugins = api.listProcessingPlugins()
for plugin in plugins: for plugin in plugins:
print plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) print plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
...@@ -23,5 +23,5 @@ Description: ...@@ -23,5 +23,5 @@ Description:
####################################################################### #######################################################################
# Run command. # Run command.
if __name__ == '__main__': if __name__ == '__main__':
cli = GetProcessingPluginsCli() cli = ListProcessingPluginsCli()
cli.run() cli.run()
...@@ -36,8 +36,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -36,8 +36,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
UPLOAD_CHUNK_SIZE_IN_FILES = 100 UPLOAD_CHUNK_SIZE_IN_FILES = 100
UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0 UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0 DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
#SECONDS_PER_HOUR = 60*60 SECONDS_PER_HOUR = 60*60
SECONDS_PER_HOUR = 60
def __init__(self): def __init__(self):
DmObjectManager.__init__(self) DmObjectManager.__init__(self)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment