From 329269fa0be2dbf8b07585f8c4560427612f50c1 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Thu, 2 Mar 2017 19:41:44 +0000
Subject: [PATCH] fixed issue with starting directory uploads or daqs with
 large source directories

---
 src/python/dm/common/objects/daqInfo.py       |  2 +-
 .../service/impl/daqTracker.py                |  9 +++-
 .../impl/experimentSessionControllerImpl.py   | 41 +++++++++++++++----
 .../impl/pollingFileSystemObserverAgent.py    | 23 ++++++++---
 4 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py
index ae9a0d5b..607980c3 100755
--- a/src/python/dm/common/objects/daqInfo.py
+++ b/src/python/dm/common/objects/daqInfo.py
@@ -11,7 +11,7 @@ from dm.common.utility.timeUtility import TimeUtility
 
 class DaqInfo(DmObject):
 
-    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
+    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
     
 
     def __init__(self, dict={}):
diff --git a/src/python/dm/daq_web_service/service/impl/daqTracker.py b/src/python/dm/daq_web_service/service/impl/daqTracker.py
index 72d9f841..4cc952c4 100755
--- a/src/python/dm/daq_web_service/service/impl/daqTracker.py
+++ b/src/python/dm/daq_web_service/service/impl/daqTracker.py
@@ -54,7 +54,7 @@ class DaqTracker(ObjectTracker):
         self.put(daqId, daqInfo2)
         return daqInfo2
 
-    def stopDaq(self, experiment, dataDirectory):
+    def stopDaq(self, experiment, dataDirectory, status=None, errorMessage=None):
         experimentName = experiment.get('name') 
         dataDir = os.path.normpath(dataDirectory)
         activeDaqKey = experimentName + dataDir
@@ -64,7 +64,12 @@ class DaqTracker(ObjectTracker):
         endTime = time.time()
         daqInfo['endTime'] = endTime
         daqInfo['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
-        daqInfo.updateStatus()
+        if status:
+            daqInfo['status'] = status
+        else:
+            daqInfo.updateStatus()
+        if errorMessage:
+            daqInfo['errorMessage'] = errorMessage
         del self.activeDaqDict[activeDaqKey]
         return daqInfo
 
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index 09600f4f..e8e60c8a 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -80,12 +80,12 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
         daqInfo.updateStatus()
         daqId = daqInfo.get('id')
-        self.logger.error('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
+        self.logger.debug('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
 
         # Prepare upload on exit
         uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
         if uploadDataDirectoryOnExit:
-            self.logger.error('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
+            self.logger.debug('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
             daqInfo2 = {}
             daqInfo2['originalDaqId'] = daqId
             uploadTargetDirectoryOnExit = daqInfo.get('uploadTargetDirectoryOnExit')
@@ -137,7 +137,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
 
         startTime = time.time()
         uploadInfo['startTime'] = startTime
-        uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
+        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
         daqInfo['experimentName'] = experimentName
         daqInfo['storageDirectory'] = experiment.get('storageDirectory')
         daqInfo['storageHost'] = experiment.get('storageHost')
@@ -262,7 +262,6 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         storageDirectory = experiment.get('storageDirectory')
         if storageDirectory is None:
             raise InvalidRequest('Experiment %s has not been started.' % experimentName)
-        filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
         uploadId = str(uuid.uuid4())
         self.logger.debug('Starting upload id %s' % uploadId)
         uploadInfo = DirectoryUploadInfo(daqInfo)
@@ -275,7 +274,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
 
         startTime = time.time()
         uploadInfo['startTime'] = startTime
-        uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
+        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
         daqInfo['experimentName'] = experimentName
         daqInfo['storageDirectory'] = experiment.get('storageDirectory')
         daqInfo['storageHost'] = experiment.get('storageHost')
@@ -287,11 +286,37 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         if len(skipPlugins):
             skipPlugins = skipPlugins.split(',')
             uploadInfo['skipPlugins'] = skipPlugins
-        else:
-            skipPlugins = []
+
+        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
+        uploadInfo['nFiles'] = 0
+        uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
+        self.logger.debug('Starting upload directory %s timer for experiment %s' % (dataDirectory, experimentName))
+        timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadDirectory, args=[uploadInfo, daqInfo, experiment])
+        timer.start()
+        return uploadInfo
+
+    def prepareUploadDirectory(self, uploadInfo, daqInfo, experiment):
+        uploadId = uploadInfo['id']
+        dataDirectory = uploadInfo['dataDirectory'] 
+        experimentName = uploadInfo['experimentName'] 
+        skipPlugins = uploadInfo.get('skipPlugins', [])
+
+        self.logger.debug('Preparing directory %s upload for experiment %s' % (dataDirectory, experimentName))
+
+        try:
+            filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
+            self.logger.debug('There are %s files in directory %s (experiment %s)' % (len(filePathsDict), dataDirectory, experimentName))
+        except Exception, ex:
+            self.logger.error('Cannot retrieve files for directory upload %s: %s' % (uploadId, str(ex)))
+            self.logger.error('Marking directory upload %s as failed' % (uploadId))
+            uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
+            uploadInfo['errorMessage'] = str(ex)
+            return
+
         fileProcessingManager = FileProcessingManager.getInstance()
         processingInfo = {}
         uploadInfo['processingInfo'] = processingInfo
+        self.logger.debug('Preparing plugin timers for directory %s upload (experiment %s)' % (dataDirectory, experimentName))
         for processorKey in fileProcessingManager.fileProcessorKeyList:
             processor = fileProcessingManager.fileProcessorDict.get(processorKey)
             processorName = processor.name
@@ -303,10 +328,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
                 processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
                 timer.start()
 
-        UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
         uploadInfo['nFiles'] = len(filePathsDict)
         uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
-        return uploadInfo
 
     def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
         uploadId = uploadInfo.get('id')
diff --git a/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py
index 9d8b2834..ec63e4eb 100755
--- a/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py
+++ b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py
@@ -1,11 +1,14 @@
 #!/usr/bin/env python
 
 from threading import Timer
-from fileSystemObserverAgent import FileSystemObserverAgent
+from dm.common.constants import dmProcessingStatus
 from dm.common.utility.osUtility import OsUtility
+from fileSystemObserverAgent import FileSystemObserverAgent
+from daqTracker import DaqTracker
 
 class PollingFileSystemObserverAgent(FileSystemObserverAgent):
 
+    DEFAULT_START_OBSERVING_PATH_DELAY_IN_SECONDS = 3
     DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
     DEFAULT_RETRY_PERIOD_IN_SECONDS = 60
 
@@ -75,13 +78,23 @@ class PollingFileSystemObserverAgent(FileSystemObserverAgent):
             self.logger.debug('Observer for %s is already active' % dataDirectory)
             return
         self.logger.debug('Starting observer for %s' % dataDirectory)
-        fileDict = self.getFiles(dataDirectory)
         observedDirInfo = self.observedDirDict.get(dataDirectory, {})
-        observedDirInfo['files'] = fileDict
         observedDirInfo['experiment'] = experiment
         self.observedDirDict[dataDirectory] = observedDirInfo 
-        self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
-        
+        t = Timer(self.DEFAULT_START_OBSERVING_PATH_DELAY_IN_SECONDS, self.startObservingPathTimer, [observedDirInfo, dataDirectory, experiment])
+        t.start()
+
+    def startObservingPathTimer(self, observedDirInfo, dataDirectory, experiment):
+        try:
+            self.logger.debug('Starting initial retrieval of files for directory %s' % (dataDirectory))
+            fileDict = self.getFiles(dataDirectory)
+            observedDirInfo['files'] = fileDict
+            self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
+        except Exception, ex:
+            self.logger.error('Could not retrieve files for directory %s: %s' % (dataDirectory,ex))
+            DaqTracker.getInstance().stopDaq(experiment, dataDirectory, status=dmProcessingStatus.DM_PROCESSING_STATUS_FAILED, errorMessage=str(ex))
+            self.logger.error('Marked as failed DAQ for experiment %s, data directory %s' % (experiment['name'], dataDirectory))
+
     def stopObservingPath(self, dataDirectory, experiment):
         observedDirInfo = self.observedDirDict.get(dataDirectory)
         if not observedDirInfo:
-- 
GitLab