diff --git a/src/python/dm/daq_web_service/__init__.py b/src/python/dm/daq_web_service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/daq_web_service/api/__init__.py b/src/python/dm/daq_web_service/api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/daq_web_service/api/daqRestApi.py b/src/python/dm/daq_web_service/api/daqRestApi.py new file mode 100755 index 0000000000000000000000000000000000000000..69d943d33023380cf5cc916baf90d2d8bd24b57b --- /dev/null +++ b/src/python/dm/daq_web_service/api/daqRestApi.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +from dm.common.api.dmRestApi import DmRestApi + +class DaqRestApi(DmRestApi): + """ Base DAQ DM REST api class. """ + + def __init__(self, username=None, password=None, host=None, port=None, protocol=None): + if host == None: + host = self.configurationManager.getDaqWebServiceHost() + if port == None: + port = self.configurationManager.getDaqWebServicePort() + DmRestApi.__init__(self, username, password, host, port, protocol) + +####################################################################### +# Testing. + +if __name__ == '__main__': + api = DaqRestApi('sveseli', 'sveseli') + #api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list', 'GET', data='') + import urllib + from dm.common.utility.configurationManager import ConfigurationManager + cm = ConfigurationManager.getInstance() + cm.setSessionCacheFile('/tmp/session') + #print 'Non-session request' + #print api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list?path=/tmp', 'GET') + print 'Session request' + data = { 'path' : '/tmp/xyz' } + #print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write?path=/tmp/xyz&content=xyz', 'POST', contentType='application/x-www-form-urlencoded', data=urllib.urlencode(data)) + #print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', data=data) + postdata='path=/tmp/xyz' + postdata+='&content=%s' % urllib.quote_plus('Hey there') + print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', contentType='application/x-www-form-urlencoded', data=postdata) + + + diff --git a/src/python/dm/daq_web_service/api/experimentRestApi.py b/src/python/dm/daq_web_service/api/experimentRestApi.py new file mode 100755 index 0000000000000000000000000000000000000000..036f940379e6a5f965beb2d8fa880e7bdb034f8f --- /dev/null +++ b/src/python/dm/daq_web_service/api/experimentRestApi.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +import os +import urllib + +from dm.common.utility.encoder import Encoder +from dm.common.exceptions.dmException import DmException +from dm.common.objects.experiment import Experiment +from daqRestApi import DaqRestApi + +class ExperimentRestApi(DaqRestApi): + + def __init__(self, username=None, password=None, host=None, port=None, protocol=None): + DaqRestApi.__init__(self, username, password, host, port, protocol) + + @DaqRestApi.execute + def startDaq(self, name, dataDirectory): + url = '%s/experiments/startDaq' % (self.getContextRoot()) + if name is None or not len(name): + raise InvalidRequest('Experiment name must be provided.') + url += '?name=%s' % Encoder.encode(name) + if dataDirectory is None or not len(dataDirectory): + raise InvalidRequest('Experiment data directory must be provided.') + url += '&dataDirectory=%s' % Encoder.encode(dataDirectory) + responseDict = self.sendSessionRequest(url=url, method='POST') + return Experiment(responseDict) + + @DaqRestApi.execute + def stopDaq(self, name): + url = '%s/experiments/stopDaq' % (self.getContextRoot()) + if name is None or not len(name): + raise InvalidRequest('Experiment name must be provided.') + url += '?name=%s' % Encoder.encode(name) + responseDict = self.sendSessionRequest(url=url, method='POST') + return Experiment(responseDict) + +####################################################################### +# Testing. + +if __name__ == '__main__': + api = ExperimentRestApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http') + print api.startDaq('experiment1', '/tmp/data/experiment1') + diff --git a/src/python/dm/daq_web_service/cli/__init__.py b/src/python/dm/daq_web_service/cli/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/daq_web_service/cli/daqWebServiceCli.py b/src/python/dm/daq_web_service/cli/daqWebServiceCli.py new file mode 100755 index 0000000000000000000000000000000000000000..1651a3f3cd13639544c25ab40ea179cf12f87a81 --- /dev/null +++ b/src/python/dm/daq_web_service/cli/daqWebServiceCli.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +from dm.common.cli.dmRestCli import DmRestCli +from dm.common.utility.configurationManager import ConfigurationManager + +class DaqWebServiceCli(DmRestCli): + """ DM DAQ web service cli class. """ + + def __init__(self, validArgCount=0): + DmRestCli.__init__(self, validArgCount) + + def getDefaultServiceHost(self): + return ConfigurationManager.getInstance().getDaqWebServiceHost() + + def getDefaultServicePort(self): + return ConfigurationManager.getInstance().getDaqWebServicePort() + diff --git a/src/python/dm/daq_web_service/cli/daqWebServiceSessionCli.py b/src/python/dm/daq_web_service/cli/daqWebServiceSessionCli.py new file mode 100755 index 0000000000000000000000000000000000000000..c7f02f28ada895d08374af4f8118bb8e0cbbf19f --- /dev/null +++ b/src/python/dm/daq_web_service/cli/daqWebServiceSessionCli.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +from dm.common.cli.dmRestSessionCli import DmRestSessionCli +from dm.common.utility.osUtility import OsUtility +from dm.common.utility.configurationManager import ConfigurationManager + +class DaqWebServiceSessionCli(DmRestSessionCli): + """ DM DAQ web service session cli class. """ + + DEFAULT_SESSION_CACHE_FILE = OsUtility.getUserHomeDir() + '/.dm/.daq.session.cache' + + def __init__(self, validArgCount=0): + DmRestSessionCli.__init__(self, validArgCount) + ConfigurationManager.getInstance().setSessionCacheFile(DaqWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE) + + def getDefaultServiceHost(self): + return ConfigurationManager.getInstance().getDaqWebServiceHost() + + def getDefaultServicePort(self): + return ConfigurationManager.getInstance().getDaqWebServicePort() + diff --git a/src/python/dm/daq_web_service/cli/startDaqCli.py b/src/python/dm/daq_web_service/cli/startDaqCli.py new file mode 100755 index 0000000000000000000000000000000000000000..81af4eb27c21696b1e0de081a745a65bdf296df8 --- /dev/null +++ b/src/python/dm/daq_web_service/cli/startDaqCli.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi +from dm.common.exceptions.invalidRequest import InvalidRequest +from daqWebServiceSessionCli import DaqWebServiceSessionCli + +class StartDaqCli(DaqWebServiceSessionCli): + def __init__(self): + DaqWebServiceSessionCli.__init__(self) + self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') + self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.') + + def checkArgs(self): + if self.options.experimentName is None: + raise InvalidRequest('Experiment name must be provided.') + if self.options.dataDirectory is None: + raise InvalidRequest('Experiment data directory must be provided.') + + def getExperimentName(self): + return self.options.experimentName + + def getDataDirectory(self): + return self.options.dataDirectory + + def runCommand(self): + self.parseArgs(usage=""" + dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY + +Description: + Starts DAQ for a given experiment. Provided data directory will be monitored for data files. + """) + self.checkArgs() + api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) + experiment = api.startDaq(self.getExperimentName(), self.getDataDirectory()) + print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) + +####################################################################### +# Run command. +if __name__ == '__main__': + cli = StartDaqCli() + cli.run() + diff --git a/src/python/dm/daq_web_service/cli/stopDaqCli.py b/src/python/dm/daq_web_service/cli/stopDaqCli.py new file mode 100755 index 0000000000000000000000000000000000000000..892b2d78789d7897a2be779dd2b95ef2cad5a75f --- /dev/null +++ b/src/python/dm/daq_web_service/cli/stopDaqCli.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi +from dm.common.exceptions.invalidRequest import InvalidRequest +from daqWebServiceSessionCli import DaqWebServiceSessionCli + +class StopDaqCli(DaqWebServiceSessionCli): + def __init__(self): + DaqWebServiceSessionCli.__init__(self) + self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') + + def checkArgs(self): + if self.options.experimentName is None: + raise InvalidRequest('Experiment name must be provided.') + + def getExperimentName(self): + return self.options.experimentName + + def runCommand(self): + self.parseArgs(usage=""" + dm-stop-daq --experiment=EXPERIMENTNAME + +Description: + Stop DAQ for a given experiment. + """) + self.checkArgs() + api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) + experiment = api.stopDaq(self.getExperimentName()) + print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) + +####################################################################### +# Run command. +if __name__ == '__main__': + cli = StopDaqCli() + cli.run() + diff --git a/src/python/dm/daq_web_service/cli/uploadDataCli.py b/src/python/dm/daq_web_service/cli/uploadDataCli.py new file mode 100755 index 0000000000000000000000000000000000000000..79371f689dfc3980933ae8b49f1bca9a8c0fbdbe --- /dev/null +++ b/src/python/dm/daq_web_service/cli/uploadDataCli.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +from daqWebServiceSessionCli import DaqWebServiceSessionCli +from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi +from dm.common.exceptions.invalidRequest import InvalidRequest + +class UploadDataCli(DaqWebServiceSessionCli): + def __init__(self): + DaqWebServiceCli.__init__(self) + self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') + self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.') + + def checkArgs(self): + if self.options.experimentName is None: + raise InvalidRequest('Experiment name must be provided.') + if self.options.dataDirectory is None: + raise InvalidRequest('Experiment data directory must be provided.') + + def getExperimentName(self): + return self.options.experimentName + + def getDataDirectory(self): + return self.options.dataDirectory + + def runCommand(self): + self.parseArgs(usage=""" + dm-upload-data --name=EXPERIMENTNAME --data-directory=DATADIRECTORY + +Description: + Schedules data upload for a given experiment. + """) + self.checkArgs() + api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) + experiment = api.uploadData(self.getExperimentName(), self.getDataDirectory()) + print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) + +####################################################################### +# Run command. +if __name__ == '__main__': + cli = UploadDataCli() + cli.run() + diff --git a/src/python/dm/daq_web_service/service/__init__.py b/src/python/dm/daq_web_service/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/daq_web_service/service/daqWebService.py b/src/python/dm/daq_web_service/service/daqWebService.py new file mode 100755 index 0000000000000000000000000000000000000000..47be57fff4ec369f64704e9541ad29ed431aee75 --- /dev/null +++ b/src/python/dm/daq_web_service/service/daqWebService.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python + +# +# DM DAQ Web Service +# + +from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase +from dm.common.utility.dmModuleManager import DmModuleManager +from dm.common.utility.configurationManager import ConfigurationManager +from dm.daq_web_service.service.impl.fileSystemObserver import FileSystemObserver +from dm.daq_web_service.service.impl.fileProcessingManager import FileProcessingManager +from daqWebServiceRouteMapper import DaqWebServiceRouteMapper + +class DaqWebService(DmRestWebServiceBase): + + def __init__(self): + DmRestWebServiceBase.__init__(self, DaqWebServiceRouteMapper) + + def initDmModules(self): + self.logger.debug('Initializing dm modules') + + # Add modules that will be started. + moduleManager = DmModuleManager.getInstance() + moduleManager.addModule(FileSystemObserver.getInstance()) + moduleManager.addModule(FileProcessingManager.getInstance()) + self.logger.debug('Initialized dm modules') + + def getDefaultServerHost(self): + return ConfigurationManager.getInstance().getServiceHost() + + def getDefaultServerPort(self): + return ConfigurationManager.getInstance().getServicePort() + +#################################################################### +# Run service + +if __name__ == '__main__': + ConfigurationManager.getInstance().setServiceName('daq-web-service') + service = DaqWebService(); + service.run() diff --git a/src/python/dm/daq_web_service/service/daqWebServiceRouteMapper.py b/src/python/dm/daq_web_service/service/daqWebServiceRouteMapper.py new file mode 100755 index 0000000000000000000000000000000000000000..2474654d35f5257f83fc68c0b8d0b558631e5081 --- /dev/null +++ b/src/python/dm/daq_web_service/service/daqWebServiceRouteMapper.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +# +# Route mapper for DM DAQ web service. +# + +import sys +import os + +import cherrypy +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.configurationManager import ConfigurationManager +from dm.common.service.loginRouteDescriptor import LoginRouteDescriptor +from experimentRouteDescriptor import ExperimentRouteDescriptor + +class DaqWebServiceRouteMapper: + + @classmethod + def setupRoutes(cls): + """ Setup RESTFul routes. """ + logger = LoggingManager.getInstance().getLogger(cls.__name__) + contextRoot = ConfigurationManager.getInstance().getContextRoot() + logger.debug('Using context root: %s' % contextRoot) + + # Get routes. + routes = LoginRouteDescriptor.getRoutes() + routes += ExperimentRouteDescriptor.getRoutes() + + # Add routes to dispatcher. + d = cherrypy.dispatch.RoutesDispatcher() + for route in routes: + logger.debug('Connecting route: %s' % route) + d.connect(route['name'], route['path'], action=route['action'], controller=route['controller'], conditions=dict(method=route['method'])) + return d + + diff --git a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py new file mode 100755 index 0000000000000000000000000000000000000000..615e072c7dfd98758a141c6acdfbddac272728ab --- /dev/null +++ b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +# +# User route descriptor. +# + +from dm.common.utility.configurationManager import ConfigurationManager +from experimentSessionController import ExperimentSessionController + +class ExperimentRouteDescriptor: + + @classmethod + def getRoutes(cls): + contextRoot = ConfigurationManager.getInstance().getContextRoot() + + # Static instances shared between different routes + experimentSessionController = ExperimentSessionController() + + # Define routes. + routes = [ + + # Start experiment daq + { + 'name' : 'startDaq', + 'path' : '%s/experiments/startDaq' % contextRoot, + 'controller' : experimentSessionController, + 'action' : 'startDaq', + 'method' : ['POST'] + }, + + # Stop experiment daq + { + 'name' : 'stopDaq', + 'path' : '%s/experiments/stopDaq' % contextRoot, + 'controller' : experimentSessionController, + 'action' : 'stopDaq', + 'method' : ['POST'] + }, + + ] + + return routes + + diff --git a/src/python/dm/daq_web_service/service/experimentSessionController.py b/src/python/dm/daq_web_service/service/experimentSessionController.py new file mode 100755 index 0000000000000000000000000000000000000000..bf36488e3658d36d6256ee294e546f3b4497d63c --- /dev/null +++ b/src/python/dm/daq_web_service/service/experimentSessionController.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +import cherrypy + +from dm.common.service.dmSessionController import DmSessionController +from dm.common.exceptions.invalidRequest import InvalidRequest +from dm.common.utility.encoder import Encoder + +from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl + + +class ExperimentSessionController(DmSessionController): + + def __init__(self): + DmSessionController.__init__(self) + self.experimentSessionControllerImpl = ExperimentSessionControllerImpl() + + @cherrypy.expose + @DmSessionController.require(DmSessionController.isLoggedIn()) + @DmSessionController.execute + def startDaq(self, **kwargs): + name = kwargs.get('name') + if name is None or not len(name): + raise InvalidRequest('Missing experiment name.') + name = Encoder.decode(name) + dataDirectory = kwargs.get('dataDirectory') + if dataDirectory is None or not len(dataDirectory): + raise InvalidRequest('Missing experiment data directory.') + dataDirectory = Encoder.decode(dataDirectory) + response = self.experimentSessionControllerImpl.startDaq(name, dataDirectory).getFullJsonRep() + self.logger.debug('Returning: %s' % response) + return response + + @cherrypy.expose + @DmSessionController.require(DmSessionController.isLoggedIn()) + @DmSessionController.execute + def stopDaq(self, **kwargs): + name = kwargs.get('name') + if name is None or not len(name): + raise InvalidRequest('Missing experiment name.') + name = Encoder.decode(name) + response = self.experimentSessionControllerImpl.stopDaq(name).getFullJsonRep() + self.logger.debug('Returning: %s' % response) + return response + diff --git a/src/python/dm/daq_web_service/service/impl/__init__.py b/src/python/dm/daq_web_service/service/impl/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py new file mode 100755 index 0000000000000000000000000000000000000000..74a7d4c4fd75d2e8425c439a3942201768f1dbf9 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +import os + +from watchdog.events import FileSystemEventHandler + +from dm.common.utility.loggingManager import LoggingManager + +class DmFileSystemEventHandler(FileSystemEventHandler): + + def __init__(self, fileSystemObserver): + FileSystemEventHandler.__init__(self) + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + self.fileSystemObserver = fileSystemObserver + + def dispatch(self, event): + FileSystemEventHandler.dispatch(self, event) + + def on_any_event(self, event): + FileSystemEventHandler.on_any_event(self, event) + + def on_created(self, event): + FileSystemEventHandler.on_created(self, event) + + def on_deleted(self, event): + FileSystemEventHandler.on_deleted(self, event) + + def on_modified(self, event): + FileSystemEventHandler.on_modified(self, event) + path = event.src_path + self.logger.debug('File system modified event: %s' % (event.__dict__)) + if not event.is_directory: + self.fileSystemObserver.observedFileUpdated(path) + + def on_moved(self, event): + FileSystemEventHandler.on_moved(self, event) + +#################################################################### +# Testing + +if __name__ == '__main__': + import sys + import time + import logging + from watchdog.observers import Observer + from watchdog.observers.polling import PollingObserver + from watchdog.observers.api import ObservedWatch + from watchdog.observers.api import EventQueue + from watchdog.observers.api import EventEmitter + from watchdog.events import LoggingEventHandler + logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + path = sys.argv[1] if len(sys.argv) > 1 else '.' + eventHandler = DmFileSystemEventHandler() + + observer = PollingObserver() + observedWatch = observer.schedule(eventHandler, path, recursive=True) + print 'OBSERVED WATCH: ', observedWatch + + #observer.add_handler_for_watch(eventHandler2, observedWatch) + #observer._clear_emitters() + print observer.emitters + + observer.start() + try: + while True: + time.sleep(1) + print time.time() + except KeyboardInterrupt: + observer.stop() + observer.join() diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py new file mode 100755 index 0000000000000000000000000000000000000000..1c9da104f3fa21508cebaa40d0a4e9d679f23747 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +# +# Implementation for experiment session controller. +# + +import time + +from dm.common.objects.experiment import Experiment +from dm.common.objects.dmObjectManager import DmObjectManager +from dm.common.exceptions.invalidRequest import InvalidRequest +from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory +from fileSystemObserver import FileSystemObserver + +class ExperimentSessionControllerImpl(DmObjectManager): + """ Experiment session controller implementation class. """ + + def __init__(self): + DmObjectManager.__init__(self) + self.experimentDict = {} + + def startDaq(self, name, dataDirectory): + FileSystemObserver.getInstance().startObservingPath(dataDirectory) + startTime = time.time() + experiment = Experiment({'name' : name, 'dataDirectory' : dataDirectory, 'daqStartTime' : startTime}) + self.experimentDict[name] = experiment + return experiment + + def stopDaq(self, name): + experiment = self.experimentDict.get(name) + if experiment is None or experiment.get('daqEndTime') is not None: + raise InvalidRequest('Experiment %s is not active.' % name) + dataDirectory = experiment.get('dataDirectory') + FileSystemObserver.getInstance().stopObservingPath(dataDirectory) + experiment['daqEndTime'] = time.time() + return experiment diff --git a/src/python/dm/daq_web_service/service/impl/fileProcessingManager.py b/src/python/dm/daq_web_service/service/impl/fileProcessingManager.py new file mode 100755 index 0000000000000000000000000000000000000000..7e24dfa7ecdec98ef569441ce2f135719fa8159a --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/fileProcessingManager.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python + +import threading +import time + +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.configurationManager import ConfigurationManager +from dm.common.utility.objectUtility import ObjectUtility +from dm.common.utility.threadSafeQueue import ThreadSafeQueue +from dm.common.utility.singleton import Singleton +from dm.daq_web_service.service.impl.fileProcessingThread import FileProcessingThread + +class FileProcessingManager(threading.Thread,Singleton): + + CONFIG_SECTION_NAME = 'FileProcessingManager' + N_PROCESSING_THREADS_KEY = 'nprocessingthreads' + FILE_PROCESSOR_KEY = 'fileprocessor' + + # Singleton. + __instanceLock = threading.RLock() + __instance = None + + def __init__(self): + FileProcessingManager.__instanceLock.acquire() + try: + if FileProcessingManager.__instance: + return + FileProcessingManager.__instance = self + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + self.logger.debug('Initializing') + self.lock = threading.RLock() + self.fileProcessingThreadDict = {} + self.eventFlag = threading.Event() + self.fileProcessorDict = {} + self.fileProcessorKeyList = [] + self.fileProcessingQueue = ThreadSafeQueue() + self.processedFileDict = {} + self.unprocessedFileDict = {} + self.__configure() + self.logger.debug('Initialization complete') + finally: + FileProcessingManager.__instanceLock.release() + + def __configure(self): + cm = ConfigurationManager.getInstance() + configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME) + self.logger.debug('Got config items: %s' % configItems) + self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.N_PROCESSING_THREADS_KEY)) + + # Create processors + for (key,value) in configItems: + if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY): + (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value) + self.logger.debug('Creating file processor instance of class %s' % className) + fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor) + self.fileProcessorDict[key] = fileProcessor + self.fileProcessorKeyList = self.fileProcessorDict.keys() + self.fileProcessorKeyList.sort() + self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList) + + def processObservedFile(self, observedFile): + self.fileProcessingQueue.push(observedFile) + self.eventFlag.set() + + def start(self): + self.lock.acquire() + try: + self.logger.debug('Starting file processing threads') + for i in range(0, self.nProcessingThreads): + tName = 'FileProcessingThread-%s' % i + t = FileProcessingThread(tName, self.eventFlag, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict) + t.start() + self.fileProcessingThreadDict[tName] = t + finally: + self.lock.release() + + def stop(self): + self.lock.acquire() + try: + self.logger.debug('Stopping file processing threads') + for (tName, t) in self.fileProcessingThreadDict.items(): + t.stop() + self.eventFlag.set() + for (tName, t) in self.fileProcessingThreadDict.items(): + t.join() + finally: + self.lock.release() + + + def setEvent(self): + self.lock.acquire() + try: + self.eventFlag.set() + finally: + self.lock.release() + + def clearEvent(self): + self.lock.acquire() + try: + self.eventFlag.clear() + finally: + self.lock.release() + +#################################################################### +# Testing + +if __name__ == '__main__': + fp = FileProcessingManager.getInstance() + print fp + #fp.start() + #time.sleep(30) + #fp.stop() + + diff --git a/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py b/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py new file mode 100755 index 0000000000000000000000000000000000000000..443f2b4cc902cae17067e07c527c10d577ef4283 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +import threading + +from dm.common.utility.loggingManager import LoggingManager + +class FileProcessingThread(threading.Thread): + + def __init__ (self, name, eventFlag, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict): + + threading.Thread.__init__(self) + self.setName(name) + self.exitFlag = False + self.eventFlag = eventFlag + self.fileProcessorDict = fileProcessorDict + self.fileProcessorKeyList = fileProcessorKeyList + self.fileProcessingQueue = fileProcessingQueue + self.processedFileDict = processedFileDict + self.unprocessedFileDict = unprocessedFileDict + self.logger = LoggingManager.getInstance().getLogger(name) + + def run(self): + self.logger.debug('Starting thread: %s' % self.getName()) + while True: + if self.exitFlag: + self.logger.debug('Exit flag set, %s done' % self.getName()) + break + + while True: + file = self.fileProcessingQueue.pop() + if file is None: + break + path = file.getPath() + try: + for processorKey in self.fileProcessorKeyList: + processor = self.fileProcessorDict.get(processorKey) + processorName = processor.__class__.__name__ + self.logger.debug('%s is about to process file %s ' % (processorName, file)) + try: + processor.processFile(path) + self.logger.debug('%s processed file %s ' % (processorName, file)) + except Exception, ex: + self.logger.exception(ex) + self.logger.debug('%s processing failed for file %s ' % (processorName, file)) + file[processorName] = {'error' : ex} + self.unprocessedFileDict[path] = file + + except Exception, ex: + self.logger.exception(ex) + + self.eventFlag.wait() + + def stop(self): + self.exitFlag = True + +#################################################################### +# Testing + +if __name__ == '__main__': + pass + + diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py new file mode 100755 index 0000000000000000000000000000000000000000..bbe30f768c0efa6e8d8498febdebf7afe20f41dd --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python + +import threading +import time + +from watchdog.observers.polling import PollingObserver + +from dm.common.utility.loggingManager import LoggingManager +from dm.common.utility.configurationManager import ConfigurationManager +from dm.common.objects.observedFile import ObservedFile +from dm.common.utility.singleton import Singleton + +from dmFileSystemEventHandler import DmFileSystemEventHandler +from fileProcessingManager import FileProcessingManager + +class FileSystemObserver(threading.Thread,Singleton): + + CONFIG_SECTION_NAME = 'FileSystemObserver' + MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds' + FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds' + + # Singleton. + __instanceLock = threading.RLock() + __instance = None + + def __init__(self): + FileSystemObserver.__instanceLock.acquire() + try: + if FileSystemObserver.__instance: + return + FileSystemObserver.__instance = self + threading.Thread.__init__(self) + self.setName('FileSystemObserverThread') + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + + self.logger.debug('Initializing') + self.lock = threading.RLock() + self.eventFlag = threading.Event() + self.exitFlag = False + + self.observedFileMap = {} + self.observer = PollingObserver() + self.eventHandler = DmFileSystemEventHandler(self) + self.observedWatchDict = {} + self.__configure() + self.fileProcessingManager = FileProcessingManager.getInstance() + self.logger.debug('Initialization complete') + finally: + FileSystemObserver.__instanceLock.release() + + def __configure(self): + cm = ConfigurationManager.getInstance() + configItems = cm.getConfigItems(FileSystemObserver.CONFIG_SECTION_NAME) + self.logger.debug('Got config items: %s' % configItems) + self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY)) + self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY)) + + def startObservingPath(self, path): + self.lock.acquire() + try: + self.logger.debug('Starting observer for %s' % path) + observedWatch = self.observer.schedule(self.eventHandler, path, recursive=True) + self.observedWatchDict[path] = observedWatch + finally: + self.lock.release() + + def stopObservingPath(self, path): + self.lock.acquire() + try: + observedWatch = self.observedWatchDict.get(path) + if observedWatch: + self.logger.debug('Stopping observer for %s' % path) + self.observer.unschedule(observedWatch) + else: + self.logger.debug('Observer for %s is not active' % path) + finally: + self.lock.release() + + def observedFileUpdated(self, path): + self.lock.acquire() + try: + observedFile = self.observedFileMap.get(path, ObservedFile(path=path)) + observedFile.setLastUpdatedTimestampToNow() + self.observedFileMap[path] = observedFile + self.logger.debug('Observed file updated: %s', observedFile) + finally: + self.lock.release() + + def checkObservedFilesForProcessing(self): + self.lock.acquire() + try: + now = time.time() + pathsForProcessing = [] + for (path,observedFile) in self.observedFileMap.items(): + timestamp = observedFile.get('lastUpdateTimestamp') + deltaT = now - timestamp + if deltaT > self.minFileProcessingDelayInSeconds: + self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (path, deltaT)) + pathsForProcessing.append(path) + return pathsForProcessing + finally: + self.lock.release() + + def processObservedFile(self, path): + self.lock.acquire() + try: + self.logger.debug('Processing file %s' % path) + observedFile = self.observedFileMap.get(path) + if observedFile is not None: + del self.observedFileMap[path] + self.fileProcessingManager.processObservedFile(observedFile) + finally: + self.lock.release() + + def start(self): + self.lock.acquire() + try: + self.logger.debug('Starting file observer thread') + threading.Thread.start(self) + + self.logger.debug('Starting watchdog observer') + self.observer.start() + finally: + self.lock.release() + + def run(self): + self.logger.debug('Starting thread: %s' % self.getName()) + while True: + if self.exitFlag: + self.logger.debug('Exit flag set, %s done' % self.getName()) + break + try: + self.logger.debug('Checking observed files') + + pathsForProcessing = self.checkObservedFilesForProcessing() + for path in pathsForProcessing: + self.processObservedFile(path) + except Exception, ex: + self.logger.exception(ex) + self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds) + + def stop(self): + self.lock.acquire() + try: + self.logger.debug('Stopping watchdog observer') + self.observer.stop() + self.observer.join() + + self.logger.debug('Stopping file observer thread') + self.exitFlag = True + self.eventFlag.set() + self.logger.debug('Event is set, joining thread') + threading.Thread.join(self) + self.logger.debug('Module stopped') + finally: + self.lock.release() + + + def setEvent(self): + self.lock.acquire() + try: + self.eventFlag.set() + finally: + self.lock.release() + + def clearEvent(self): + self.lock.acquire() + try: + self.eventFlag.clear() + finally: + self.lock.release() + +#################################################################### +# Testing + +if __name__ == '__main__': + fp = FileSystemObserver.getInstance() + fp.start() + time.sleep(30) + fp.stop() + + diff --git a/src/python/dm/daq_web_service/service/impl/userInfoControllerImpl.py b/src/python/dm/daq_web_service/service/impl/userInfoControllerImpl.py new file mode 100755 index 0000000000000000000000000000000000000000..30d52aeb4b445c7105baf15b48beb7901b1afc26 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/userInfoControllerImpl.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python + +# +# Implementation for user info controller. +# + +####################################################################### + +from dm.common.objects.dmObject import DmObject +from dm.common.objects.dmObjectManager import DmObjectManager +from dm.common.db.api.userDbApi import UserDbApi + +####################################################################### + +class UserInfoControllerImpl(DmObjectManager): + """ User info controller implementation class. """ + + def __init__(self): + DmObjectManager.__init__(self) + self.userDbApi = UserDbApi() + + def getUsers(self): + return self.userDbApi.getUsers() + + def getUserById(self, id): + return self.userDbApi.getUserById(id) + + def getUserByUsername(self, username): + return self.userDbApi.getUserByUsername(username) +