From 68e463b52e9b82fb3f7829ccc710bc4c7a10597b Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Mon, 2 Nov 2015 13:48:56 +0000
Subject: [PATCH] added file system observer interface for DAQ; implemented FTP
 observer agent

---
 doc/RELEASE_NOTES.txt                         |  6 ++++
 etc/daq-web-service.conf.template             |  2 ++
 .../service/impl/dmFileSystemEventHandler.py  | 16 ++++-----
 .../service/impl/fileSystemObserver.py        | 34 +++++++++----------
 4 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt
index 7270cb6b..21e99947 100644
--- a/doc/RELEASE_NOTES.txt
+++ b/doc/RELEASE_NOTES.txt
@@ -1,3 +1,9 @@
+Release 0.6 ()
+=============================
+
+- Added file system observer agent interface for DAQ service
+- Implemented FTP file system observer
+
 Release 0.5 (10/08/2015)
 =============================
 
diff --git a/etc/daq-web-service.conf.template b/etc/daq-web-service.conf.template
index 4f5f4fe2..7df7b387 100644
--- a/etc/daq-web-service.conf.template
+++ b/etc/daq-web-service.conf.template
@@ -20,6 +20,8 @@ sessionTimeoutInSeconds=3600
 # Minimum file processing delay since last update
 minFileProcessingDelayInSeconds=10
 fileSystemEventTimeoutInSeconds=10
+fileSystemObserverAgent=dm.daq_web_service.service.impl.watchdogFileSystemObserverAgent.WatchdogFileSystemObserverAgent()
+#fileSystemObserverAgent=dm.daq_web_service.service.impl.ftpFileSystemObserverAgent.FtpFileSystemObserverAgent('dmdaq', 2811)
 
 [FileProcessingManager]
 numberOfProcessingThreads=3
diff --git a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
index f342ca47..7cc20a76 100755
--- a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
+++ b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
@@ -25,9 +25,7 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
     def on_created(self, event):
         FileSystemEventHandler.on_created(self, event)
         self.logger.debug('File system created event: %s' % (event.__dict__))
-        if not event.is_directory:
-            filePath = event.src_path
-            self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
+        self.processEvent(event)
 
     def on_moved(self, event):
         FileSystemEventHandler.on_moved(self, event)
@@ -40,24 +38,24 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
     def on_modified(self, event):
         FileSystemEventHandler.on_modified(self, event)
         self.logger.debug('File system directory modified event: %s' % (event.__dict__))
+        self.processEvent(event)
+
+    def processEvent(self, event):
         if event.is_directory:
+            self.logger.debug('Processing directory event: %s , src path: %s ,  latest files: %s' % (event.__dict__, event.src_path, files))
             try:
                 files = glob.glob(os.path.join(event.src_path,'*.*'))
-                self.logger.debug('File system directory modified event: %s , src path: %s ,  latest files: %s' % (event.__dict__, event.src_path, files))
                 if len(files) > 0:
                     filePath = max(files, key=os.path.getctime)
-                    self.logger.debug('File system directory modified event: %s , latest file: %s' % (event.__dict__, filePath))
+                    self.logger.debug('Latest file: %s' % (filePath))
                     self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
             except Exception, ex:
                 self.logger.error('Exception occured when searching for file in directory %s: %s' % (event.__dict__, ex))
         else:
             filePath = event.src_path
-            self.logger.debug('File system modified event: %s' % (event.__dict__))
+            self.logger.debug('Processing file event: %s' % (event.__dict__))
             self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
         
-    def on_moved(self, event):
-        FileSystemEventHandler.on_moved(self, event)
-
 ####################################################################
 # Testing
 
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index 3345e898..c9e4bfda 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -8,6 +8,7 @@ from watchdog.observers.polling import PollingObserver
 from dm.common.utility.loggingManager import LoggingManager
 from dm.common.utility.configurationManager import ConfigurationManager
 from dm.common.objects.observedFile import ObservedFile
+from dm.common.utility.objectUtility import ObjectUtility
 from dm.common.utility.singleton import Singleton
 from dm.common.utility.threadingUtility import ThreadingUtility
 from dm.common.processing.fileProcessingManager import FileProcessingManager
@@ -19,6 +20,7 @@ class FileSystemObserver(threading.Thread,Singleton):
     CONFIG_SECTION_NAME = 'FileSystemObserver'
     MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
     FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
+    FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'
 
     # Singleton.
     __instanceLock = threading.RLock()
@@ -40,8 +42,6 @@ class FileSystemObserver(threading.Thread,Singleton):
             self.exitFlag = False
 
             self.observedFileMap = {}
-            self.observer = PollingObserver()
-            self.observedWatchDict = {}
             self.__configure()
             self.fileProcessingManager = FileProcessingManager.getInstance()
             self.logger.debug('Initialization complete')
@@ -57,22 +57,21 @@ class FileSystemObserver(threading.Thread,Singleton):
         self.fileSystemEventTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
         self.logger.debug('File system event timeout: %s seconds' % self.fileSystemEventTimeoutInSeconds)
 
+        agentClass = cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_OBSERVER_AGENT_KEY)
+        (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(agentClass)
+        self.logger.debug('Creating file system observer agent instance of class %s' % className)
+        self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
+        self.fileSystemObserverAgent.setFileSystemObserver(self)
+
     @ThreadingUtility.synchronize
     def startObservingPath(self, dataDirectory, experiment):
-        self.logger.debug('Starting observer for %s' % dataDirectory)
-        eventHandler = DmFileSystemEventHandler(self, dataDirectory, experiment)
-        observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
-        self.observedWatchDict[dataDirectory] = observedWatch
+        self.logger.debug('Agent is starting observer for %s' % dataDirectory)
+        self.fileSystemObserverAgent.startObservingPath(dataDirectory, experiment)
 
     @ThreadingUtility.synchronize
     def stopObservingPath(self, dataDirectory, experiment):
-        observedWatch = self.observedWatchDict.get(dataDirectory)
-        if observedWatch:
-            self.logger.debug('Stopping observer for %s' % dataDirectory)
-            self.observer.unschedule(observedWatch)
-            del self.observedWatchDict[dataDirectory] 
-        else:
-            self.logger.debug('Observer for %s is not active' % dataDirectory)
+        self.logger.debug('Agent is stopping observer for %s' % dataDirectory)
+        self.fileSystemObserverAgent.stopObservingPath(dataDirectory, experiment)
         
     @ThreadingUtility.synchronize
     def fileUpdated(self, filePath, dataDirectory, experiment):
@@ -106,8 +105,8 @@ class FileSystemObserver(threading.Thread,Singleton):
         self.logger.debug('Starting file observer thread')
         threading.Thread.start(self)
 
-        self.logger.debug('Starting watchdog observer')
-        self.observer.start()
+        self.logger.debug('Starting file observer agent')
+        self.fileSystemObserverAgent.start()
 
     def run(self):
         self.logger.debug('Starting thread: %s' % self.getName())
@@ -127,9 +126,8 @@ class FileSystemObserver(threading.Thread,Singleton):
 
     @ThreadingUtility.synchronize
     def stop(self):
-        self.logger.debug('Stopping watchdog observer')
-        self.observer.stop()
-        self.observer.join()
+        self.logger.debug('Stopping file observer agent')
+        self.fileSystemObserverAgent.stop()
 
         self.logger.debug('Stopping file observer thread')
         self.exitFlag = True
-- 
GitLab