Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 718 additions and 0 deletions
#!/usr/bin/env python
import ssl
class SslUtility:
DEFAULT_SSL_CONTEXT = ssl._create_default_https_context
@classmethod
def useUnverifiedSslContext(cls, func):
def wrapper(*args, **kwargs):
# Disable SSL checking
ssl._create_default_https_context = ssl._create_unverified_context
# Perform function call
result = func(*args, **kwargs)
# Revert back to original SSL settings
ssl._create_default_https_context = SslUtility.DEFAULT_SSL_CONTEXT
return result
return wrapper
#!/usr/bin/env python
import threading
class ThreadSafeQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
def push(self, item):
self.lock.acquire()
try:
self.queue.insert(0,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
item = self.queue.pop()
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
q = ThreadSafeQueue()
for i in range(0,10):
q.push(i)
print 'Added: ', i
while not q.isEmpty():
i = q.pop()
print 'Got: ', i
#!/usr/bin/env python
class ThreadingUtility:
# Assumes class has instance lock initialized
@classmethod
def synchronize(cls, func):
def synchronized(*args, **kwargs):
im_self = args[0]
im_self.lock.acquire()
try:
result = func(*args, **kwargs)
return result
finally:
im_self.lock.release()
return synchronized
#######################################################################
# Testing.
if __name__ == '__main__':
import threading
class A:
def __init__(self):
self.lock = threading.RLock()
@ThreadingUtility.synchronize
def twoX(self, x):
print 'X=', x
return 2*x
a = A()
t = a.twoX(3)
print 'Result: ', t
#!/usr/bin/env python
import threading
import time
# Uses earliest allowed processing timestamp to sort items in the queue
# Queued item will not be processed until its earliest allowed processing
# timestamp has passed
class TimeBasedProcessingQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
self.itemPopTimeList = []
def push(self, item, itemProcessingWaitTime=0):
self.lock.acquire()
try:
earliestPopTime = time.time() + itemProcessingWaitTime
popIndex = 0
for t in self.itemPopTimeList:
if earliestPopTime <= t:
break
popIndex += 1
self.itemPopTimeList.insert(popIndex, earliestPopTime)
self.queue.insert(popIndex,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
if self.itemPopTimeList[0] <= time.time():
del self.itemPopTimeList[0]
item = self.queue[0]
del self.queue[0]
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
import random
q = TimeBasedProcessingQueue()
for i in range(0,10000000):
waitTime = random.uniform(0,10)
q.push(i, waitTime)
#print 'Added: ', i, '; Processing wait: ', waitTime
print "Sleeping..."
time.sleep(60)
print "Removing..."
while not q.isEmpty():
i = q.pop()
#print 'Got: ', i
#time.sleep(1)
print "Sleeping..."
time.sleep(60)
#!/usr/bin/env python
import time
import pytz
import datetime
from tzlocal import get_localzone
from dm.common.exceptions.invalidArgument import InvalidArgument
class TimeUtility:
UTC_MINUS_LOCAL_TIME = None
@classmethod
def getCurrentGMTimestamp(cls):
""" Formats GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(time.time()))
@classmethod
def formatGMTimestamp(cls, t):
""" Format GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(t))
@classmethod
def getCurrentLocalTimestamp(cls):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(time.time()))
@classmethod
def formatLocalTimestamp(cls, t):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(t))
@classmethod
def toDateTime(cls, t, format):
if not t:
return None
tz = get_localzone()
try:
dt = datetime.datetime.strptime(t, format)
except Exception, ex:
raise InvalidArgument('Cannot parse input: %s' % ex)
return tz.localize(dt, is_dst=None)
@classmethod
def utcToLocalTime(cls, utc):
if cls.UTC_MINUS_LOCAL_TIME is None:
cls.UTC_MINUS_LOCAL_TIME = (datetime.datetime.utcnow()-datetime.datetime.now()).total_seconds()
if cls.UTC_MINUS_LOCAL_TIME > 0:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME+0.5)
else:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME-0.5)
localTime = utc - cls.UTC_MINUS_LOCAL_TIME
return localTime
#######################################################################
# Testing.
if __name__ == '__main__':
print TimeUtility.toDateTime('2015-01-03', '%Y-%m-%d')
dt0 = datetime.datetime.utcnow()
dt1 = datetime.datetime.now()
ts0 = time.mktime(dt0.timetuple())
ts1 = time.mktime(dt1.timetuple())
t0 = time.strftime("%Y/%m/%d %H:%M:%S", dt0.timetuple())
print 'UTC: ', t0, ts0
t1 = time.strftime("%Y/%m/%d %H:%M:%S", dt1.timetuple())
print 'LOCAL: ', t1, ts1
print 'UTC TO LOCAL: ', TimeUtility.utcToLocalTime(ts0)
#!/usr/bin/env python
#######################################################################
import socket
import pwd
import os
from logging.handlers import TimedRotatingFileHandler
#######################################################################
class TimedRotatingFileLoggingHandler(TimedRotatingFileHandler):
""" Class that enables logging into files. """
def __init__(self, filename, when='D', interval=1, backupCount=0, encoding=None):
TimedRotatingFileHandler.__init__(self, filename, when, interval, backupCount, encoding)
self.user = pwd.getpwuid(os.getuid())[0]
self.host = socket.gethostname()
def emit(self, record):
record.__dict__['user'] = self.user
record.__dict__['host'] = self.host
return TimedRotatingFileHandler.emit(self, record)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
class ValueUtility:
@classmethod
def toBoolean(cls, value):
if value is None:
return False
strValue = str(value).lower()
if strValue == '1':
return True
elif strValue == 'true':
return True
return False
#######################################################################
# Testing.
if __name__ == '__main__':
print ValueUtility.toBoolean('True')
#!/usr/bin/env python
from dm.common.api.dmRestApi import DmRestApi
from dm.common.utility.configurationManager import ConfigurationManager
class DaqRestApi(DmRestApi):
""" Base DAQ DM REST api class. """
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
if host == None:
host = ConfigurationManager.getInstance().getDaqWebServiceHost()
if port == None:
port = ConfigurationManager.getInstance().getDaqWebServicePort()
DmRestApi.__init__(self, username, password, host, port, protocol)
#######################################################################
# Testing.
if __name__ == '__main__':
api = DaqRestApi('sveseli', 'sveseli')
#api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list', 'GET', data='')
import urllib
from dm.common.utility.configurationManager import ConfigurationManager
cm = ConfigurationManager.getInstance()
cm.setSessionCacheFile('/tmp/session')
#print 'Non-session request'
#print api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list?path=/tmp', 'GET')
print 'Session request'
data = { 'path' : '/tmp/xyz' }
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write?path=/tmp/xyz&content=xyz', 'POST', contentType='application/x-www-form-urlencoded', data=urllib.urlencode(data))
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', data=data)
postdata='path=/tmp/xyz'
postdata+='&content=%s' % urllib.quote_plus('Hey there')
print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', contentType='application/x-www-form-urlencoded', data=postdata)
#!/usr/bin/env python
import os
import json
import urllib
from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.daqInfo import DaqInfo
from daqRestApi import DaqRestApi
class ExperimentRestApi(DaqRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute
def startDaq(self, experimentName, dataDirectory, daqInfo={}):
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/startDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute
def stopDaq(self, experimentName, dataDirectory):
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/stopDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute
def getDaqInfo(self, id):
if not id:
raise InvalidRequest('Daq id must be provided.')
url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id)
responseDict = self.sendSessionRequest(url=url, method='GET')
return DaqInfo(responseDict)
@DaqRestApi.execute
def listDaqs(self, status=None):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, DaqInfo)
@DaqRestApi.execute
def upload(self, experimentName, dataDirectory, daqInfo={}):
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/upload/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute
def getUploadInfo(self, id):
url = '%s/experimentUploads/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='GET')
return UploadInfo(responseDict)
@DaqRestApi.execute
def listUploads(self, status=None):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UploadInfo)
@DaqRestApi.execute
def stopUpload(self, id):
url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute
def getProcessingPlugins(self):
url = '%s/processingPlugins' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, PluginInfo)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ExperimentRestApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
print api.startDaq('experiment1', '/tmp/data/experiment1')
#!/usr/bin/env python
from dm.common.cli.dmRestCli import DmRestCli
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceCli(DmRestCli):
""" DM DAQ web service cli class. """
def __init__(self, validArgCount=0):
DmRestCli.__init__(self, validArgCount)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
#!/usr/bin/env python
import os
from dm.common.cli.dmRestSessionCli import DmRestSessionCli
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceSessionCli(DmRestSessionCli):
""" DM DAQ web service session cli class. """
DM_FILE_SERVER_URL_ENV_VAR = 'DM_FILE_SERVER_URL'
DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.daq.session.cache'
def __init__(self, validArgCount=0):
DmRestSessionCli.__init__(self, validArgCount)
ConfigurationManager.getInstance().setSessionCacheFile(DaqWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
def getId(self):
return self.options.id
def getExperimentName(self):
return self.options.experimentName
def getDataDirectory(self):
dataDirectory = self.options.dataDirectory
# Make sure data directory already does not have url scheme
if dataDirectory and dataDirectory.find('://') < 0:
fileServerUrl = os.environ.get(self.DM_FILE_SERVER_URL_ENV_VAR, '')
dataDirectory = '%s%s' % (fileServerUrl, dataDirectory)
return dataDirectory
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetDaqInfoCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--id', dest='id', help='Daq id.')
def checkArgs(self):
if self.options.id is None:
raise InvalidRequest('Daq id must be provided.')
def getId(self):
return self.options.id
def runCommand(self):
self.parseArgs(usage="""
dm-get-daq-info --id=ID
Description:
Retrieves detailed information for the specified data acquisition.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = api.getDaqInfo(self.getId())
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetDaqInfoCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class GetProcessingPluginsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
def runCommand(self):
self.parseArgs(usage="""
dm-get-processing-plugins
Description:
Retrieves list of known processing plugins.
""")
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
plugins = api.getProcessingPlugins()
for plugin in plugins:
print plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetProcessingPluginsCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetUploadInfoCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--id', dest='id', help='Upload id.')
def checkArgs(self):
if self.options.id is None:
raise InvalidRequest('Upload id must be provided.')
def getId(self):
return self.options.id
def runCommand(self):
self.parseArgs(usage="""
dm-get-upload-info --id=ID
Description:
Retrieves detailed information for the specified data upload id.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploadInfo = api.getUploadInfo(self.getId())
print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetUploadInfoCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListDaqsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-daqs [--status=STATUS]
Description:
Retrieves all known daqs.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqs = api.listDaqs(self.getStatus())
for daq in daqs:
print daq.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListDaqsCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListUploadsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-uploads [--status=STATUS]
Description:
Retrieves all known uploads.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploads = api.listUploads(self.getStatus())
for upload in uploads:
print upload.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListUploadsCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StartDaqCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)
self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def updateDaqInfoFromOptions(self, daqInfo):
if self.options.processHidden:
daqInfo['processHiddenFiles'] = True
def runCommand(self):
self.parseArgs(usage="""
dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[--process-hidden]
[key1:value1, key2:value2, ...]
Description:
Starts DAQ for a given experiment. Specified data directory will be
monitored for data files. All provided key/value pairs will be passed to
file processing plugins.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = self.splitArgsIntoDict()
self.updateDaqInfoFromOptions(daqInfo)
daqInfo = api.startDaq(self.getExperimentName(), self.getDataDirectory(), daqInfo=daqInfo)
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StartDaqCli()
cli.run()