Skip to content
Snippets Groups Projects
Commit 6c0e1f50 authored by sveseli's avatar sveseli
Browse files

added 0MQ interface to workflow service

parent f2fc914a
No related branches found
No related tags found
No related merge requests found
- DS service
- runs on extrepid
- DAQ service
- APSU station, runs on ctlsdaqsrv2
- Catalog service
- APSU station, runs on ctlsdaqsrv2
- Processing (Workflow) service
- can run workflows for each file, or for set of files, in real-time
- RT workflow args can be set via PV
- DAQ needs to send burst number (e.g, file 3/10)
- can always request workflow processing on a set of files (post processing)
- need portal that will be able to submit processing job
- need to be able to keep track of processing jobs and their results
- DM/CDB integration plugin
- triggered via CDB:<qrId> keyword
- adds DM-document property, will have metadata catalog URL,
file download URL, metadata key/value pairs as needed
- must be able to associate multiple qrID's
- CDB Document domain
- CDB/DM integration plugin: needs to be able to click on the file to see the
metadata, or to download the file
- DM/SDDS plugin
- triggered by ".sdds"
- reads parameters, converts them to metadata
- Use Case: StudiesSession-20170422
- From CDB perspective StudiesSession-20170422 is an item in the
Documents Domain
- From DM perspective, StudiesSession-20170422 is an experiment on an APSU
station
- When collecting DAQ data, DM will automatically asign all files to be
part of the StudiesSession-20170422 experiment
- DM/CDB plugin will add DataFile property to StudiesSession-20170422 for
each file
- CDB/DM file property handler will be able to display DM metadata (URL link)
and download file; later, it may be be able to submit processing job
to DM processing service
#!/usr/bin/env python
import threading
import time
import json
import os
import uuid
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.threadSafeQueue import ThreadSafeQueue
from dm.common.utility.singleton import Singleton
from requestHandlerThread import RequestHandlerThread
class RequestHandler(Singleton):
CONFIG_SECTION_NAME = 'RequestHandler'
NUMBER_OF_HANDLER_THREADS_KEY = 'numberofhandlerthreads'
DEFAULT_NUMBER_OF_HANDLER_THREADS = 3
# Singleton.
__instanceLock = threading.RLock()
__instance = None
def __init__(self):
RequestHandler.__instanceLock.acquire()
try:
if RequestHandler.__instance:
return
RequestHandler.__instance = self
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.logger.debug('Initializing')
self.lock = threading.RLock()
self.eventFlag = threading.Event()
self.requestHandlerThreadDict = {}
self.requestQueue = ThreadSafeQueue()
self.__configure()
self.logger.debug('Initialization complete')
finally:
RequestHandler.__instanceLock.release()
def __configure(self):
cm = ConfigurationManager.getInstance()
configItems = cm.getConfigItems(RequestHandler.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
self.nHandlerThreads = int(cm.getConfigOption(RequestHandler.CONFIG_SECTION_NAME, RequestHandler.NUMBER_OF_HANDLER_THREADS_KEY, RequestHandler.DEFAULT_NUMBER_OF_HANDLER_THREADS))
def queueRequest(self, request):
requestId = str(uuid.uuid4())
self.requestQueue.push(request)
self.eventFlag.set()
return json.dumps({'requestId' : requestId})
def start(self):
self.lock.acquire()
try:
self.logger.debug('Starting Request Handler threads')
for i in range(0, self.nHandlerThreads):
tName = 'RequestHandlerThread-%s' % i
t = RequestHandlerThread(tName, self, self.requestQueue)
t.start()
self.requestHandlerThreadDict[tName] = t
finally:
self.lock.release()
def stop(self):
self.logger.debug('Stopping Request Handler threads')
for (tName, t) in self.requestHandlerThreadDict.items():
t.stop()
self.lock.acquire()
try:
self.eventFlag.set()
finally:
self.lock.release()
for (tName, t) in self.requestHandlerThreadDict.items():
t.join()
def setEvent(self):
self.lock.acquire()
try:
self.eventFlag.set()
finally:
self.lock.release()
def clearEvent(self):
self.lock.acquire()
try:
self.eventFlag.clear()
finally:
self.lock.release()
def waitOnEvent(self, timeoutInSeconds=None):
self.eventFlag.wait(timeoutInSeconds)
####################################################################
# Testing
if __name__ == '__main__':
rh = RequestHandler.getInstance()
print rh
rh.start()
time.sleep(30)
rh.stop()
#!/usr/bin/env python
import threading
import time
from dm.common.utility.loggingManager import LoggingManager
class RequestHandlerThread(threading.Thread):
THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0
def __init__ (self, name, requestHandler, requestQueue):
threading.Thread.__init__(self)
self.setName(name)
self.exitFlag = False
self.requestHandler = requestHandler
self.requestQueue = requestQueue
self.logger = LoggingManager.getInstance().getLogger(name)
def processRequest(self, request):
if not request:
self.logger.warn('Refusing to process empty request')
return
try:
self.logger.debug('Processing request: %s' % request)
except Exception, ex:
self.logger.exception(ex)
def run(self):
self.logger.debug('Starting thread: %s' % self.getName())
while True:
self.requestHandler.clearEvent()
if self.exitFlag:
self.logger.debug('Exit flag is set')
break
while True:
try:
request = self.requestQueue.pop()
if request is None:
break
self.logger.debug('Request queue depth after pop: %s', self.requestQueue.getLength())
self.processRequest(request)
except Exception, ex:
self.logger.exception(ex)
self.requestHandler.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
self.logger.debug('%s is done' % self.getName())
def stop(self):
self.exitFlag = True
####################################################################
# Testing
if __name__ == '__main__':
pass
#!/usr/bin/env python
import zmq
import json
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://localhost:55537')
# Do 10 requests, waiting each time for a response
for request in range(1,11):
dict = {'reqNo' : request}
socket.send_json(json.dumps(dict))
message = socket.recv_json()
print("Received reply %s [%s]" % (request, message))
#!/usr/bin/env python
import threading
import time
import zmq
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.internalError import InternalError
from dm.common.utility.loggingManager import LoggingManager
class ZmqListener(threading.Thread):
THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0
def __init__ (self, serviceName, serviceUrl, requestHandler=None):
threading.Thread.__init__(self)
self.setName(serviceName)
self.serviceName = serviceName
self.serviceUrl = serviceUrl
self.exitFlag = False
self.requestHandler = requestHandler
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.__configure()
@classmethod
def getLogger(cls):
logger = LoggingManager.getInstance().getLogger(cls.__name__)
return logger
# Exception decorator for all exposed method calls
@classmethod
def execute(cls, func):
def decorate(*args, **kwargs):
try:
response = func(*args, **kwargs)
except DmException, ex:
cls.getLogger().error('%s' % ex)
response = ex
except Exception, ex:
cls.getLogger().error('%s' % ex)
response = InternalError(ex)
return response.getFullJsonRep()
return decorate
def __configure(self):
pass
def start(self):
context = zmq.Context()
self.socket = context.socket(zmq.REP)
self.socket.bind(self.serviceUrl)
threading.Thread.start(self)
def handleRequest(self, request):
response = ''
if self.requestHandler:
response = self.requestHandler.queueRequest(request)
return response
def run(self):
self.logger.debug('Starting ZMQ listener %s on URL %s' % (self.getName(), self.serviceUrl))
while True:
if self.exitFlag:
self.logger.debug('Exit flag is set')
break
try:
rList,wList,xList = zmq.select([self.socket],[],[self.socket], self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
if len(rList) or len(xList):
request = self.socket.recv_json()
response = self.handleRequest(request)
self.socket.send_json(response)
else:
#self.logger.debug('ZMQ read timeout')
pass
except Exception, ex:
self.logger.exception(ex)
self.logger.debug('%s is done' % self.getName())
def stop(self):
self.exitFlag = True
####################################################################
# Testing
if __name__ == '__main__':
from requestHandler import RequestHandler
handler = RequestHandler()
handler.start()
listener = ZmqListener('WorkflowService', 'tcp://*:55537', handler)
listener.start()
time.sleep(30)
listener.stop()
handler.stop()
#!/usr/bin/env python
import threading
import time
import json
import os
import uuid
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.threadSafeQueue import ThreadSafeQueue
from dm.common.zeromq.zmqListener import ZmqListener
from dm.common.mongodb.api.workflowMongoDbApi import WorkflowMongoDbApi
from dm.common.objects.processingJob import ProcessingJob
from dm.common.exceptions.invalidRequest import InvalidRequest
from executionEngine import ExecutionEngine
class WorkflowZmqListener(ZmqListener):
CONFIG_SECTION_NAME = 'WorkflowZmqListener'
ZMQ_URL_KEY = 'zmqurl'
ZMQ_NAME_KEY = 'zmqname'
DEFAULT_ZMQ_URL = 'tcp://*:55537'
DEFAULT_ZMQ_NAME = 'WorkflowService'
def __init__(self):
self.__configure()
ZmqListener.__init__(self, self.zmqName, self.zmqUrl)
self.workflowMongoDbApi = WorkflowMongoDbApi()
def __configure(self):
cm = ConfigurationManager.getInstance()
configItems = cm.getConfigItems(self.CONFIG_SECTION_NAME)
self.zmqUrl = cm.getConfigOption(self.CONFIG_SECTION_NAME, self.ZMQ_URL_KEY, self.DEFAULT_ZMQ_URL)
self.zmqName = cm.getConfigOption(self.CONFIG_SECTION_NAME, self.ZMQ_NAME_KEY, self.DEFAULT_ZMQ_NAME)
@ZmqListener.execute
def handleRequest(self, request):
self.logger.debug('Got request: %s' % request)
methodName = request.get('methodName')
if not methodName:
raise InvalidRequest('Missing method name.')
if methodName != 'startProcessingJob':
raise InvalidRequest('Unsupported method %s.' % methodName)
workflowOwner = request.get('workflowOwner')
if not workflowOwner:
raise InvalidRequest('Missing workflow owner.')
workflowName = request.get('workflowName')
if not workflowName:
raise InvalidRequest('Missing workflow name.')
processingJobOwner = request.get('processingJobOwner')
if not processingJobOwner :
raise InvalidRequest('Missing processing job owner.')
argsDict = request.get('argsDict', {})
workflow = self.workflowMongoDbApi.getWorkflowByName(workflowOwner, workflowName)
ExecutionEngine.verifyWorkflow(workflow.data)
processingJob = ProcessingJob(argsDict)
processingJob['workflow'] = workflow
processingJob['owner'] = processingJobOwner
return ExecutionEngine.getInstance().startProcessingJob(processingJob)
####################################################################
# Testing
if __name__ == '__main__':
listener = WorkflowZmqListener()
listener.start()
time.sleep(30)
listener.stop()
...@@ -8,6 +8,7 @@ from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase ...@@ -8,6 +8,7 @@ from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase
from dm.common.utility.dmModuleManager import DmModuleManager from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.utility.configurationManager import ConfigurationManager from dm.common.utility.configurationManager import ConfigurationManager
from dm.proc_web_service.service.impl.executionEngine import ExecutionEngine from dm.proc_web_service.service.impl.executionEngine import ExecutionEngine
from dm.proc_web_service.service.impl.workflowZmqListener import WorkflowZmqListener
from procWebServiceRouteMapper import ProcWebServiceRouteMapper from procWebServiceRouteMapper import ProcWebServiceRouteMapper
class ProcWebService(DmRestWebServiceBase): class ProcWebService(DmRestWebServiceBase):
...@@ -21,6 +22,7 @@ class ProcWebService(DmRestWebServiceBase): ...@@ -21,6 +22,7 @@ class ProcWebService(DmRestWebServiceBase):
# Add modules that will be started. # Add modules that will be started.
moduleManager = DmModuleManager.getInstance() moduleManager = DmModuleManager.getInstance()
moduleManager.addModule(ExecutionEngine.getInstance()) moduleManager.addModule(ExecutionEngine.getInstance())
moduleManager.addModule(WorkflowZmqListener())
self.logger.debug('Initialized dm modules') self.logger.debug('Initialized dm modules')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment