Skip to content
Snippets Groups Projects
Commit 68e463b5 authored by sveseli's avatar sveseli
Browse files

added file system observer interface for DAQ; implemented FTP observer agent

parent 688b85d9
No related branches found
No related tags found
No related merge requests found
Release 0.6 ()
=============================
- Added file system observer agent interface for DAQ service
- Implemented FTP file system observer
Release 0.5 (10/08/2015)
=============================
......
......@@ -20,6 +20,8 @@ sessionTimeoutInSeconds=3600
# Minimum file processing delay since last update
minFileProcessingDelayInSeconds=10
fileSystemEventTimeoutInSeconds=10
fileSystemObserverAgent=dm.daq_web_service.service.impl.watchdogFileSystemObserverAgent.WatchdogFileSystemObserverAgent()
#fileSystemObserverAgent=dm.daq_web_service.service.impl.ftpFileSystemObserverAgent.FtpFileSystemObserverAgent('dmdaq', 2811)
[FileProcessingManager]
numberOfProcessingThreads=3
......
......@@ -25,9 +25,7 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
def on_created(self, event):
FileSystemEventHandler.on_created(self, event)
self.logger.debug('File system created event: %s' % (event.__dict__))
if not event.is_directory:
filePath = event.src_path
self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
self.processEvent(event)
def on_moved(self, event):
FileSystemEventHandler.on_moved(self, event)
......@@ -40,24 +38,24 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
def on_modified(self, event):
FileSystemEventHandler.on_modified(self, event)
self.logger.debug('File system directory modified event: %s' % (event.__dict__))
self.processEvent(event)
def processEvent(self, event):
if event.is_directory:
self.logger.debug('Processing directory event: %s , src path: %s , latest files: %s' % (event.__dict__, event.src_path, files))
try:
files = glob.glob(os.path.join(event.src_path,'*.*'))
self.logger.debug('File system directory modified event: %s , src path: %s , latest files: %s' % (event.__dict__, event.src_path, files))
if len(files) > 0:
filePath = max(files, key=os.path.getctime)
self.logger.debug('File system directory modified event: %s , latest file: %s' % (event.__dict__, filePath))
self.logger.debug('Latest file: %s' % (filePath))
self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
except Exception, ex:
self.logger.error('Exception occured when searching for file in directory %s: %s' % (event.__dict__, ex))
else:
filePath = event.src_path
self.logger.debug('File system modified event: %s' % (event.__dict__))
self.logger.debug('Processing file event: %s' % (event.__dict__))
self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
def on_moved(self, event):
FileSystemEventHandler.on_moved(self, event)
####################################################################
# Testing
......
......@@ -8,6 +8,7 @@ from watchdog.observers.polling import PollingObserver
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager
......@@ -19,6 +20,7 @@ class FileSystemObserver(threading.Thread,Singleton):
CONFIG_SECTION_NAME = 'FileSystemObserver'
MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'
# Singleton.
__instanceLock = threading.RLock()
......@@ -40,8 +42,6 @@ class FileSystemObserver(threading.Thread,Singleton):
self.exitFlag = False
self.observedFileMap = {}
self.observer = PollingObserver()
self.observedWatchDict = {}
self.__configure()
self.fileProcessingManager = FileProcessingManager.getInstance()
self.logger.debug('Initialization complete')
......@@ -57,22 +57,21 @@ class FileSystemObserver(threading.Thread,Singleton):
self.fileSystemEventTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
self.logger.debug('File system event timeout: %s seconds' % self.fileSystemEventTimeoutInSeconds)
agentClass = cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_OBSERVER_AGENT_KEY)
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(agentClass)
self.logger.debug('Creating file system observer agent instance of class %s' % className)
self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.fileSystemObserverAgent.setFileSystemObserver(self)
@ThreadingUtility.synchronize
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Starting observer for %s' % dataDirectory)
eventHandler = DmFileSystemEventHandler(self, dataDirectory, experiment)
observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
self.observedWatchDict[dataDirectory] = observedWatch
self.logger.debug('Agent is starting observer for %s' % dataDirectory)
self.fileSystemObserverAgent.startObservingPath(dataDirectory, experiment)
@ThreadingUtility.synchronize
def stopObservingPath(self, dataDirectory, experiment):
observedWatch = self.observedWatchDict.get(dataDirectory)
if observedWatch:
self.logger.debug('Stopping observer for %s' % dataDirectory)
self.observer.unschedule(observedWatch)
del self.observedWatchDict[dataDirectory]
else:
self.logger.debug('Observer for %s is not active' % dataDirectory)
self.logger.debug('Agent is stopping observer for %s' % dataDirectory)
self.fileSystemObserverAgent.stopObservingPath(dataDirectory, experiment)
@ThreadingUtility.synchronize
def fileUpdated(self, filePath, dataDirectory, experiment):
......@@ -106,8 +105,8 @@ class FileSystemObserver(threading.Thread,Singleton):
self.logger.debug('Starting file observer thread')
threading.Thread.start(self)
self.logger.debug('Starting watchdog observer')
self.observer.start()
self.logger.debug('Starting file observer agent')
self.fileSystemObserverAgent.start()
def run(self):
self.logger.debug('Starting thread: %s' % self.getName())
......@@ -127,9 +126,8 @@ class FileSystemObserver(threading.Thread,Singleton):
@ThreadingUtility.synchronize
def stop(self):
self.logger.debug('Stopping watchdog observer')
self.observer.stop()
self.observer.join()
self.logger.debug('Stopping file observer agent')
self.fileSystemObserverAgent.stop()
self.logger.debug('Stopping file observer thread')
self.exitFlag = True
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment