From 27aacb57715888ea2175c809ee89bd26b5850800 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Fri, 26 Jun 2015 20:12:21 +0000
Subject: [PATCH] changes related to file processing framework

---
 .../daq_web_service/api/experimentRestApi.py  | 25 +++++----
 .../service/experimentSessionController.py    | 47 +++++++++--------
 .../service/impl/dmFileSystemEventHandler.py  |  6 +--
 .../impl/dsProcessFileNotificationPlugin.py   | 16 +++---
 .../impl/experimentSessionControllerImpl.py   | 52 ++++++++++++-------
 .../service/impl/fileSystemObserver.py        | 30 +++++------
 .../dm/ds_web_service/api/fileRestApi.py      | 11 ++--
 .../service/fileSessionController.py          | 25 ++++-----
 .../service/impl/experimentManager.py         | 13 ++++-
 .../service/impl/fileSessionControllerImpl.py |  8 +--
 10 files changed, 132 insertions(+), 101 deletions(-)

diff --git a/src/python/dm/daq_web_service/api/experimentRestApi.py b/src/python/dm/daq_web_service/api/experimentRestApi.py
index 6f0e9aba..cf5c93fd 100755
--- a/src/python/dm/daq_web_service/api/experimentRestApi.py
+++ b/src/python/dm/daq_web_service/api/experimentRestApi.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import os
+import json
 import urllib
 
 from dm.common.utility.encoder import Encoder
@@ -14,35 +15,37 @@ class ExperimentRestApi(DaqRestApi):
         DaqRestApi.__init__(self, username, password, host, port, protocol)
 
     @DaqRestApi.execute
-    def startDaq(self, name, dataDirectory):
+    def startDaq(self, experimentName, dataDirectory, daqInfo={}):
         url = '%s/experiments/startDaq' % (self.getContextRoot())
-        if name is None or not len(name):
+        if experimentName is None or not len(experimentName):
             raise InvalidRequest('Experiment name must be provided.')
-        url += '?name=%s' % Encoder.encode(name)
         if dataDirectory is None or not len(dataDirectory):
             raise InvalidRequest('Experiment data directory must be provided.')
-        url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
+        daqInfo['experimentName'] = experimentName
+        daqInfo['dataDirectory'] = dataDirectory
+        url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
         responseDict = self.sendSessionRequest(url=url, method='POST')
         return Experiment(responseDict)
 
     @DaqRestApi.execute
-    def stopDaq(self, name):
+    def stopDaq(self, experimentName):
         url = '%s/experiments/stopDaq' % (self.getContextRoot())
-        if name is None or not len(name):
+        if experimentName is None or not len(experimentName):
             raise InvalidRequest('Experiment name must be provided.')
-        url += '?name=%s' % Encoder.encode(name)
+        url += '?experimentName=%s' % Encoder.encode(experimentName)
         responseDict = self.sendSessionRequest(url=url, method='POST')
         return Experiment(responseDict)
 
     @DaqRestApi.execute
-    def upload(self, name, dataDirectory):
+    def upload(self, experimentName, dataDirectory, daqInfo={}):
         url = '%s/experiments/upload' % (self.getContextRoot())
-        if name is None or not len(name):
+        if experimentName is None or not len(experimentName):
             raise InvalidRequest('Experiment name must be provided.')
-        url += '?name=%s' % Encoder.encode(name)
         if dataDirectory is None or not len(dataDirectory):
             raise InvalidRequest('Experiment data directory must be provided.')
-        url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
+        daqInfo['experimentName'] = experimentName
+        daqInfo['dataDirectory'] = dataDirectory
+        url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
         responseDict = self.sendSessionRequest(url=url, method='POST')
         return Experiment(responseDict)
 
diff --git a/src/python/dm/daq_web_service/service/experimentSessionController.py b/src/python/dm/daq_web_service/service/experimentSessionController.py
index 25751d1b..36d258df 100755
--- a/src/python/dm/daq_web_service/service/experimentSessionController.py
+++ b/src/python/dm/daq_web_service/service/experimentSessionController.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import cherrypy
+import json
 
 from dm.common.service.dmSessionController import DmSessionController
 from dm.common.exceptions.invalidRequest import InvalidRequest
@@ -19,15 +20,16 @@ class ExperimentSessionController(DmSessionController):
     @DmSessionController.require(DmSessionController.isAdministrator())
     @DmSessionController.execute
     def startDaq(self, **kwargs):
-        name = kwargs.get('name')
-        if name is None or not len(name):
-            raise InvalidRequest('Missing experiment name.')
-        name = Encoder.decode(name)
-        dataDirectory = kwargs.get('dataDirectory')
-        if dataDirectory is None or not len(dataDirectory):
-            raise InvalidRequest('Missing experiment data directory.')
-        dataDirectory = Encoder.decode(dataDirectory)
-        response = self.experimentSessionControllerImpl.startDaq(name, dataDirectory).getFullJsonRep()
+        encodedDaqInfo = kwargs.get('daqInfo')
+        if not encodedDaqInfo:
+            raise InvalidRequest('Invalid DAQ info provided.')
+        daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
+
+        if not daqInfo.has_key('experimentName'):
+            raise InvalidRequest('Experiment name is missing.')
+        if not daqInfo.has_key('dataDirectory'):
+            raise InvalidRequest('Data directory is missing.')
+        response = self.experimentSessionControllerImpl.startDaq(daqInfo).getFullJsonRep()
         self.logger.debug('Returning: %s' % response)
         return response
 
@@ -35,11 +37,11 @@ class ExperimentSessionController(DmSessionController):
     @DmSessionController.require(DmSessionController.isAdministrator())
     @DmSessionController.execute
     def stopDaq(self, **kwargs):
-        name = kwargs.get('name')
-        if name is None or not len(name):
+        experimentName = kwargs.get('experimentName')
+        if not experimentName:
             raise InvalidRequest('Missing experiment name.')
-        name = Encoder.decode(name)
-        response = self.experimentSessionControllerImpl.stopDaq(name).getFullJsonRep()
+        experimentName = Encoder.decode(experimentName)
+        response = self.experimentSessionControllerImpl.stopDaq(experimentName).getFullJsonRep()
         self.logger.debug('Returning: %s' % response)
         return response
 
@@ -47,15 +49,16 @@ class ExperimentSessionController(DmSessionController):
     @DmSessionController.require(DmSessionController.isAdministrator())
     @DmSessionController.execute
     def upload(self, **kwargs):
-        name = kwargs.get('name')
-        if name is None or not len(name):
-            raise InvalidRequest('Missing experiment name.')
-        name = Encoder.decode(name)
-        dataDirectory = kwargs.get('dataDirectory')
-        if dataDirectory is None or not len(dataDirectory):
-            raise InvalidRequest('Missing experiment data directory.')
-        dataDirectory = Encoder.decode(dataDirectory)
-        response = self.experimentSessionControllerImpl.upload(name, dataDirectory).getFullJsonRep()
+        encodedDaqInfo = kwargs.get('daqInfo')
+        if not encodedDaqInfo:
+            raise InvalidRequest('Invalid DAQ info provided.')
+        daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
+
+        if not daqInfo.has_key('experimentName'):
+            raise InvalidRequest('Experiment name is missing.')
+        if not daqInfo.has_key('dataDirectory'):
+            raise InvalidRequest('Data directory is missing.')
+        response = self.experimentSessionControllerImpl.upload(daqInfo).getFullJsonRep()
         self.logger.debug('Returning: %s' % response)
         return response
 
diff --git a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
index b9b5788e..0c11f0c8 100755
--- a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
+++ b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py
@@ -8,11 +8,11 @@ from dm.common.utility.loggingManager import LoggingManager
 
 class DmFileSystemEventHandler(FileSystemEventHandler):
 
-    def __init__(self, fileSystemObserver, daqPath, experiment):
+    def __init__(self, fileSystemObserver, dataDirectory, experiment):
         FileSystemEventHandler.__init__(self)
         self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
         self.fileSystemObserver = fileSystemObserver
-        self.daqPath = daqPath
+        self.dataDirectory = dataDirectory
         self.experiment = experiment
 
     def dispatch(self, event):
@@ -32,7 +32,7 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
         filePath = event.src_path
         self.logger.debug('File system modified event: %s' % (event.__dict__))
         if not event.is_directory:
-            self.fileSystemObserver.observedFileUpdated(filePath, self.daqPath, self.experiment)
+            self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
         
     def on_moved(self, event):
         FileSystemEventHandler.on_moved(self, event)
diff --git a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py
index ac3b7ffd..3485ab6b 100755
--- a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py
+++ b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py
@@ -12,14 +12,18 @@ class DsProcessFileNotificationPlugin(FileProcessor):
         self.dsFileApi = DsRestApiFactory.getFileRestApi()
         self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
 
-    def processFile(self, filePath, daqPath, experiment):
+    def processFile(self, fileInfo):
+        experimentFilePath = fileInfo.get('experimentFilePath')
+        experiment = fileInfo.get('experiment')
         experimentName = experiment.get('name')
-        storageDirectory = experiment.get('storageDirectory')
-        fileName = os.path.basename(filePath)
-        daqFilePath = os.path.join(storageDirectory, fileName)
-        self.logger.debug('Processing file %s' % filePath)
+        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
 
-        self.dsFileApi.processFile(fileName, daqFilePath, experimentName)
+        # Prepare dictionary for processing. Only send needed data.
+        fileInfo2 = {}
+        fileInfo2['experimentFilePath'] = experimentFilePath
+        fileInfo2['experimentName'] = experimentName
+        fileInfo2['daqInfo'] = experiment.get('daqInfo')
+        self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2)
 
 #######################################################################
 # Testing.
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index f4febc39..8301fe22 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -11,10 +11,10 @@ from dm.common.objects.dmObjectManager import DmObjectManager
 from dm.common.exceptions.invalidRequest import InvalidRequest
 from dm.common.utility.osUtility import OsUtility
 from dm.common.objects.observedFile import ObservedFile
+from dm.common.processing.fileProcessingManager import FileProcessingManager
 from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
 from experimentTracker import ExperimentTracker
 from fileSystemObserver import FileSystemObserver
-from fileProcessingManager import FileProcessingManager
 
 class ExperimentSessionControllerImpl(DmObjectManager):
     """ Experiment session controller implementation class. """
@@ -23,37 +23,51 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         DmObjectManager.__init__(self)
         self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
 
-    def startDaq(self, name, dataDirectory):
-        experiment = self.dsExperimentApi.getExperimentByName(name)
+    def startDaq(self, daqInfo):
+        experimentName = daqInfo.get('experimentName')
+        experiment = ExperimentTracker.getInstance().get(experimentName)
+        if experiment is not None:
+            oldDaqInfo = experiment.get('daqInfo')
+            if oldDaqInfo.get('daqEndTime') is None:
+                raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory')))
+
+        dataDirectory = daqInfo.get('dataDirectory')
+        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
         storageDirectory = experiment.get('storageDirectory')
         if storageDirectory is None:
-            raise InvalidRequest('Experiment %s has not been started.' % name)
+            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
         startTime = time.time()
-        experiment['daqDataDirectory'] = dataDirectory 
-        experiment['daqStartTime'] = startTime
+        daqInfo['daqStartTime'] = startTime
+        experiment['daqInfo'] = daqInfo
+        self.logger.debug('Starting DAQ %s' % daqInfo)
         FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
-        ExperimentTracker.getInstance().put(name, experiment)
+        ExperimentTracker.getInstance().put(experimentName, experiment)
         return experiment
 
-    def stopDaq(self, name):
-        experiment = ExperimentTracker.getInstance().get(name)
-        if experiment is None or experiment.get('daqEndTime') is not None:
-            raise InvalidRequest('Experiment %s is not active.' % name)
-        dataDirectory = experiment.get('dataDirectory')
-        experiment['daqEndTime'] = time.time()
+    def stopDaq(self, experimentName):
+        experiment = ExperimentTracker.getInstance().get(experimentName)
+        if experiment is not None:
+            daqInfo = experiment.get('daqInfo')
+        if experiment is None or daqInfo.get('daqEndTime') is not None:
+            raise InvalidRequest('Experiment %s is not active.' % experimentName)
+        dataDirectory = daqInfo.get('dataDirectory')
+        daqInfo['daqEndTime'] = time.time()
         FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
         return experiment
 
-    def upload(self, name, dataDirectory):
-        experiment = self.dsExperimentApi.getExperimentByName(name)
+    def upload(self, daqInfo):
+        experimentName = daqInfo.get('experimentName')
+        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
+        experiment['daqInfo'] = daqInfo
         storageDirectory = experiment.get('storageDirectory')
         if storageDirectory is None:
-            raise InvalidRequest('Experiment %s has not been started.' % name)
+            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
+        dataDirectory = daqInfo.get('dataDirectory')
         filePaths = OsUtility.findFiles(dataDirectory)
         fileProcessingManager = FileProcessingManager.getInstance()
         for filePath in filePaths:
-            observedFile = ObservedFile(filePath=filePath, daqPath=dataDirectory, experiment=experiment)
-            fileProcessingManager.processObservedFile(observedFile)
-        ExperimentTracker.getInstance().put(name, experiment)
+            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
+            fileProcessingManager.processFile(fileInfo)
+        ExperimentTracker.getInstance().put(experimentName, experiment)
         return experiment
 
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index 27dad6d0..b4764f5f 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -56,25 +56,25 @@ class FileSystemObserver(threading.Thread,Singleton):
         self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
 
     @ThreadingUtility.synchronize
-    def startObservingPath(self, daqPath, experiment):
-        self.logger.debug('Starting observer for %s' % daqPath)
-        eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
-        observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
-        self.observedWatchDict[daqPath] = observedWatch
+    def startObservingPath(self, dataDirectory, experiment):
+        self.logger.debug('Starting observer for %s' % dataDirectory)
+        eventHandler = DmFileSystemEventHandler(self, dataDirectory, experiment)
+        observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
+        self.observedWatchDict[dataDirectory] = observedWatch
 
     @ThreadingUtility.synchronize
-    def stopObservingPath(self, daqPath, experiment):
-        observedWatch = self.observedWatchDict.get(daqPath)
+    def stopObservingPath(self, dataDirectory, experiment):
+        observedWatch = self.observedWatchDict.get(dataDirectory)
         if observedWatch:
-            self.logger.debug('Stopping observer for %s' % daqPath)
+            self.logger.debug('Stopping observer for %s' % dataDirectory)
             self.observer.unschedule(observedWatch)
-            del self.observedWatchDict[daqPath] 
+            del self.observedWatchDict[dataDirectory] 
         else:
-            self.logger.debug('Observer for %s is not active' % daqPath)
+            self.logger.debug('Observer for %s is not active' % dataDirectory)
         
     @ThreadingUtility.synchronize
-    def observedFileUpdated(self, filePath, daqPath, experiment):
-        observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
+    def fileUpdated(self, filePath, dataDirectory, experiment):
+        observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
         observedFile.setLastUpdatedTimestampToNow()
         self.observedFileMap[filePath] = observedFile
         self.logger.debug('Observed file updated: %s', observedFile)
@@ -92,12 +92,12 @@ class FileSystemObserver(threading.Thread,Singleton):
         return filePathsForProcessing
     
     @ThreadingUtility.synchronize
-    def processObservedFile(self, filePath):
+    def processFile(self, filePath):
         self.logger.debug('Processing file %s' % filePath)
         observedFile = self.observedFileMap.get(filePath)
         if observedFile is not None:
             del self.observedFileMap[filePath]
-            self.fileProcessingManager.processObservedFile(observedFile)
+            self.fileProcessingManager.processFile(observedFile)
      
     @ThreadingUtility.synchronize
     def start(self):
@@ -118,7 +118,7 @@ class FileSystemObserver(threading.Thread,Singleton):
 
                 filePathsForProcessing = self.checkObservedFilesForProcessing()
                 for filePath in filePathsForProcessing:
-                    self.processObservedFile(filePath)
+                    self.processFile(filePath)
             except Exception, ex:
                 self.logger.exception(ex)
             self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds)
diff --git a/src/python/dm/ds_web_service/api/fileRestApi.py b/src/python/dm/ds_web_service/api/fileRestApi.py
index a0de650d..6e28ef0b 100755
--- a/src/python/dm/ds_web_service/api/fileRestApi.py
+++ b/src/python/dm/ds_web_service/api/fileRestApi.py
@@ -2,6 +2,7 @@
 
 import os
 import urllib
+import json
 
 from dm.common.utility.encoder import Encoder
 from dm.common.exceptions.dmException import DmException
@@ -14,17 +15,15 @@ class FileRestApi(DsRestApi):
         DsRestApi.__init__(self, username, password, host, port, protocol)
 
     @DsRestApi.execute
-    def processFile(self, fileName, filePath, experimentName):
+    def processFile(self, filePath, experimentName, fileInfo={}):
         url = '%s/files/processFile' % (self.getContextRoot())
-        if not fileName:
-            raise InvalidRequest('File name must be provided.')
-        url += '?fileName=%s' % Encoder.encode(fileName)
         if not filePath:
             raise InvalidRequest('File path must be provided.')
-        url += '&filePath=%s' % Encoder.encode(filePath)
         if not experimentName:
             raise InvalidRequest('Experiment name must be provided.')
-        url += '&experimentName=%s' % Encoder.encode(experimentName)
+        fileInfo['filePath'] = filePath
+        fileInfo['experimentName'] = experimentName
+        url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo)))
         responseDict = self.sendSessionRequest(url=url, method='POST')
         return FileMetadata(responseDict)
 
diff --git a/src/python/dm/ds_web_service/service/fileSessionController.py b/src/python/dm/ds_web_service/service/fileSessionController.py
index 5652a39f..8dcbfb2f 100755
--- a/src/python/dm/ds_web_service/service/fileSessionController.py
+++ b/src/python/dm/ds_web_service/service/fileSessionController.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import cherrypy
+import json
 
 from dm.common.service.dmSessionController import DmSessionController
 from dm.common.exceptions.invalidRequest import InvalidRequest
@@ -19,20 +20,16 @@ class FileSessionController(DmSessionController):
     @DmSessionController.require(DmSessionController.isAdministrator())
     @DmSessionController.execute
     def processFile(self, **kwargs):
-        fileName = kwargs.get('fileName')
-        if not fileName: 
-            raise InvalidRequest('Missing file name.')
-        fileName = Encoder.decode(fileName)
-        filePath = kwargs.get('filePath')
-        if not filePath:
-            raise InvalidRequest('Missing file path.')
-        filePath = Encoder.decode(filePath)
-        experimentName = kwargs.get('experimentName')
-        if not experimentName:
-            raise InvalidRequest('Missing experiment name.')
-        experimentName = Encoder.decode(experimentName)
-
-        response = self.fileSessionControllerImpl.processFile(fileName, filePath, experimentName).getFullJsonRep()
+        encodedFileInfo = kwargs.get('fileInfo')
+        if not encodedFileInfo:
+            raise InvalidRequest('Invalid file info provided.')
+        fileInfo = json.loads(Encoder.decode(encodedFileInfo))
+
+        if not fileInfo.has_key('experimentFilePath'):
+            raise InvalidRequest('Experiment file path is missing.')
+        if not fileInfo.has_key('experimentName'):
+            raise InvalidRequest('Experiment name is missing.')
+        response = self.fileSessionControllerImpl.processFile(fileInfo).getFullJsonRep()
         self.logger.debug('Returning: %s' % response)
         return response
 
diff --git a/src/python/dm/ds_web_service/service/impl/experimentManager.py b/src/python/dm/ds_web_service/service/impl/experimentManager.py
index e5af7d68..2a7bfddb 100755
--- a/src/python/dm/ds_web_service/service/impl/experimentManager.py
+++ b/src/python/dm/ds_web_service/service/impl/experimentManager.py
@@ -12,6 +12,7 @@ from dm.common.utility.valueUtility import ValueUtility
 from dm.common.utility.objectUtility import ObjectUtility
 from dm.common.utility.threadingUtility import ThreadingUtility
 from dm.common.db.api.experimentDbApi import ExperimentDbApi
+from dm.common.processing.fileProcessingManager import FileProcessingManager
 
 class ExperimentManager(Singleton):
 
@@ -40,6 +41,7 @@ class ExperimentManager(Singleton):
             self.experimentDbApi = ExperimentDbApi()
             self.platformUtility = None
             self.__configure()
+            self.fileProcessingManager = FileProcessingManager.getInstance()
             self.logger.debug('Initialization complete')
         finally:
             ExperimentManager.__instanceLock.release()
@@ -94,13 +96,20 @@ class ExperimentManager(Singleton):
                 self.platformUtility.addUserToGroup(username, experimentName)
       
     @ThreadingUtility.synchronize
-    def processExperimentFile(self, fileName, filePath, experiment):
+    def processExperimentFile(self, experimentFilePath, experiment, fileInfo={}):
         experimentName = experiment.get('name')
+        self.updateExperimentWithStorageDataDirectory(experiment)
+        storageDirectory = experiment.get('storageDirectory')
+        filePath = os.path.join(storageDirectory, experimentFilePath)
+        fileInfo['filePath'] = filePath
+        fileInfo['experiment'] = experiment
         if os.path.exists(filePath):
-            self.logger.debug('Processing file %s' % filePath)
+            self.logger.debug('Processing file path %s (fileInfo: %s)' % (filePath, fileInfo))
             if self.manageStoragePermissions:
                 self.logger.debug('Modifying permissions for %s' % filePath)
                 OsUtility.chmodPath(filePath, fileMode=self.FILE_PERMISSIONS_MODE)
+                self.logger.debug('Processing file %s' % filePath)
+                self.fileProcessingManager.processFile(fileInfo)
         else:
             self.logger.debug('File path %s does not exist' % filePath)
 
diff --git a/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py b/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py
index cbe32532..8c50368c 100755
--- a/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py
+++ b/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py
@@ -19,7 +19,9 @@ class FileSessionControllerImpl(DmObjectManager):
         DmObjectManager.__init__(self)
         self.experimentDbApi = ExperimentDbApi()
 
-    def processFile(self, fileName, filePath, experimentName, **kwargs):
+    def processFile(self, fileInfo):
+        experimentFilePath = fileInfo.get('experimentFilePath')
+        experimentName = fileInfo.get('experimentName')
         experiment = self.experimentDbApi.getExperimentByName(experimentName)
-        ExperimentManager.getInstance().processExperimentFile(fileName, filePath, experiment)
-        return FileMetadata({'fileName' : fileName})
+        ExperimentManager.getInstance().processExperimentFile(experimentFilePath, experiment, fileInfo)
+        return FileMetadata(fileInfo)
-- 
GitLab