Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 966 additions and 0 deletions
#!/usr/bin/env python
import ssl
class SslUtility:
DEFAULT_SSL_CONTEXT = ssl._create_default_https_context
@classmethod
def useUnverifiedSslContext(cls, func):
def wrapper(*args, **kwargs):
# Disable SSL checking
ssl._create_default_https_context = ssl._create_unverified_context
# Perform function call
result = func(*args, **kwargs)
# Revert back to original SSL settings
ssl._create_default_https_context = SslUtility.DEFAULT_SSL_CONTEXT
return result
return wrapper
#!/usr/bin/env python
import threading
class ThreadSafeQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
def push(self, item):
self.lock.acquire()
try:
self.queue.insert(0,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
item = self.queue.pop()
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
q = ThreadSafeQueue()
for i in range(0,10):
q.push(i)
print 'Added: ', i
while not q.isEmpty():
i = q.pop()
print 'Got: ', i
#!/usr/bin/env python
class ThreadingUtility:
# Assumes class has instance lock initialized
@classmethod
def synchronize(cls, func):
def synchronized(*args, **kwargs):
im_self = args[0]
im_self.lock.acquire()
try:
result = func(*args, **kwargs)
return result
finally:
im_self.lock.release()
return synchronized
#######################################################################
# Testing.
if __name__ == '__main__':
import threading
class A:
def __init__(self):
self.lock = threading.RLock()
@ThreadingUtility.synchronize
def twoX(self, x):
print 'X=', x
return 2*x
a = A()
t = a.twoX(3)
print 'Result: ', t
#!/usr/bin/env python
import threading
import time
# Uses earliest allowed processing timestamp to sort items in the queue
# Queued item will not be processed until its earliest allowed processing
# timestamp has passed
class TimeBasedProcessingQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
self.itemPopTimeList = []
def push(self, item, itemProcessingWaitTime=0):
self.lock.acquire()
try:
earliestPopTime = time.time() + itemProcessingWaitTime
popIndex = 0
for t in self.itemPopTimeList:
if earliestPopTime <= t:
break
popIndex += 1
self.itemPopTimeList.insert(popIndex, earliestPopTime)
self.queue.insert(popIndex,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
if self.itemPopTimeList[0] <= time.time():
del self.itemPopTimeList[0]
item = self.queue[0]
del self.queue[0]
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
import random
q = TimeBasedProcessingQueue()
for i in range(0,10000000):
waitTime = random.uniform(0,10)
q.push(i, waitTime)
#print 'Added: ', i, '; Processing wait: ', waitTime
print "Sleeping..."
time.sleep(60)
print "Removing..."
while not q.isEmpty():
i = q.pop()
#print 'Got: ', i
#time.sleep(1)
print "Sleeping..."
time.sleep(60)
#!/usr/bin/env python
import time
import pytz
import datetime
from tzlocal import get_localzone
from dm.common.exceptions.invalidArgument import InvalidArgument
class TimeUtility:
UTC_MINUS_LOCAL_TIME = None
@classmethod
def getCurrentGMTimestamp(cls):
""" Formats GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(time.time()))
@classmethod
def formatGMTimestamp(cls, t):
""" Format GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(t))
@classmethod
def getCurrentLocalTimestamp(cls):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(time.time()))
@classmethod
def formatLocalTimestamp(cls, t):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(t))
@classmethod
def toDateTime(cls, t, format):
if not t:
return None
tz = get_localzone()
try:
dt = datetime.datetime.strptime(t, format)
except Exception, ex:
raise InvalidArgument('Cannot parse input: %s' % ex)
return tz.localize(dt, is_dst=None)
@classmethod
def utcToLocalTime(cls, utc):
if cls.UTC_MINUS_LOCAL_TIME is None:
cls.UTC_MINUS_LOCAL_TIME = (datetime.datetime.utcnow()-datetime.datetime.now()).total_seconds()
if cls.UTC_MINUS_LOCAL_TIME > 0:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME+0.5)
else:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME-0.5)
localTime = utc - cls.UTC_MINUS_LOCAL_TIME
return localTime
#######################################################################
# Testing.
if __name__ == '__main__':
print TimeUtility.toDateTime('2015-01-03', '%Y-%m-%d')
dt0 = datetime.datetime.utcnow()
dt1 = datetime.datetime.now()
ts0 = time.mktime(dt0.timetuple())
ts1 = time.mktime(dt1.timetuple())
t0 = time.strftime("%Y/%m/%d %H:%M:%S", dt0.timetuple())
print 'UTC: ', t0, ts0
t1 = time.strftime("%Y/%m/%d %H:%M:%S", dt1.timetuple())
print 'LOCAL: ', t1, ts1
print 'UTC TO LOCAL: ', TimeUtility.utcToLocalTime(ts0)
#!/usr/bin/env python
#######################################################################
import socket
import pwd
import os
from logging.handlers import TimedRotatingFileHandler
#######################################################################
class TimedRotatingFileLoggingHandler(TimedRotatingFileHandler):
""" Class that enables logging into files. """
def __init__(self, filename, when='D', interval=1, backupCount=0, encoding=None):
TimedRotatingFileHandler.__init__(self, filename, when, interval, backupCount, encoding)
self.user = pwd.getpwuid(os.getuid())[0]
self.host = socket.gethostname()
def emit(self, record):
record.__dict__['user'] = self.user
record.__dict__['host'] = self.host
return TimedRotatingFileHandler.emit(self, record)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
class ValueUtility:
@classmethod
def toBoolean(cls, value):
if value is None:
return False
strValue = str(value).lower()
if strValue == '1':
return True
elif strValue == 'true':
return True
return False
#######################################################################
# Testing.
if __name__ == '__main__':
print ValueUtility.toBoolean('True')
#!/usr/bin/env python
from dm.common.api.dmRestApi import DmRestApi
from dm.common.utility.configurationManager import ConfigurationManager
class DaqRestApi(DmRestApi):
""" Base DAQ DM REST api class. """
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
if host == None:
host = ConfigurationManager.getInstance().getDaqWebServiceHost()
if port == None:
port = ConfigurationManager.getInstance().getDaqWebServicePort()
DmRestApi.__init__(self, username, password, host, port, protocol)
#######################################################################
# Testing.
if __name__ == '__main__':
api = DaqRestApi('sveseli', 'sveseli')
#api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list', 'GET', data='')
import urllib
from dm.common.utility.configurationManager import ConfigurationManager
cm = ConfigurationManager.getInstance()
cm.setSessionCacheFile('/tmp/session')
#print 'Non-session request'
#print api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list?path=/tmp', 'GET')
print 'Session request'
data = { 'path' : '/tmp/xyz' }
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write?path=/tmp/xyz&content=xyz', 'POST', contentType='application/x-www-form-urlencoded', data=urllib.urlencode(data))
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', data=data)
postdata='path=/tmp/xyz'
postdata+='&content=%s' % urllib.quote_plus('Hey there')
print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', contentType='application/x-www-form-urlencoded', data=postdata)
#!/usr/bin/env python
import os
import json
import urllib
from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.daqInfo import DaqInfo
from daqRestApi import DaqRestApi
class ExperimentDaqApi(DaqRestApi):
'''
Data Management API for accessing experiment interface provided by the
DAQ service on a DM experiment station.
'''
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
'''
Constructor.
:param username: DM username
:type username: str
:param password: DM password
:type password: str
:param host: DM service host
:type host: str
:param port: DM service port
:type port: int
:param protocol: DM service protocol
:type protocol: str
>>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=22236, protocol='https')
'''
DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute2
def startDaq(self, experimentName, dataDirectory, daqInfo={}):
'''
Start data acquisition (real-time directory monitoring and file upload).
Only files created or modified after this command was issued will be
uploaded to storage.
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; several keys have special meaning:
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours
- *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes
- *uploadDestDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored
- *skipPlugins* (str): comma-separated list of plugins which should not process files
:type daqInfo: dict
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist
:raises DmException: in case of any other errors
>>> daqInfo = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqId = daqInfo.get('id')
>>> daqStatus = daqInfo.get('status')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/startDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def stopDaq(self, experimentName, dataDirectory):
'''
Stop data acquisition (real-time directory monitoring and file upload).
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist, or if there are no active DAQs for a given experiment and data directory
:raises DmException: in case of any other errors
>>> daqInfo = api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqStatus = daqInfo.get('status')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/stopDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def getDaqInfo(self, id):
'''
Get data acquisition details.
:param id: data acquisition id
:type id: str
:returns: DaqInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known DAQ with a given id
:raises DmException: in case of any other errors
>>> daqInfo = api.getDaqInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> daqStatus = daqInfo.get('status')
'''
if not id:
raise InvalidRequest('Daq id must be provided.')
url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id)
responseDict = self.sendSessionRequest(url=url, method='GET')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def listDaqs(self, status=None):
'''
Get list of known DAQs.
:param status: status string; if not supplied, all DAQs will be included in the returned list
:type status: str
:returns: list of DaqInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> daqInfoList = api.listDaqs()
>>> for daqInfo in daqInfoList:
>>> print daqInfo['id'], daqInfo['status']
'''
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, DaqInfo)
@DaqRestApi.execute2
def upload(self, experimentName, dataDirectory, daqInfo={}):
'''
Upload files from the given directory. Only files found at the time
when command was issued will be uploaded to storage.
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition or processing that generated files that will be uploaded; several keys have special meaning:
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files)
- *skipPlugins* (str): comma-separated list of plugins which should not process files
:type daqInfo: dict
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid input arguments
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist
:raises DmException: in case of any other errors
>>> uploadInfo = api.upload(experimentName='test01', dataDirectory='/home/dm/test', daqInfo={'sample' : 'A1'})
>>> uploadId = uploadInfo.get('id')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/upload/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute
def getUploadInfo(self, id):
'''
Get upload details.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uplaodInfo = api.getUploadInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> uploadStatus = uploadInfo.get('status')
'''
url = '%s/experimentUploads/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='GET')
return UploadInfo(responseDict)
@DaqRestApi.execute2
def listUploads(self, status=None):
'''
Get list of known uploads.
:param status: status string; if not supplied, all uploads will be included in the returned list
:type status: str
:returns: list of UploadInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> uploadInfoList = api.listUploads(status='running')
>>> for uploadInfo in uploadInfoList:
>>> print uploadInfo['id']
'''
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UploadInfo)
@DaqRestApi.execute2
def stopUpload(self, id):
'''
Abort upload.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uplaodInfo = api.stopUpload('84311a75-c88b-4605-8948-08257eae6f5c')
>>> print uploadInfo.get('nCompletedFiles')
'''
url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute2
def listProcessingPlugins(self):
'''
Get list of DAQ service processing plugins.
:returns: list of PluginInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> pluginInfoList = api.getProcessingPlugins()
>>> for pluginInfo in pluginInfoList:
>>> print pluginInfo['name']
'''
url = '%s/processingPlugins' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, PluginInfo)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ExperimentDaqApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
print api.startDaq('experiment1', '/tmp/data/experiment1')
#!/usr/bin/env python
from dm.common.cli.dmRestCli import DmRestCli
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceCli(DmRestCli):
""" DM DAQ web service cli class. """
def __init__(self, validArgCount=0):
DmRestCli.__init__(self, validArgCount)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
#!/usr/bin/env python
import os
from dm.common.cli.dmRestSessionCli import DmRestSessionCli
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceSessionCli(DmRestSessionCli):
""" DM DAQ web service session cli class. """
DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.daq.session.cache'
def __init__(self, validArgCount=0):
DmRestSessionCli.__init__(self, validArgCount)
ConfigurationManager.getInstance().setSessionCacheFile(DaqWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
def getId(self):
return self.options.id
def getExperimentName(self):
return self.options.experimentName
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetDaqInfoCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--id', dest='id', help='Daq id.')
def checkArgs(self):
if self.options.id is None:
raise InvalidRequest('Daq id must be provided.')
def getId(self):
return self.options.id
def runCommand(self):
self.parseArgs(usage="""
dm-get-daq-info --id=ID
Description:
Retrieves detailed information for the specified data acquisition.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = api.getDaqInfo(self.getId())
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetDaqInfoCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetUploadInfoCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--id', dest='id', help='Upload id.')
def checkArgs(self):
if self.options.id is None:
raise InvalidRequest('Upload id must be provided.')
def getId(self):
return self.options.id
def runCommand(self):
self.parseArgs(usage="""
dm-get-upload-info --id=ID
Description:
Retrieves detailed information for the specified data upload id.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploadInfo = api.getUploadInfo(self.getId())
print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetUploadInfoCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListDaqsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-daqs [--status=STATUS]
Description:
Retrieves all known daqs.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqs = api.listDaqs(self.getStatus())
for daq in daqs:
print daq.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListDaqsCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class ListProcessingPluginsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
def runCommand(self):
self.parseArgs(usage="""
dm-list-processing-plugins
Description:
Retrieves list of known processing plugins.
""")
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
plugins = api.listProcessingPlugins()
for plugin in plugins:
print plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListProcessingPluginsCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListUploadsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-uploads [--status=STATUS]
Description:
Retrieves all known uploads.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploads = api.listUploads(self.getStatus())
for upload in uploads:
print upload.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListUploadsCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StartDaqCli(DaqWebServiceSessionCli):
HOURS_PER_DAY = 24
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory URL.')
self.addOption('', '--dest-directory', dest='destDirectory',
help='Destination directory relative to experiment root path.')
self.addOption('', '--duration', dest='duration', help='DAQ duration; it must be specified in hours (h) or days (d). Examples: "8h", "14d".')
self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after DAQ is stopped.')
self.addOption('', '--upload-dest-directory-on-exit', dest='uploadDestDirectoryOnExit', help='Destination directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.')
self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
self.addOption('', '--skip-plugins', dest='skipPlugins', help='Comma-separated list of plugins which should not process files.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def updateDaqInfoFromOptions(self, daqInfo):
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
if self.options.skipPlugins:
daqInfo['skipPlugins'] = self.options.skipPlugins
if self.options.duration:
duration = self.options.duration
if duration.endswith('h'):
daqInfo['maxRunTimeInHours'] = int(duration[0:-1])
elif duration.endswith('d'):
daqInfo['maxRunTimeInHours'] = int(duration[0:-1])*self.HOURS_PER_DAY
else:
raise InvalidRequest('Maximum run time must contain valid unit specifier: "h" for hours or "d" for days.')
if self.options.destDirectory:
daqInfo['destDirectory'] = self.options.destDirectory
if self.options.uploadDataDirectoryOnExit:
daqInfo['uploadDataDirectoryOnExit'] = self.options.uploadDataDirectoryOnExit
if self.options.uploadDestDirectoryOnExit:
if not self.options.uploadDataDirectoryOnExit:
raise InvalidRequest('Upload destination directory on exit requires that upload data directory is specified as well.')
daqInfo['uploadDestDirectoryOnExit'] = self.options.uploadDestDirectoryOnExit
def runCommand(self):
self.parseArgs(usage="""
dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[--duration=DURATION]
[--dest-directory=DESTDIRECTORY]
[--upload-data-directory-on-exit=UPLOADDATADIRECTORYONEXIT]
[--upload-dest-directory-on-exit=UPLOADDESTDIRECTORYONEXIT]
[--process-hidden]
[key1:value1, key2:value2, ...]
Description:
Starts DAQ for a given experiment. Specified data directory will be
monitored for data files. All provided key/value pairs will be passed to
file processing plugins.
""")
self.checkArgs()
api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = self.splitArgsIntoDict()
self.updateDaqInfoFromOptions(daqInfo)
daqInfo = api.startDaq(self.getExperimentName(), self.getDataDirectory(), daqInfo=daqInfo)
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StartDaqCli()
cli.run()