Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 1133 additions and 0 deletions
#!/usr/bin/env python
from dm.common.api.dmRestApi import DmRestApi
from dm.common.utility.configurationManager import ConfigurationManager
class DaqRestApi(DmRestApi):
""" Base DAQ DM REST api class. """
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
if host == None:
host = ConfigurationManager.getInstance().getDaqWebServiceHost()
if port == None:
port = ConfigurationManager.getInstance().getDaqWebServicePort()
DmRestApi.__init__(self, username, password, host, port, protocol)
#######################################################################
# Testing.
if __name__ == '__main__':
api = DaqRestApi('sveseli', 'sveseli')
#api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list', 'GET', data='')
import urllib
from dm.common.utility.configurationManager import ConfigurationManager
cm = ConfigurationManager.getInstance()
cm.setSessionCacheFile('/tmp/session')
#print 'Non-session request'
#print api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list?path=/tmp', 'GET')
print 'Session request'
data = { 'path' : '/tmp/xyz' }
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write?path=/tmp/xyz&content=xyz', 'POST', contentType='application/x-www-form-urlencoded', data=urllib.urlencode(data))
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', data=data)
postdata='path=/tmp/xyz'
postdata+='&content=%s' % urllib.quote_plus('Hey there')
print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', contentType='application/x-www-form-urlencoded', data=postdata)
#!/usr/bin/env python
import os
import json
import urllib
from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.daqInfo import DaqInfo
from daqRestApi import DaqRestApi
class ExperimentDaqApi(DaqRestApi):
'''
Data Management API for accessing experiment interface provided by the
DAQ service on a DM experiment station.
'''
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
'''
Constructor.
:param username: DM username
:type username: str
:param password: DM password
:type password: str
:param host: DM service host
:type host: str
:param port: DM service port
:type port: int
:param protocol: DM service protocol
:type protocol: str
>>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=33336, protocol='https')
'''
DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute2
def startDaq(self, experimentName, dataDirectory, daqInfo={}):
'''
Start data acquisition (real-time directory monitoring and file upload).
Only files created or modified after this command was issued will be
uploaded to storage.
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; several keys have special meaning:
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours
- *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes
- *uploadDestDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored
- *skipPlugins* (str): comma-separated list of plugins which should not process files
:type daqInfo: dict
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist
:raises DmException: in case of any other errors
>>> daqInfo = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqId = daqInfo.get('id')
>>> daqStatus = daqInfo.get('status')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/startDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def stopDaq(self, experimentName, dataDirectory):
'''
Stop data acquisition (real-time directory monitoring and file upload).
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist, or if there are no active DAQs for a given experiment and data directory
:raises DmException: in case of any other errors
>>> daqInfo = api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqStatus = daqInfo.get('status')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/stopDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def getDaqInfo(self, id):
'''
Get data acquisition details.
:param id: data acquisition id
:type id: str
:returns: DaqInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known DAQ with a given id
:raises DmException: in case of any other errors
>>> daqInfo = api.getDaqInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> daqStatus = daqInfo.get('status')
'''
if not id:
raise InvalidRequest('Daq id must be provided.')
url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id)
responseDict = self.sendSessionRequest(url=url, method='GET')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def listDaqs(self, status=None):
'''
Get list of known DAQs.
:param status: status string; if not supplied, all DAQs will be included in the returned list
:type status: str
:returns: list of DaqInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> daqInfoList = api.listDaqs()
>>> for daqInfo in daqInfoList:
>>> print daqInfo['id'], daqInfo['status']
'''
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, DaqInfo)
@DaqRestApi.execute2
def upload(self, experimentName, dataDirectory, daqInfo={}):
'''
Upload files from the given directory. Only files found at the time
when command was issued will be uploaded to storage.
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition or processing that generated files that will be uploaded; several keys have special meaning:
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files)
- *skipPlugins* (str): comma-separated list of plugins which should not process files
:type daqInfo: dict
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid input arguments
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist
:raises DmException: in case of any other errors
>>> uploadInfo = api.upload(experimentName='test01', dataDirectory='/home/dm/test', daqInfo={'sample' : 'A1'})
>>> uploadId = uploadInfo.get('id')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/upload/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute
def getUploadInfo(self, id):
'''
Get upload details.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uploadInfo = api.getUploadInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> uploadStatus = uploadInfo.get('status')
'''
url = '%s/experimentUploads/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='GET')
return UploadInfo(responseDict)
@DaqRestApi.execute2
def listUploads(self, status=None):
'''
Get list of known uploads.
:param status: status string; if not supplied, all uploads will be included in the returned list
:type status: str
:returns: list of UploadInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> uploadInfoList = api.listUploads(status='running')
>>> for uploadInfo in uploadInfoList:
>>> print uploadInfo['id']
'''
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UploadInfo)
@DaqRestApi.execute2
def stopUpload(self, id):
'''
Abort upload.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uploadInfo = api.stopUpload('84311a75-c88b-4605-8948-08257eae6f5c')
>>> print uploadInfo.get('nCompletedFiles')
'''
url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute2
def listProcessingPlugins(self):
'''
Get list of DAQ service processing plugins.
:returns: list of PluginInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> pluginInfoList = api.getProcessingPlugins()
>>> for pluginInfo in pluginInfoList:
>>> print pluginInfo['name']
'''
url = '%s/processingPlugins' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, PluginInfo)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ExperimentDaqApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
print api.startDaq('experiment1', '/tmp/data/experiment1')
#!/usr/bin/env python
from dm.common.cli.dmRestCli import DmRestCli
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceCli(DmRestCli):
""" DM DAQ web service cli class. """
def __init__(self, validArgCount=0):
DmRestCli.__init__(self, validArgCount)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
#!/usr/bin/env python
import os
from dm.common.cli.dmRestSessionCli import DmRestSessionCli
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceSessionCli(DmRestSessionCli):
""" DM DAQ web service session cli class. """
DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.daq.session.cache'
def __init__(self, validArgCount=0):
DmRestSessionCli.__init__(self, validArgCount)
ConfigurationManager.getInstance().setSessionCacheFile(DaqWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
def getId(self):
return self.options.id
def getExperimentName(self):
return self.options.experimentName
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetDaqInfoCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--id', dest='id', help='Daq id.')
def checkArgs(self):
if self.options.id is None:
raise InvalidRequest('Daq id must be provided.')
def getId(self):
return self.options.id
def runCommand(self):
self.parseArgs(usage="""
dm-get-daq-info --id=ID
Description:
Retrieves detailed information for the specified data acquisition.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = api.getDaqInfo(self.getId())
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetDaqInfoCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetUploadInfoCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--id', dest='id', help='Upload id.')
def checkArgs(self):
if self.options.id is None:
raise InvalidRequest('Upload id must be provided.')
def getId(self):
return self.options.id
def runCommand(self):
self.parseArgs(usage="""
dm-get-upload-info --id=ID
Description:
Retrieves detailed information for the specified data upload id.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploadInfo = api.getUploadInfo(self.getId())
print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetUploadInfoCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListDaqsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-daqs [--status=STATUS]
Description:
Retrieves all known daqs.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqs = api.listDaqs(self.getStatus())
for daq in daqs:
print daq.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListDaqsCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class ListProcessingPluginsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
def runCommand(self):
self.parseArgs(usage="""
dm-list-processing-plugins
Description:
Retrieves list of known processing plugins.
""")
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
plugins = api.listProcessingPlugins()
for plugin in plugins:
print plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListProcessingPluginsCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListUploadsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-uploads [--status=STATUS]
Description:
Retrieves all known uploads.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploads = api.listUploads(self.getStatus())
for upload in uploads:
print upload.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListUploadsCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StartDaqCli(DaqWebServiceSessionCli):
HOURS_PER_DAY = 24
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory URL.')
self.addOption('', '--dest-directory', dest='destDirectory',
help='Destination directory relative to experiment root path.')
self.addOption('', '--duration', dest='duration', help='DAQ duration; it must be specified in hours (h) or days (d). Examples: "8h", "14d".')
self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after DAQ is stopped.')
self.addOption('', '--upload-dest-directory-on-exit', dest='uploadDestDirectoryOnExit', help='Destination directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.')
self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOption('', '--skip-plugins', dest='skipPlugins', help='Comma-separated list of plugins which should not process files.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def updateDaqInfoFromOptions(self, daqInfo):
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
if self.options.skipPlugins:
daqInfo['skipPlugins'] = self.options.skipPlugins
if self.options.duration:
duration = self.options.duration
if duration.endswith('h'):
daqInfo['maxRunTimeInHours'] = int(duration[0:-1])
elif duration.endswith('d'):
daqInfo['maxRunTimeInHours'] = int(duration[0:-1])*self.HOURS_PER_DAY
else:
raise InvalidRequest('Maximum run time must contain valid unit specifier: "h" for hours or "d" for days.')
if self.options.destDirectory:
daqInfo['destDirectory'] = self.options.destDirectory
if self.options.uploadDataDirectoryOnExit:
daqInfo['uploadDataDirectoryOnExit'] = self.options.uploadDataDirectoryOnExit
if self.options.uploadDestDirectoryOnExit:
if not self.options.uploadDataDirectoryOnExit:
raise InvalidRequest('Upload destination directory on exit requires that upload data directory is specified as well.')
daqInfo['uploadDestDirectoryOnExit'] = self.options.uploadDestDirectoryOnExit
def runCommand(self):
self.parseArgs(usage="""
dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[--duration=DURATION]
[--dest-directory=DESTDIRECTORY]
[--upload-data-directory-on-exit=UPLOADDATADIRECTORYONEXIT]
[--upload-dest-directory-on-exit=UPLOADDESTDIRECTORYONEXIT]
[--process-hidden]
[key1:value1, key2:value2, ...]
Description:
Starts DAQ for a given experiment. Specified data directory will be
monitored for data files. All provided key/value pairs will be passed to
file processing plugins.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = self.splitArgsIntoDict()
self.updateDaqInfoFromOptions(daqInfo)
daqInfo = api.startDaq(self.getExperimentName(), self.getDataDirectory(), daqInfo=daqInfo)
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StartDaqCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StopDaqCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory URL.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def runCommand(self):
self.parseArgs(usage="""
dm-stop-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
Description:
Stop DAQ for a given experiment and data directory.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = api.stopDaq(self.getExperimentName(), self.getDataDirectory())
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StopDaqCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class StopUploadCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--id', dest='id', help='Upload id.')
def checkArgs(self):
if self.options.id is None:
raise InvalidRequest('Upload id must be provided.')
def getId(self):
return self.options.id
def runCommand(self):
self.parseArgs(usage="""
dm-stop-upload --id=ID
Description:
Aborts specified data upload.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploadInfo = api.stopUpload(self.getId())
print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StopUploadCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.constants import dmProcessingMode
class UploadCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory URL.')
self.addOption('', '--dest-directory', dest='destDirectory', help='Destination directory relative to experiment root path.')
self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.')
self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY))
self.addOption('', '--skip-plugins', dest='skipPlugins', help='Comma-separated list of plugins which should not process files.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
if self.options.processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST:
raise InvalidRequest('Processing mode must be one of %s.' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST)
def updateDaqInfoFromOptions(self, daqInfo):
if self.options.reprocess:
daqInfo['reprocessFiles'] = True
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
if self.options.skipPlugins:
daqInfo['skipPlugins'] = self.options.skipPlugins
daqInfo['processingMode'] = self.options.processingMode
if self.options.destDirectory:
daqInfo['destDirectory'] = self.options.destDirectory
def runCommand(self):
self.parseArgs(usage="""
dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[--dest-directory=DESTDIRECTORY]
[--reprocess]
[--process-hidden]
[--processing-mode=PROCESSINGMODE]
[--skip-plugins=SKIPPLUGINS]
[key1:value1, key2:value2, ...]
Description:
Schedules data upload for a given experiment. All existing files in the
specified directory will be uploaded to storage. Relative directory
structure will be preserved. All provided key/value pairs will be passed
to file processing plugins.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = self.splitArgsIntoDict()
self.updateDaqInfoFromOptions(daqInfo)
uploadInfo = api.upload(self.getExperimentName(), self.getDataDirectory(), daqInfo=daqInfo)
print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = UploadCli()
cli.run()
#!/usr/bin/env python
#
# DM DAQ Web Service
#
from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase
from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.daq_web_service.service.impl.daqProcessingCompleteNotificationPlugin import DaqProcessingCompleteNotificationPlugin
from dm.daq_web_service.service.impl.fileSystemObserver import FileSystemObserver
from daqWebServiceRouteMapper import DaqWebServiceRouteMapper
class DaqWebService(DmRestWebServiceBase):
def __init__(self):
DmRestWebServiceBase.__init__(self, DaqWebServiceRouteMapper)
def initDmModules(self):
self.logger.debug('Initializing dm modules')
# Add modules that will be started.
moduleManager = DmModuleManager.getInstance()
moduleManager.addModule(FileSystemObserver.getInstance())
moduleManager.addModule(FileProcessingManager.getInstance())
# Requred processing plugin
#notificationPlugin = DaqProcessingCompleteNotificationPlugin()
#FileProcessingManager.getInstance().appendFileProcessor(notificationPlugin)
self.logger.debug('Initialized dm modules')
def getDefaultServerHost(self):
return ConfigurationManager.getInstance().getServiceHost()
def getDefaultServerPort(self):
return ConfigurationManager.getInstance().getServicePort()
####################################################################
# Run service
if __name__ == '__main__':
ConfigurationManager.getInstance().setServiceName('daq-web-service')
service = DaqWebService();
service.run()
#!/usr/bin/env python
#
# Route mapper for DM DAQ web service.
#
import sys
import os
import cherrypy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.service.loginRouteDescriptor import LoginRouteDescriptor
from experimentRouteDescriptor import ExperimentRouteDescriptor
class DaqWebServiceRouteMapper:
@classmethod
def setupRoutes(cls):
""" Setup RESTFul routes. """
logger = LoggingManager.getInstance().getLogger(cls.__name__)
contextRoot = ConfigurationManager.getInstance().getContextRoot()
logger.debug('Using context root: %s' % contextRoot)
# Get routes.
routes = LoginRouteDescriptor.getRoutes()
routes += ExperimentRouteDescriptor.getRoutes()
# Add routes to dispatcher.
d = cherrypy.dispatch.RoutesDispatcher()
for route in routes:
logger.debug('Connecting route: %s' % route)
d.connect(route['name'], route['path'], action=route['action'], controller=route['controller'], conditions=dict(method=route['method']))
return d
#!/usr/bin/env python
#
# User route descriptor.
#
from dm.common.utility.configurationManager import ConfigurationManager
from experimentSessionController import ExperimentSessionController
class ExperimentRouteDescriptor:
@classmethod
def getRoutes(cls):
contextRoot = ConfigurationManager.getInstance().getContextRoot()
# Static instances shared between different routes
experimentSessionController = ExperimentSessionController()
# Define routes.
routes = [
# Start experiment daq
{
'name' : 'startDaq',
'path' : '%s/experimentsByName/:(experimentName)/startDaq/:(dataDirectory)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'startDaq',
'method' : ['POST']
},
# Stop experiment daq
{
'name' : 'stopDaq',
'path' : '%s/experimentsByName/:(experimentName)/stopDaq/:(dataDirectory)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'stopDaq',
'method' : ['POST']
},
# Get daq info
{
'name' : 'getDaqInfo',
'path' : '%s/experimentDaqs/:(id)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'getDaqInfo',
'method' : ['GET']
},
# List DAQs
{
'name' : 'listDaqs',
'path' : '%s/experimentDaqsByStatus/:(status)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'listDaqs',
'method' : ['GET']
},
# Upload experiment data
{
'name' : 'upload',
'path' : '%s/experimentsByName/:(experimentName)/upload/:(dataDirectory)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'upload',
'method' : ['POST']
},
# Get upload info
{
'name' : 'getUploadInfo',
'path' : '%s/experimentUploads/:(id)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'getUploadInfo',
'method' : ['GET']
},
# List uploads
{
'name' : 'listUploads',
'path' : '%s/experimentUploadsByStatus/:(status)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'listUploads',
'method' : ['GET']
},
# Stop upload
{
'name' : 'stopUpload',
'controller' : experimentSessionController,
'path' : '%s/experimentUploads/stopUpload/:(id)' % contextRoot,
'action' : 'stopUpload',
'method' : ['POST']
},
# Get processing plugins
{
'name' : 'getProcessingPlugins',
'path' : '%s/processingPlugins' % contextRoot,
'controller' : experimentSessionController,
'action' : 'getProcessingPlugins',
'method' : ['GET']
},
]
return routes
#!/usr/bin/env python
import cherrypy
import json
import os
from dm.common.constants import dmProcessingMode
from dm.common.constants import dmProcessingStatus
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.authorizationError import AuthorizationError
from dm.common.utility.encoder import Encoder
from dm.common.utility.dictUtility import DictUtility
from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl
class ExperimentSessionController(DmSessionController):
def __init__(self):
DmSessionController.__init__(self)
self.experimentSessionControllerImpl = ExperimentSessionControllerImpl()
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def startDaq(self, experimentName, dataDirectory, **kwargs):
if not experimentName:
raise InvalidRequest('Missing experiment name.')
experimentName = Encoder.decode(experimentName)
if not dataDirectory:
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
if not dataDirectory.startswith('/') and not dataDirectory.count('://'):
raise InvalidRequest('Data directory must be an absolute path.')
daqInfo = {}
encodedDaqInfo = kwargs.get('daqInfo')
if encodedDaqInfo is not None:
daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
response = self.experimentSessionControllerImpl.startDaq(experimentName, dataDirectory, daqInfo).getFullJsonRep()
self.logger.debug('Started DAQ: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def stopDaq(self, experimentName, dataDirectory, **kwargs):
if not experimentName:
raise InvalidRequest('Missing experiment name.')
experimentName = Encoder.decode(experimentName)
if not dataDirectory:
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
response = self.experimentSessionControllerImpl.stopDaq(experimentName, dataDirectory).getFullJsonRep()
self.logger.debug('Stopped DAQ: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def getDaqInfo(self, id, **kwargs):
response = self.experimentSessionControllerImpl.getDaqInfo(id).getFullJsonRep()
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def listDaqs(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST))
return self.listToJson(self.experimentSessionControllerImpl.listDaqs(status))
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def upload(self, experimentName, dataDirectory, **kwargs):
if not experimentName:
raise InvalidRequest('Missing experiment name.')
experimentName = Encoder.decode(experimentName)
if not dataDirectory:
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
if not dataDirectory.startswith('/') and not dataDirectory.count('://'):
raise InvalidRequest('Data directory must be an absolute path: %s' % dataDirectory)
daqInfo = {}
encodedDaqInfo = kwargs.get('daqInfo')
if encodedDaqInfo:
daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
processingMode = DictUtility.getAndRemoveKey(daqInfo, 'processingMode', dmProcessingMode.DM_PROCESSING_MODE_FILES)
if processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST:
raise InvalidRequest('Allowed processing modes: %s' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST)
if processingMode == dmProcessingMode.DM_PROCESSING_MODE_FILES:
response = self.experimentSessionControllerImpl.uploadFiles(experimentName, dataDirectory, daqInfo).getFullJsonRep()
else:
response = self.experimentSessionControllerImpl.uploadDirectory(experimentName, dataDirectory, daqInfo).getFullJsonRep()
self.logger.debug('Returning upload info for directory %s' % dataDirectory)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def getUploadInfo(self, id, **kwargs):
response = self.experimentSessionControllerImpl.getUploadInfo(id).getFullJsonRep()
self.logger.debug('Returning info for upload id %s' % id)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def listUploads(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST))
return self.listToJson(self.experimentSessionControllerImpl.listUploads(status))
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def stopUpload(self, id, **kwargs):
response = self.experimentSessionControllerImpl.stopUpload(id).getFullJsonRep()
self.logger.debug('Stopped upload id %s' % id)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.canManageStation())
@DmSessionController.execute
def getProcessingPlugins(self, **kwargs):
return self.listToJson(self.experimentSessionControllerImpl.getProcessingPlugins())