#!/usr/bin/env python

import os
import json
import urllib

from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
# InvalidRequest is raised by the argument checks below; without this import
# those checks fail with NameError instead of the documented exception.
# NOTE(review): module path follows the package's one-class-per-module naming
# (cf. dmException.DmException) -- confirm it matches the installed package.
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.experiment import Experiment
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.daqInfo import DaqInfo
from daqRestApi import DaqRestApi


class ExperimentDaqApi(DaqRestApi):
    '''
    Data Management API for accessing experiment interface provided by the
    DAQ service on a DM experiment station.
    '''

    def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
        '''
        Constructor.

        :param username: DM username
        :type username: str

        :param password: DM password
        :type password: str

        :param host: DM service host
        :type host: str

        :param port: DM service port
        :type port: int

        :param protocol: DM service protocol
        :type protocol: str

        >>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=22236, protocol='https')
        '''
        DaqRestApi.__init__(self, username, password, host, port, protocol)

    @DaqRestApi.execute2
    def startDaq(self, experimentName, dataDirectory, daqInfo=None):
        '''
        Start data acquisition (real-time directory monitoring and file
        upload). Only files created or modified after this command was
        issued will be uploaded to storage.

        :param experimentName: experiment name
        :type experimentName: str

        :param dataDirectory: data directory URL
        :type dataDirectory: str

        :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; if omitted (or None), an empty dictionary is sent; several keys have special meaning:

          - *processHiddenFiles* (bool): if set to True, hidden files will be processed
          - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
          - *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours
          - *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes
          - *uploadDestDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored
          - *skipPlugins* (str): comma-separated list of plugins which should not process files
        :type daqInfo: dict

        :returns: DaqInfo object

        :raises InvalidRequest: in case of empty experiment name or data directory
        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises ObjectNotFound: in case experiment does not exist
        :raises DmException: in case of any other errors

        >>> daqInfo = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test')
        >>> daqId = daqInfo.get('id')
        >>> daqStatus = daqInfo.get('status')
        '''
        if not experimentName:
            raise InvalidRequest('Experiment name must be provided.')
        if not dataDirectory:
            raise InvalidRequest('Experiment data directory must be provided.')
        # None sentinel instead of a shared mutable default dictionary.
        if daqInfo is None:
            daqInfo = {}
        url = '%s/experimentsByName/%s/startDaq/%s' % (
            self.getContextRoot(),
            Encoder.encode(experimentName),
            Encoder.encode(dataDirectory))
        # Metadata travels as an encoded JSON document in the query string.
        url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return DaqInfo(responseDict)

    @DaqRestApi.execute2
    def stopDaq(self, experimentName, dataDirectory):
        '''
        Stop data acquisition (real-time directory monitoring and file
        upload).

        :param experimentName: experiment name
        :type experimentName: str

        :param dataDirectory: data directory URL
        :type dataDirectory: str

        :returns: DaqInfo object

        :raises InvalidRequest: in case of empty experiment name or data directory
        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises ObjectNotFound: in case experiment does not exist, or if there are no active DAQs for a given experiment and data directory
        :raises DmException: in case of any other errors

        >>> daqInfo = api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test')
        >>> daqStatus = daqInfo.get('status')
        '''
        if not experimentName:
            raise InvalidRequest('Experiment name must be provided.')
        if not dataDirectory:
            raise InvalidRequest('Experiment data directory must be provided.')
        url = '%s/experimentsByName/%s/stopDaq/%s' % (
            self.getContextRoot(),
            Encoder.encode(experimentName),
            Encoder.encode(dataDirectory))
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return DaqInfo(responseDict)

    @DaqRestApi.execute2
    def getDaqInfo(self, id):
        '''
        Get data acquisition details.

        :param id: data acquisition id
        :type id: str

        :returns: DaqInfo object

        :raises InvalidRequest: in case of invalid (empty or None) id
        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises ObjectNotFound: if there is no known DAQ with a given id
        :raises DmException: in case of any other errors

        >>> daqInfo = api.getDaqInfo('84311a75-c88b-4605-8948-08257eae6f5c')
        >>> daqStatus = daqInfo.get('status')
        '''
        if not id:
            raise InvalidRequest('Daq id must be provided.')
        url = '%s/experimentDaqs/%s' % (self.getContextRoot(), id)
        responseDict = self.sendSessionRequest(url=url, method='GET')
        return DaqInfo(responseDict)

    @DaqRestApi.execute2
    def listDaqs(self, status=None):
        '''
        Get list of known DAQs.

        :param status: status string; if not supplied, all DAQs will be included in the returned list
        :type status: str

        :returns: list of DaqInfo objects

        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises DmException: in case of any other errors

        >>> daqInfoList = api.listDaqs()
        >>> for daqInfo in daqInfoList:
        ...     print daqInfo['id'], daqInfo['status']
        '''
        # An empty/None status falls back to the "any status" constant.
        if not status:
            status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
        url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(), status)
        responseData = self.sendSessionRequest(url=url, method='GET')
        return self.toDmObjectList(responseData, DaqInfo)

    @DaqRestApi.execute2
    def upload(self, experimentName, dataDirectory, daqInfo=None):
        '''
        Upload files from the given directory. Only files found at the time
        when command was issued will be uploaded to storage.

        :param experimentName: experiment name
        :type experimentName: str

        :param dataDirectory: data directory URL
        :type dataDirectory: str

        :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition or processing that generated files that will be uploaded; if omitted (or None), an empty dictionary is sent; several keys have special meaning:

          - *processHiddenFiles* (bool): if set to True, hidden files will be processed
          - *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed
          - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
          - *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files)
          - *skipPlugins* (str): comma-separated list of plugins which should not process files
        :type daqInfo: dict

        :returns: UploadInfo object

        :raises InvalidRequest: in case of invalid input arguments
        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises ObjectNotFound: in case experiment does not exist
        :raises DmException: in case of any other errors

        >>> uploadInfo = api.upload(experimentName='test01', dataDirectory='/home/dm/test', daqInfo={'sample' : 'A1'})
        >>> uploadId = uploadInfo.get('id')
        '''
        if not experimentName:
            raise InvalidRequest('Experiment name must be provided.')
        if not dataDirectory:
            raise InvalidRequest('Experiment data directory must be provided.')
        # None sentinel instead of a shared mutable default dictionary.
        if daqInfo is None:
            daqInfo = {}
        url = '%s/experimentsByName/%s/upload/%s' % (
            self.getContextRoot(),
            Encoder.encode(experimentName),
            Encoder.encode(dataDirectory))
        # Metadata travels as an encoded JSON document in the query string.
        url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return UploadInfo(responseDict)

    # NOTE(review): this is the only method decorated with `execute` rather
    # than `execute2`; the difference is not visible from this file -- confirm
    # whether that is intentional.
    @DaqRestApi.execute
    def getUploadInfo(self, id):
        '''
        Get upload details.

        :param id: upload id
        :type id: str

        :returns: UploadInfo object

        :raises InvalidRequest: in case of invalid (empty or None) id
        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises ObjectNotFound: if there is no known upload with a given id
        :raises DmException: in case of any other errors

        >>> uploadInfo = api.getUploadInfo('84311a75-c88b-4605-8948-08257eae6f5c')
        >>> uploadStatus = uploadInfo.get('status')
        '''
        # Validate before building the URL, as getDaqInfo() does.
        if not id:
            raise InvalidRequest('Upload id must be provided.')
        url = '%s/experimentUploads/%s' % (self.getContextRoot(), id)
        responseDict = self.sendSessionRequest(url=url, method='GET')
        return UploadInfo(responseDict)

    @DaqRestApi.execute2
    def listUploads(self, status=None):
        '''
        Get list of known uploads.

        :param status: status string; if not supplied, all uploads will be included in the returned list
        :type status: str

        :returns: list of UploadInfo objects

        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises DmException: in case of any other errors

        >>> uploadInfoList = api.listUploads(status='running')
        >>> for uploadInfo in uploadInfoList:
        ...     print uploadInfo['id']
        '''
        # An empty/None status falls back to the "any status" constant.
        if not status:
            status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
        url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(), status)
        responseData = self.sendSessionRequest(url=url, method='GET')
        return self.toDmObjectList(responseData, UploadInfo)

    @DaqRestApi.execute2
    def stopUpload(self, id):
        '''
        Abort upload.

        :param id: upload id
        :type id: str

        :returns: UploadInfo object

        :raises InvalidRequest: in case of invalid (empty or None) id
        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises ObjectNotFound: if there is no known upload with a given id
        :raises DmException: in case of any other errors

        >>> uploadInfo = api.stopUpload('84311a75-c88b-4605-8948-08257eae6f5c')
        >>> print uploadInfo.get('nCompletedFiles')
        '''
        # Validate before building the URL, as getDaqInfo() does.
        if not id:
            raise InvalidRequest('Upload id must be provided.')
        url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(), id)
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return UploadInfo(responseDict)

    @DaqRestApi.execute2
    def listProcessingPlugins(self):
        '''
        Get list of DAQ service processing plugins.

        :returns: list of PluginInfo objects

        :raises AuthorizationError: in case user is not authorized to manage DM station
        :raises DmException: in case of any other errors

        >>> pluginInfoList = api.listProcessingPlugins()
        >>> for pluginInfo in pluginInfoList:
        ...     print pluginInfo['name']
        '''
        url = '%s/processingPlugins' % (self.getContextRoot())
        responseData = self.sendSessionRequest(url=url, method='GET')
        return self.toDmObjectList(responseData, PluginInfo)

#######################################################################
# Testing.

if __name__ == '__main__':
    api = ExperimentDaqApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(api.startDaq('experiment1', '/tmp/data/experiment1'))