Skip to content
Snippets Groups Projects
Commit 56e604fc authored by sveseli's avatar sveseli
Browse files

add first functional daq web service with experiment controller, functionality...

add first functional daq web service with experiment controller, functionality to observe file system and invoke file transfers; add initial API and CLI classes for start/stop daq, upload data;
parent 0157c851
No related branches found
No related tags found
No related merge requests found
Showing
with 687 additions and 0 deletions
#!/usr/bin/env python
from dm.common.api.dmRestApi import DmRestApi
class DaqRestApi(DmRestApi):
""" Base DAQ DM REST api class. """
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
if host == None:
host = self.configurationManager.getDaqWebServiceHost()
if port == None:
port = self.configurationManager.getDaqWebServicePort()
DmRestApi.__init__(self, username, password, host, port, protocol)
#######################################################################
# Testing.
if __name__ == '__main__':
api = DaqRestApi('sveseli', 'sveseli')
#api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list', 'GET', data='')
import urllib
from dm.common.utility.configurationManager import ConfigurationManager
cm = ConfigurationManager.getInstance()
cm.setSessionCacheFile('/tmp/session')
#print 'Non-session request'
#print api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list?path=/tmp', 'GET')
print 'Session request'
data = { 'path' : '/tmp/xyz' }
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write?path=/tmp/xyz&content=xyz', 'POST', contentType='application/x-www-form-urlencoded', data=urllib.urlencode(data))
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', data=data)
postdata='path=/tmp/xyz'
postdata+='&content=%s' % urllib.quote_plus('Hey there')
print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', contentType='application/x-www-form-urlencoded', data=postdata)
#!/usr/bin/env python
import os
import urllib
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from daqRestApi import DaqRestApi
class ExperimentRestApi(DaqRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute
def startDaq(self, name, dataDirectory):
url = '%s/experiments/startDaq' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Experiment data directory must be provided.')
url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
@DaqRestApi.execute
def stopDaq(self, name):
url = '%s/experiments/stopDaq' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ExperimentRestApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
print api.startDaq('experiment1', '/tmp/data/experiment1')
#!/usr/bin/env python
from dm.common.cli.dmRestCli import DmRestCli
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceCli(DmRestCli):
""" DM DAQ web service cli class. """
def __init__(self, validArgCount=0):
DmRestCli.__init__(self, validArgCount)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
#!/usr/bin/env python
from dm.common.cli.dmRestSessionCli import DmRestSessionCli
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceSessionCli(DmRestSessionCli):
""" DM DAQ web service session cli class. """
DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.daq.session.cache'
def __init__(self, validArgCount=0):
DmRestSessionCli.__init__(self, validArgCount)
ConfigurationManager.getInstance().setSessionCacheFile(DaqWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE)
def getDefaultServiceHost(self):
return ConfigurationManager.getInstance().getDaqWebServiceHost()
def getDefaultServicePort(self):
return ConfigurationManager.getInstance().getDaqWebServicePort()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StartDaqCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getDataDirectory(self):
return self.options.dataDirectory
def runCommand(self):
self.parseArgs(usage="""
dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
Description:
Starts DAQ for a given experiment. Provided data directory will be monitored for data files.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
experiment = api.startDaq(self.getExperimentName(), self.getDataDirectory())
print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StartDaqCli()
cli.run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StopDaqCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
def getExperimentName(self):
return self.options.experimentName
def runCommand(self):
self.parseArgs(usage="""
dm-stop-daq --experiment=EXPERIMENTNAME
Description:
Stop DAQ for a given experiment.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
experiment = api.stopDaq(self.getExperimentName())
print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = StopDaqCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class UploadDataCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceCli.__init__(self)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getDataDirectory(self):
return self.options.dataDirectory
def runCommand(self):
self.parseArgs(usage="""
dm-upload-data --name=EXPERIMENTNAME --data-directory=DATADIRECTORY
Description:
Schedules data upload for a given experiment.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
experiment = api.uploadData(self.getExperimentName(), self.getDataDirectory())
print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = UploadDataCli()
cli.run()
#!/usr/bin/env python
#
# DM DAQ Web Service
#
from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase
from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.daq_web_service.service.impl.fileSystemObserver import FileSystemObserver
from dm.daq_web_service.service.impl.fileProcessingManager import FileProcessingManager
from daqWebServiceRouteMapper import DaqWebServiceRouteMapper
class DaqWebService(DmRestWebServiceBase):
def __init__(self):
DmRestWebServiceBase.__init__(self, DaqWebServiceRouteMapper)
def initDmModules(self):
self.logger.debug('Initializing dm modules')
# Add modules that will be started.
moduleManager = DmModuleManager.getInstance()
moduleManager.addModule(FileSystemObserver.getInstance())
moduleManager.addModule(FileProcessingManager.getInstance())
self.logger.debug('Initialized dm modules')
def getDefaultServerHost(self):
return ConfigurationManager.getInstance().getServiceHost()
def getDefaultServerPort(self):
return ConfigurationManager.getInstance().getServicePort()
####################################################################
# Run service
if __name__ == '__main__':
ConfigurationManager.getInstance().setServiceName('daq-web-service')
service = DaqWebService();
service.run()
#!/usr/bin/env python
#
# Route mapper for DM DAQ web service.
#
import sys
import os
import cherrypy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.service.loginRouteDescriptor import LoginRouteDescriptor
from experimentRouteDescriptor import ExperimentRouteDescriptor
class DaqWebServiceRouteMapper:
@classmethod
def setupRoutes(cls):
""" Setup RESTFul routes. """
logger = LoggingManager.getInstance().getLogger(cls.__name__)
contextRoot = ConfigurationManager.getInstance().getContextRoot()
logger.debug('Using context root: %s' % contextRoot)
# Get routes.
routes = LoginRouteDescriptor.getRoutes()
routes += ExperimentRouteDescriptor.getRoutes()
# Add routes to dispatcher.
d = cherrypy.dispatch.RoutesDispatcher()
for route in routes:
logger.debug('Connecting route: %s' % route)
d.connect(route['name'], route['path'], action=route['action'], controller=route['controller'], conditions=dict(method=route['method']))
return d
#!/usr/bin/env python
#
# User route descriptor.
#
from dm.common.utility.configurationManager import ConfigurationManager
from experimentSessionController import ExperimentSessionController
class ExperimentRouteDescriptor:
@classmethod
def getRoutes(cls):
contextRoot = ConfigurationManager.getInstance().getContextRoot()
# Static instances shared between different routes
experimentSessionController = ExperimentSessionController()
# Define routes.
routes = [
# Start experiment daq
{
'name' : 'startDaq',
'path' : '%s/experiments/startDaq' % contextRoot,
'controller' : experimentSessionController,
'action' : 'startDaq',
'method' : ['POST']
},
# Stop experiment daq
{
'name' : 'stopDaq',
'path' : '%s/experiments/stopDaq' % contextRoot,
'controller' : experimentSessionController,
'action' : 'stopDaq',
'method' : ['POST']
},
]
return routes
#!/usr/bin/env python
import cherrypy
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.encoder import Encoder
from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl
class ExperimentSessionController(DmSessionController):
def __init__(self):
DmSessionController.__init__(self)
self.experimentSessionControllerImpl = ExperimentSessionControllerImpl()
@cherrypy.expose
@DmSessionController.require(DmSessionController.isLoggedIn())
@DmSessionController.execute
def startDaq(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
dataDirectory = kwargs.get('dataDirectory')
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Missing experiment data directory.')
dataDirectory = Encoder.decode(dataDirectory)
response = self.experimentSessionControllerImpl.startDaq(name, dataDirectory).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isLoggedIn())
@DmSessionController.execute
def stopDaq(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
response = self.experimentSessionControllerImpl.stopDaq(name).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
#!/usr/bin/env python
import os
from watchdog.events import FileSystemEventHandler
from dm.common.utility.loggingManager import LoggingManager
class DmFileSystemEventHandler(FileSystemEventHandler):
def __init__(self, fileSystemObserver):
FileSystemEventHandler.__init__(self)
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.fileSystemObserver = fileSystemObserver
def dispatch(self, event):
FileSystemEventHandler.dispatch(self, event)
def on_any_event(self, event):
FileSystemEventHandler.on_any_event(self, event)
def on_created(self, event):
FileSystemEventHandler.on_created(self, event)
def on_deleted(self, event):
FileSystemEventHandler.on_deleted(self, event)
def on_modified(self, event):
FileSystemEventHandler.on_modified(self, event)
path = event.src_path
self.logger.debug('File system modified event: %s' % (event.__dict__))
if not event.is_directory:
self.fileSystemObserver.observedFileUpdated(path)
def on_moved(self, event):
FileSystemEventHandler.on_moved(self, event)
####################################################################
# Testing
if __name__ == '__main__':
import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.observers.api import ObservedWatch
from watchdog.observers.api import EventQueue
from watchdog.observers.api import EventEmitter
from watchdog.events import LoggingEventHandler
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = sys.argv[1] if len(sys.argv) > 1 else '.'
eventHandler = DmFileSystemEventHandler()
observer = PollingObserver()
observedWatch = observer.schedule(eventHandler, path, recursive=True)
print 'OBSERVED WATCH: ', observedWatch
#observer.add_handler_for_watch(eventHandler2, observedWatch)
#observer._clear_emitters()
print observer.emitters
observer.start()
try:
while True:
time.sleep(1)
print time.time()
except KeyboardInterrupt:
observer.stop()
observer.join()
#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import time
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)
self.experimentDict = {}
def startDaq(self, name, dataDirectory):
FileSystemObserver.getInstance().startObservingPath(dataDirectory)
startTime = time.time()
experiment = Experiment({'name' : name, 'dataDirectory' : dataDirectory, 'daqStartTime' : startTime})
self.experimentDict[name] = experiment
return experiment
def stopDaq(self, name):
experiment = self.experimentDict.get(name)
if experiment is None or experiment.get('daqEndTime') is not None:
raise InvalidRequest('Experiment %s is not active.' % name)
dataDirectory = experiment.get('dataDirectory')
FileSystemObserver.getInstance().stopObservingPath(dataDirectory)
experiment['daqEndTime'] = time.time()
return experiment
#!/usr/bin/env python
import threading
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadSafeQueue import ThreadSafeQueue
from dm.common.utility.singleton import Singleton
from dm.daq_web_service.service.impl.fileProcessingThread import FileProcessingThread
class FileProcessingManager(threading.Thread,Singleton):
CONFIG_SECTION_NAME = 'FileProcessingManager'
N_PROCESSING_THREADS_KEY = 'nprocessingthreads'
FILE_PROCESSOR_KEY = 'fileprocessor'
# Singleton.
__instanceLock = threading.RLock()
__instance = None
def __init__(self):
FileProcessingManager.__instanceLock.acquire()
try:
if FileProcessingManager.__instance:
return
FileProcessingManager.__instance = self
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.logger.debug('Initializing')
self.lock = threading.RLock()
self.fileProcessingThreadDict = {}
self.eventFlag = threading.Event()
self.fileProcessorDict = {}
self.fileProcessorKeyList = []
self.fileProcessingQueue = ThreadSafeQueue()
self.processedFileDict = {}
self.unprocessedFileDict = {}
self.__configure()
self.logger.debug('Initialization complete')
finally:
FileProcessingManager.__instanceLock.release()
def __configure(self):
cm = ConfigurationManager.getInstance()
configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.N_PROCESSING_THREADS_KEY))
# Create processors
for (key,value) in configItems:
if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
self.logger.debug('Creating file processor instance of class %s' % className)
fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList = self.fileProcessorDict.keys()
self.fileProcessorKeyList.sort()
self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
def processObservedFile(self, observedFile):
self.fileProcessingQueue.push(observedFile)
self.eventFlag.set()
def start(self):
self.lock.acquire()
try:
self.logger.debug('Starting file processing threads')
for i in range(0, self.nProcessingThreads):
tName = 'FileProcessingThread-%s' % i
t = FileProcessingThread(tName, self.eventFlag, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict)
t.start()
self.fileProcessingThreadDict[tName] = t
finally:
self.lock.release()
def stop(self):
self.lock.acquire()
try:
self.logger.debug('Stopping file processing threads')
for (tName, t) in self.fileProcessingThreadDict.items():
t.stop()
self.eventFlag.set()
for (tName, t) in self.fileProcessingThreadDict.items():
t.join()
finally:
self.lock.release()
def setEvent(self):
self.lock.acquire()
try:
self.eventFlag.set()
finally:
self.lock.release()
def clearEvent(self):
self.lock.acquire()
try:
self.eventFlag.clear()
finally:
self.lock.release()
####################################################################
# Testing
if __name__ == '__main__':
fp = FileProcessingManager.getInstance()
print fp
#fp.start()
#time.sleep(30)
#fp.stop()
#!/usr/bin/env python
import threading
from dm.common.utility.loggingManager import LoggingManager
class FileProcessingThread(threading.Thread):
def __init__ (self, name, eventFlag, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
threading.Thread.__init__(self)
self.setName(name)
self.exitFlag = False
self.eventFlag = eventFlag
self.fileProcessorDict = fileProcessorDict
self.fileProcessorKeyList = fileProcessorKeyList
self.fileProcessingQueue = fileProcessingQueue
self.processedFileDict = processedFileDict
self.unprocessedFileDict = unprocessedFileDict
self.logger = LoggingManager.getInstance().getLogger(name)
def run(self):
self.logger.debug('Starting thread: %s' % self.getName())
while True:
if self.exitFlag:
self.logger.debug('Exit flag set, %s done' % self.getName())
break
while True:
file = self.fileProcessingQueue.pop()
if file is None:
break
path = file.getPath()
try:
for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey)
processorName = processor.__class__.__name__
self.logger.debug('%s is about to process file %s ' % (processorName, file))
try:
processor.processFile(path)
self.logger.debug('%s processed file %s ' % (processorName, file))
except Exception, ex:
self.logger.exception(ex)
self.logger.debug('%s processing failed for file %s ' % (processorName, file))
file[processorName] = {'error' : ex}
self.unprocessedFileDict[path] = file
except Exception, ex:
self.logger.exception(ex)
self.eventFlag.wait()
def stop(self):
self.exitFlag = True
####################################################################
# Testing
if __name__ == '__main__':
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment