Skip to content
Snippets Groups Projects
Forked from DM / dm-docs
261 commits behind, 839 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
experimentDaqApi.py 12.48 KiB
#!/usr/bin/env python

import os
import json
import urllib

from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.daqInfo import DaqInfo
from daqRestApi import DaqRestApi

class ExperimentDaqApi(DaqRestApi):
    ''' 
    Data Management API for accessing experiment interface provided by the
    DAQ service on a DM experiment station.
    '''
    
    def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
        ''' 
        Constructor.

        :param username: DM username
        :type username: str

        :param password: DM password
        :type password: str

        :param host: DM service host
        :type host: str

        :param port: DM service port
        :type port: int

        :param protocol: DM service protocol
        :type protocol: str

        >>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=22236, protocol='https')
        '''

        DaqRestApi.__init__(self, username, password, host, port, protocol)

    @DaqRestApi.execute2
    def startDaq(self, experimentName, dataDirectory, daqInfo={}):
        '''
        Start data acquisition (real-time directory monitoring and file upload).
        Only files created or modified after this command was issued will be 
        uploaded to storage.

        :param experimentName: experiment name
        :type experimentName: str

        :param dataDirectory: data directory URL
        :type dataDirectory: str

        :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; several keys have special meaning:

           - *processHiddenFiles* (bool): if set to True, hidden files will be processed
           - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
           - *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours
           - *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes
           - *uploadDestDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored
           - *skipPlugins* (str): comma-separated list of plugins which should not process files

        :type daqInfo: dict

        :returns: DaqInfo object

        :raises InvalidRequest: in case of empty experiment name or data directory

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises ObjectNotFound: in case experiment does not exist

        :raises DmException: in case of any other errors

        >>> daqInfo = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test')
        >>> daqId = daqInfo.get('id')
        >>> daqStatus = daqInfo.get('status')
        '''

        if not experimentName:
            raise InvalidRequest('Experiment name must be provided.')
        if not dataDirectory:
            raise InvalidRequest('Experiment data directory must be provided.')
        url = '%s/experimentsByName/%s/startDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
        url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return DaqInfo(responseDict)

    @DaqRestApi.execute2
    def stopDaq(self, experimentName, dataDirectory):
        '''
        Stop data acquisition (real-time directory monitoring and file upload).

        :param experimentName: experiment name
        :type experimentName: str

        :param dataDirectory: data directory URL
        :type dataDirectory: str

        :returns: DaqInfo object

        :raises InvalidRequest: in case of empty experiment name or data directory

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises ObjectNotFound: in case experiment does not exist, or if there are no active DAQs for a given experiment and data directory

        :raises DmException: in case of any other errors

        >>> daqInfo = api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test')
        >>> daqStatus = daqInfo.get('status')
        '''

        if not experimentName:
            raise InvalidRequest('Experiment name must be provided.')
        if not dataDirectory:
            raise InvalidRequest('Experiment data directory must be provided.')
        url = '%s/experimentsByName/%s/stopDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return DaqInfo(responseDict)

    @DaqRestApi.execute2
    def getDaqInfo(self, id):
        '''
        Get data acquisition details.

        :param id: data acquisition id
        :type id: str

        :returns: DaqInfo object

        :raises InvalidRequest: in case of invalid (empty or None) id

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises ObjectNotFound: if there is no known DAQ with a given id

        :raises DmException: in case of any other errors

        >>> daqInfo = api.getDaqInfo('84311a75-c88b-4605-8948-08257eae6f5c')
        >>> daqStatus = daqInfo.get('status')
        '''

        if not id:
            raise InvalidRequest('Daq id must be provided.')
        url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id)
        responseDict = self.sendSessionRequest(url=url, method='GET')
        return DaqInfo(responseDict)

    @DaqRestApi.execute2
    def listDaqs(self, status=None):
        '''
        Get list of known DAQs.

        :param status: status string; if not supplied, all DAQs will be included in the returned list
        :type status: str

        :returns: list of DaqInfo objects

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises DmException: in case of any other errors

        >>> daqInfoList = api.listDaqs()
        >>> for daqInfo in daqInfoList:
        >>>     print daqInfo['id'], daqInfo['status']
        '''

        if not status:
            status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
        url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
        responseData = self.sendSessionRequest(url=url, method='GET')
        return self.toDmObjectList(responseData, DaqInfo)

    @DaqRestApi.execute2
    def upload(self, experimentName, dataDirectory, daqInfo={}):
        '''
        Upload files from the given directory. Only files found at the time
        when command was issued will be uploaded to storage.

        :param experimentName: experiment name
        :type experimentName: str

        :param dataDirectory: data directory URL
        :type dataDirectory: str

        :param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition or processing that generated files that will be uploaded; several keys have special meaning:

           - *processHiddenFiles* (bool): if set to True, hidden files will be processed
           - *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed
           - *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
           - *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files)
           - *skipPlugins* (str): comma-separated list of plugins which should not process files
        :type daqInfo: dict

        :returns: UploadInfo object

        :raises InvalidRequest: in case of invalid input arguments

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises ObjectNotFound: in case experiment does not exist

        :raises DmException: in case of any other errors

        >>> uploadInfo = api.upload(experimentName='test01', dataDirectory='/home/dm/test', daqInfo={'sample' : 'A1'})
        >>> uploadId = uploadInfo.get('id')
        '''

        if not experimentName:
            raise InvalidRequest('Experiment name must be provided.')
        if not dataDirectory:
            raise InvalidRequest('Experiment data directory must be provided.')
        url = '%s/experimentsByName/%s/upload/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
        url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return UploadInfo(responseDict)

    @DaqRestApi.execute
    def getUploadInfo(self, id):
        '''
        Get upload details.

        :param id: upload id
        :type id: str

        :returns: UploadInfo object

        :raises InvalidRequest: in case of invalid (empty or None) id

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises ObjectNotFound: if there is no known upload with a given id

        :raises DmException: in case of any other errors

        >>> uplaodInfo = api.getUploadInfo('84311a75-c88b-4605-8948-08257eae6f5c')
        >>> uploadStatus = uploadInfo.get('status')
        '''

        url = '%s/experimentUploads/%s' % (self.getContextRoot(),id)
        if not id:
            raise InvalidRequest('Upload id must be provided.')
        responseDict = self.sendSessionRequest(url=url, method='GET')
        return UploadInfo(responseDict)

    @DaqRestApi.execute2
    def listUploads(self, status=None):
        '''
        Get list of known uploads.

        :param status: status string; if not supplied, all uploads will be included in the returned list
        :type status: str

        :returns: list of UploadInfo objects

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises DmException: in case of any other errors

        >>> uploadInfoList = api.listUploads(status='running')
        >>> for uploadInfo in uploadInfoList:
        >>>     print uploadInfo['id']
        '''

        if not status:
            status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
        url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
        responseData = self.sendSessionRequest(url=url, method='GET')
        return self.toDmObjectList(responseData, UploadInfo)

    @DaqRestApi.execute2
    def stopUpload(self, id):
        '''
        Abort upload.

        :param id: upload id
        :type id: str

        :returns: UploadInfo object

        :raises InvalidRequest: in case of invalid (empty or None) id

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises ObjectNotFound: if there is no known upload with a given id

        :raises DmException: in case of any other errors

        >>> uplaodInfo = api.stopUpload('84311a75-c88b-4605-8948-08257eae6f5c')
        >>> print uploadInfo.get('nCompletedFiles')
        '''
        url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
        if not id:
            raise InvalidRequest('Upload id must be provided.')
        responseDict = self.sendSessionRequest(url=url, method='POST')
        return UploadInfo(responseDict)

    @DaqRestApi.execute2
    def listProcessingPlugins(self):
        '''
        Get list of DAQ service processing plugins.

        :returns: list of PluginInfo objects

        :raises AuthorizationError: in case user is not authorized to manage DM station

        :raises DmException: in case of any other errors

        >>> pluginInfoList = api.getProcessingPlugins()
        >>> for pluginInfo in pluginInfoList:
        >>>     print pluginInfo['name']
        '''

        url = '%s/processingPlugins' % (self.getContextRoot())
        responseData = self.sendSessionRequest(url=url, method='GET')
        return self.toDmObjectList(responseData, PluginInfo)

#######################################################################
# Testing.

if __name__ == '__main__':
    api = ExperimentDaqApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
    print api.startDaq('experiment1', '/tmp/data/experiment1')