From 1af7defbf54ebd8244c7dae14fbacd7921f59667 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Tue, 14 Apr 2015 13:24:16 +0000
Subject: [PATCH] changes related to new file processing interface that
 includes experiment and daqPath

---
 .../service/impl/dmFileSystemEventHandler.py  |  8 +--
 .../impl/experimentSessionControllerImpl.py   | 18 ++++---
 .../service/impl/fileProcessingThread.py      |  8 +--
 .../service/impl/fileSystemObserver.py        | 49 ++++++++++---------
 .../impl/experimentSessionControllerImpl.py   | 16 ++++--
 5 files changed, 60 insertions(+), 39 deletions(-)

diff --git a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
index 74a7d4c4..b9b5788e 100755
--- a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
+++ b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
@@ -8,10 +8,12 @@ from dm.common.utility.loggingManager import LoggingManager
 
 class DmFileSystemEventHandler(FileSystemEventHandler):
 
-    def __init__(self, fileSystemObserver):
+    def __init__(self, fileSystemObserver, daqPath, experiment):
         FileSystemEventHandler.__init__(self)
         self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
         self.fileSystemObserver = fileSystemObserver
+        self.daqPath = daqPath
+        self.experiment = experiment
 
     def dispatch(self, event):
         FileSystemEventHandler.dispatch(self, event)
@@ -27,10 +29,10 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
 
     def on_modified(self, event):
         FileSystemEventHandler.on_modified(self, event)
-        path = event.src_path
+        filePath = event.src_path
         self.logger.debug('File system modified event: %s' % (event.__dict__))
         if not event.is_directory:
-            self.fileSystemObserver.observedFileUpdated(path)
+            self.fileSystemObserver.observedFileUpdated(filePath, self.daqPath, self.experiment)
         
     def on_moved(self, event):
         FileSystemEventHandler.on_moved(self, event)
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index 1c9da104..e5d73a5a 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -10,6 +10,7 @@ from dm.common.objects.experiment import Experiment
 from dm.common.objects.dmObjectManager import DmObjectManager
 from dm.common.exceptions.invalidRequest import InvalidRequest
 from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
+from experimentTracker import ExperimentTracker
 from fileSystemObserver import FileSystemObserver
 
 class ExperimentSessionControllerImpl(DmObjectManager):
@@ -17,20 +18,25 @@ class ExperimentSessionControllerImpl(DmObjectManager):
 
     def __init__(self):
         DmObjectManager.__init__(self)
-        self.experimentDict = {}
+        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
 
     def startDaq(self, name, dataDirectory):
-        FileSystemObserver.getInstance().startObservingPath(dataDirectory)
+        experiment = self.dsExperimentApi.getExperimentByName(name)
+        storageDirectory = experiment.get('storageDirectory')
+        if storageDirectory is None:
+            raise InvalidRequest('Experiment %s has not been started.' % name)
         startTime = time.time()
-        experiment = Experiment({'name' : name, 'dataDirectory' : dataDirectory, 'daqStartTime' : startTime})
-        self.experimentDict[name] = experiment
+        experiment['daqDataDirectory'] = dataDirectory 
+        experiment['daqStartTime'] = startTime
+        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
+        ExperimentTracker.getInstance().put(name, experiment)
         return experiment
 
     def stopDaq(self, name):
-        experiment = self.experimentDict.get(name)
+        experiment = ExperimentTracker.getInstance().get(name)
         if experiment is None or experiment.get('daqEndTime') is not None:
             raise InvalidRequest('Experiment %s is not active.' % name)
         dataDirectory = experiment.get('dataDirectory')
-        FileSystemObserver.getInstance().stopObservingPath(dataDirectory)
         experiment['daqEndTime'] = time.time()
+        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
         return experiment
diff --git a/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py b/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py
index 443f2b4c..fda96165 100755
--- a/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py
+++ b/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py
@@ -30,20 +30,22 @@ class FileProcessingThread(threading.Thread):
                 file = self.fileProcessingQueue.pop()
                 if file is None:
                     break
-                path = file.getPath()
+                filePath = file.getFilePath()
+                daqPath = file.getDaqPath()
+                experiment = file.getExperiment()
                 try:
                     for processorKey in self.fileProcessorKeyList: 
                         processor = self.fileProcessorDict.get(processorKey)
                         processorName = processor.__class__.__name__
                         self.logger.debug('%s is about to process file %s ' % (processorName, file))
                         try:
-                            processor.processFile(path)
+                            processor.processFile(filePath, daqPath, experiment)
                             self.logger.debug('%s processed file %s ' % (processorName, file))
                         except Exception, ex:
                             self.logger.exception(ex)
                             self.logger.debug('%s processing failed for file %s ' % (processorName, file))
                             file[processorName] = {'error' : ex}
-                            self.unprocessedFileDict[path] = file
+                            self.unprocessedFileDict[filePath] = file
 
                 except Exception, ex:
                     self.logger.exception(ex)
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index bbe30f76..e6f8ed2e 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -40,7 +40,6 @@ class FileSystemObserver(threading.Thread,Singleton):
 
             self.observedFileMap = {}
             self.observer = PollingObserver()
-            self.eventHandler = DmFileSystemEventHandler(self)
             self.observedWatchDict = {}
             self.__configure()
             self.fileProcessingManager = FileProcessingManager.getInstance()
@@ -55,33 +54,35 @@ class FileSystemObserver(threading.Thread,Singleton):
         self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
         self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
 
-    def startObservingPath(self, path):
+    def startObservingPath(self, daqPath, experiment):
         self.lock.acquire()
         try:
-            self.logger.debug('Starting observer for %s' % path)
-            observedWatch = self.observer.schedule(self.eventHandler, path, recursive=True)
-            self.observedWatchDict[path] = observedWatch
+            self.logger.debug('Starting observer for %s' % daqPath)
+            eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
+            observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
+            self.observedWatchDict[daqPath] = observedWatch
         finally:
             self.lock.release()
 
-    def stopObservingPath(self, path):
+    def stopObservingPath(self, daqPath, experiment):
         self.lock.acquire()
         try:
-            observedWatch = self.observedWatchDict.get(path)
+            observedWatch = self.observedWatchDict.get(daqPath)
             if observedWatch:
-                self.logger.debug('Stopping observer for %s' % path)
+                self.logger.debug('Stopping observer for %s' % daqPath)
                 self.observer.unschedule(observedWatch)
+                del self.observedWatchDict[daqPath] 
             else:
-                self.logger.debug('Observer for %s is not active' % path)
+                self.logger.debug('Observer for %s is not active' % daqPath)
         finally:
             self.lock.release()
         
-    def observedFileUpdated(self, path):
+    def observedFileUpdated(self, filePath, daqPath, experiment):
         self.lock.acquire()
         try:
-            observedFile = self.observedFileMap.get(path, ObservedFile(path=path))
+            observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
             observedFile.setLastUpdatedTimestampToNow()
-            self.observedFileMap[path] = observedFile
+            self.observedFileMap[filePath] = observedFile
             self.logger.debug('Observed file updated: %s', observedFile)
         finally:
             self.lock.release()
@@ -90,24 +91,24 @@ class FileSystemObserver(threading.Thread,Singleton):
         self.lock.acquire()
         try:
             now = time.time()
-            pathsForProcessing = []
-            for (path,observedFile) in self.observedFileMap.items():
+            filePathsForProcessing = []
+            for (filePath,observedFile) in self.observedFileMap.items():
                 timestamp = observedFile.get('lastUpdateTimestamp')
                 deltaT = now - timestamp
                 if deltaT > self.minFileProcessingDelayInSeconds:
-                    self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (path, deltaT))
-                    pathsForProcessing.append(path)
-            return pathsForProcessing
+                    self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
+                    filePathsForProcessing.append(filePath)
+            return filePathsForProcessing
         finally:
             self.lock.release()
     
-    def processObservedFile(self, path):
+    def processObservedFile(self, filePath):
         self.lock.acquire()
         try:
-            self.logger.debug('Processing file %s' % path)
-            observedFile = self.observedFileMap.get(path)
+            self.logger.debug('Processing file %s' % filePath)
+            observedFile = self.observedFileMap.get(filePath)
             if observedFile is not None:
-                del self.observedFileMap[path]
+                del self.observedFileMap[filePath]
                 self.fileProcessingManager.processObservedFile(observedFile)
         finally:
             self.lock.release()
@@ -132,9 +133,9 @@ class FileSystemObserver(threading.Thread,Singleton):
             try:
                 self.logger.debug('Checking observed files')
 
-                pathsForProcessing = self.checkObservedFilesForProcessing()
-                for path in pathsForProcessing:
-                    self.processObservedFile(path)
+                filePathsForProcessing = self.checkObservedFilesForProcessing()
+                for filePath in filePathsForProcessing:
+                    self.processObservedFile(filePath)
             except Exception, ex:
                 self.logger.exception(ex)
             self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds)
diff --git a/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py
index f41021dc..f19c114d 100755
--- a/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py
@@ -29,10 +29,12 @@ class ExperimentSessionControllerImpl(DmObjectManager):
 
     def getExperimentByName(self, name):
         experiment = self.experimentDbApi.getExperimentByName(name)
+        StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
         return experiment
 
     def getExperimentById(self, id):
         experiment = self.experimentDbApi.getExperimentById(id)
+        StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
         return experiment
 
     def addExperiment(self, name, experimentTypeId, description):
@@ -40,10 +42,18 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         return experiment
 
     def startExperiment(self, name):
-        experiment = self.experimentDbApi.setExperimentStartDateToNow(name)
-        StorageManager.getInstance().createExperimentDataDirectory(experiment)
+        experiment = self.experimentDbApi.getExperimentByName(name)
+        if experiment.get('startDate') is None:
+            experiment = self.experimentDbApi.setExperimentStartDateToNow(name)
+            StorageManager.getInstance().createExperimentDataDirectory(experiment)
+        else:
+            StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
         return experiment
 
     def stopExperiment(self, name):
-        experiment = self.experimentDbApi.setExperimentEndDateToNow(name)
+        experiment = self.experimentDbApi.getExperimentByName(name)
+        if experiment.get('endDate') is None:
+            experiment = self.experimentDbApi.setExperimentEndDateToNow(name)
+        StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
         return experiment
+
-- 
GitLab