Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 620 additions and 0 deletions
#!/usr/bin/env python
class ThreadingUtility:
# Assumes class has instance lock initialized
@classmethod
def synchronize(cls, func):
def synchronized(*args, **kwargs):
im_self = args[0]
im_self.lock.acquire()
try:
result = func(*args, **kwargs)
return result
finally:
im_self.lock.release()
return synchronized
#######################################################################
# Testing.
if __name__ == '__main__':
import threading
class A:
def __init__(self):
self.lock = threading.RLock()
@ThreadingUtility.synchronize
def twoX(self, x):
print 'X=', x
return 2*x
a = A()
t = a.twoX(3)
print 'Result: ', t
#!/usr/bin/env python
import threading
import time
# Uses earliest allowed processing timestamp to sort items in the queue
# Queued item will not be processed until its earliest allowed processing
# timestamp has passed
class TimeBasedProcessingQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
self.itemPopTimeList = []
def push(self, item, itemProcessingWaitTime=0):
self.lock.acquire()
try:
earliestPopTime = time.time() + itemProcessingWaitTime
popIndex = 0
for t in self.itemPopTimeList:
if earliestPopTime <= t:
break
popIndex += 1
self.itemPopTimeList.insert(popIndex, earliestPopTime)
self.queue.insert(popIndex,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
if self.itemPopTimeList[0] <= time.time():
del self.itemPopTimeList[0]
item = self.queue[0]
del self.queue[0]
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
import random
q = TimeBasedProcessingQueue()
for i in range(0,10):
waitTime = random.uniform(0,10)
q.push(i, waitTime)
print 'Added: ', i, '; Processing wait: ', waitTime
while not q.isEmpty():
i = q.pop()
print 'Got: ', i
time.sleep(1)
#!/usr/bin/env python
import time
class TimeUtility:
@classmethod
def getCurrentGMTimeStamp(cls):
""" Formats GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(time.time()))
@classmethod
def formatGMTimeStamp(cls, t):
""" Format GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(t))
@classmethod
def getCurrentLocalTimeStamp(cls):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(time.time()))
@classmethod
def formatLocalTimeStamp(cls, t):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(t))
#!/usr/bin/env python
#######################################################################
import socket
import pwd
import os
from logging.handlers import TimedRotatingFileHandler
#######################################################################
class TimedRotatingFileLoggingHandler(TimedRotatingFileHandler):
""" Class that enables logging into files. """
def __init__(self, filename, when='D', interval=1, backupCount=0, encoding=None):
TimedRotatingFileHandler.__init__(self, filename, when, interval, backupCount, encoding)
self.user = pwd.getpwuid(os.getuid())[0]
self.host = socket.gethostname()
def emit(self, record):
record.__dict__['user'] = self.user
record.__dict__['host'] = self.host
return TimedRotatingFileHandler.emit(self, record)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
class ValueUtility:
@classmethod
def toBoolean(cls, value):
if value is None:
return False
if str(value).lower() == 'true':
return True
return False
#######################################################################
# Testing.
if __name__ == '__main__':
print ValueUtility.toBoolean('True')
#!/usr/bin/env python
from dm.common.api.dmRestApi import DmRestApi
from dm.common.utility.configurationManager import ConfigurationManager
class DaqRestApi(DmRestApi):
""" Base DAQ DM REST api class. """
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
if host == None:
host = ConfigurationManager.getInstance().getDaqWebServiceHost()
if port == None:
port = ConfigurationManager.getInstance().getDaqWebServicePort()
DmRestApi.__init__(self, username, password, host, port, protocol)
#######################################################################
# Testing.
if __name__ == '__main__':
api = DaqRestApi('sveseli', 'sveseli')
#api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list', 'GET', data='')
import urllib
from dm.common.utility.configurationManager import ConfigurationManager
cm = ConfigurationManager.getInstance()
cm.setSessionCacheFile('/tmp/session')
#print 'Non-session request'
#print api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list?path=/tmp', 'GET')
print 'Session request'
data = { 'path' : '/tmp/xyz' }
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write?path=/tmp/xyz&content=xyz', 'POST', contentType='application/x-www-form-urlencoded', data=urllib.urlencode(data))
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', data=data)
postdata='path=/tmp/xyz'
postdata+='&content=%s' % urllib.quote_plus('Hey there')
print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', contentType='application/x-www-form-urlencoded', data=postdata)
#!/usr/bin/env python
import os
import urllib
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from daqRestApi import DaqRestApi
class ExperimentRestApi(DaqRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute
def startDaq(self, name, dataDirectory):
url = '%s/experiments/startDaq' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Experiment data directory must be provided.')
url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
@DaqRestApi.execute
def stopDaq(self, name):
url = '%s/experiments/stopDaq' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
@DaqRestApi.execute
def upload(self, name, dataDirectory):
url = '%s/experiments/upload' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Experiment data directory must be provided.')
url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ExperimentRestApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
print api.startDaq('experiment1', '/tmp/data/experiment1')
#!/usr/bin/env python
from dm.common.cli.dmRestCli import DmRestCli
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceCli(DmRestCli):
""" DM DAQ web service cli class. """
def __init__(self, validArgCount=0):
DmRestCli.__init__(self, validArgCount)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
#!/usr/bin/env python
from dm.common.cli.dmRestSessionCli import DmRestSessionCli
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceSessionCli(DmRestSessionCli):
""" DM DAQ web service session cli class. """
DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.daq.session.cache'
def __init__(self, validArgCount=0):
DmRestSessionCli.__init__(self, validArgCount)
ConfigurationManager.getInstance().setSessionCacheFile(DaqWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StartDaqCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getDataDirectory(self):
return self.options.dataDirectory
def runCommand(self):
self.parseArgs(usage="""
dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[key1:value1, key2:value2, ...]
Description:
Starts DAQ for a given experiment. Specified data directory will be
monitored for data files. All provided key/value pairs will be passed to
file processing plugins.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = self.splitArgsIntoDict()
experiment = api.startDaq(self.getExperimentName(), self.getDataDirectory(), daqInfo=daqInfo)
print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StartDaqCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StopDaqCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
def getExperimentName(self):
return self.options.experimentName
def runCommand(self):
self.parseArgs(usage="""
dm-stop-daq --experiment=EXPERIMENTNAME
Description:
Stop DAQ for a given experiment.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
experiment = api.stopDaq(self.getExperimentName())
print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StopDaqCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class UploadCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getDataDirectory(self):
return self.options.dataDirectory
def runCommand(self):
self.parseArgs(usage="""
dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[key1:value1, key2:value2, ...]
Description:
Schedules data upload for a given experiment. All existing files in the
specified directory will be uploaded to storage. Relative directory
structure will be preserved. All provided key/value pairs will be passed
to file processing plugins.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
experiment = api.upload(self.getExperimentName(), self.getDataDirectory())
print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = UploadCli()
cli.run()
#!/usr/bin/env python
#
# DM DAQ Web Service
#
from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase
from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.daq_web_service.service.impl.fileSystemObserver import FileSystemObserver
from daqWebServiceRouteMapper import DaqWebServiceRouteMapper
class DaqWebService(DmRestWebServiceBase):
def __init__(self):
DmRestWebServiceBase.__init__(self, DaqWebServiceRouteMapper)
def initDmModules(self):
self.logger.debug('Initializing dm modules')
# Add modules that will be started.
moduleManager = DmModuleManager.getInstance()
moduleManager.addModule(FileSystemObserver.getInstance())
moduleManager.addModule(FileProcessingManager.getInstance())
self.logger.debug('Initialized dm modules')
def getDefaultServerHost(self):
return ConfigurationManager.getInstance().getServiceHost()
def getDefaultServerPort(self):
return ConfigurationManager.getInstance().getServicePort()
####################################################################
# Run service
if __name__ == '__main__':
ConfigurationManager.getInstance().setServiceName('daq-web-service')
service = DaqWebService();
service.run()
#!/usr/bin/env python
#
# Route mapper for DM DAQ web service.
#
import sys
import os
import cherrypy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.service.loginRouteDescriptor import LoginRouteDescriptor
from experimentRouteDescriptor import ExperimentRouteDescriptor
class DaqWebServiceRouteMapper:
@classmethod
def setupRoutes(cls):
""" Setup RESTFul routes. """
logger = LoggingManager.getInstance().getLogger(cls.__name__)
contextRoot = ConfigurationManager.getInstance().getContextRoot()
logger.debug('Using context root: %s' % contextRoot)
# Get routes.
routes = LoginRouteDescriptor.getRoutes()
routes += ExperimentRouteDescriptor.getRoutes()
# Add routes to dispatcher.
d = cherrypy.dispatch.RoutesDispatcher()
for route in routes:
logger.debug('Connecting route: %s' % route)
d.connect(route['name'], route['path'], action=route['action'], controller=route['controller'], conditions=dict(method=route['method']))
return d
#!/usr/bin/env python
#
# User route descriptor.
#
from dm.common.utility.configurationManager import ConfigurationManager
from experimentSessionController import ExperimentSessionController
class ExperimentRouteDescriptor:
@classmethod
def getRoutes(cls):
contextRoot = ConfigurationManager.getInstance().getContextRoot()
# Static instances shared between different routes
experimentSessionController = ExperimentSessionController()
# Define routes.
routes = [
# Start experiment daq
{
'name' : 'startDaq',
'path' : '%s/experiments/startDaq' % contextRoot,
'controller' : experimentSessionController,
'action' : 'startDaq',
'method' : ['POST']
},
# Stop experiment daq
{
'name' : 'stopDaq',
'path' : '%s/experiments/stopDaq' % contextRoot,
'controller' : experimentSessionController,
'action' : 'stopDaq',
'method' : ['POST']
},
# Upload experiment data
{
'name' : 'upload',
'path' : '%s/experiments/upload' % contextRoot,
'controller' : experimentSessionController,
'action' : 'upload',
'method' : ['POST']
},
]
return routes
#!/usr/bin/env python
import cherrypy
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.encoder import Encoder
from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl
class ExperimentSessionController(DmSessionController):
def __init__(self):
DmSessionController.__init__(self)
self.experimentSessionControllerImpl = ExperimentSessionControllerImpl()
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def startDaq(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
dataDirectory = kwargs.get('dataDirectory')
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Missing experiment data directory.')
dataDirectory = Encoder.decode(dataDirectory)
response = self.experimentSessionControllerImpl.startDaq(name, dataDirectory).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def stopDaq(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
response = self.experimentSessionControllerImpl.stopDaq(name).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def upload(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
dataDirectory = kwargs.get('dataDirectory')
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Missing experiment data directory.')
dataDirectory = Encoder.decode(dataDirectory)
response = self.experimentSessionControllerImpl.upload(name, dataDirectory).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response