From dbec81cf05bf74c3604d75ba26c820b2ffbf2bec Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Tue, 18 Apr 2017 19:34:32 +0000
Subject: [PATCH] fixes for daq scheduling

---
 src/python/dm/__init__.py                                  | 2 +-
 src/python/dm/common/processing/fileProcessingManager.py   | 2 ++
 .../service/impl/experimentSessionControllerImpl.py        | 4 +++-
 .../dm/daq_web_service/service/impl/fileSystemObserver.py  | 7 ++++++-
 4 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/python/dm/__init__.py b/src/python/dm/__init__.py
index 17a60e3c..f881e473 100644
--- a/src/python/dm/__init__.py
+++ b/src/python/dm/__init__.py
@@ -1 +1 @@
-__version__ = "1.1 (2017.04.12)"
+__version__ = "1.1 (2017.04.18)"
diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py
index b55001c2..ac72f784 100755
--- a/src/python/dm/common/processing/fileProcessingManager.py
+++ b/src/python/dm/common/processing/fileProcessingManager.py
@@ -132,6 +132,8 @@ class FileProcessingManager(threading.Thread,Singleton):
     def queueFile(self, fileInfo, processingWaitTime=0):
         owner = fileInfo.get('owner')
         self.fileProcessingQueue.push(fileInfo, itemProcessingWaitTime=processingWaitTime, owner=owner)
+        if not owner:
+	    self.logger.warn('Owner is not set for file %s' % fileInfo)
         if not processingWaitTime:
             self.eventFlag.set()
 
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index e7e03204..7732c9dd 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -235,7 +235,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
                 nProcessedFiles += 1
             except Exception, ex:
                 self.logger.error('Processing error: %s', ex)
-        self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
+        nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
+        nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
+        self.logger.debug('Done preparing upload %s; it has a total of %s files (%s completed and %s queued)' % (uploadId, nFiles, nCompletedFiles, nQueuedFiles))
 
     def getUploadInfo(self, id):
         uploadInfo = UploadTracker.getInstance().get(id)
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index 1cb0f943..a3150139 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -127,6 +127,9 @@ class FileSystemObserver(threading.Thread,Singleton):
         # update all daq infos before going over observed files
         DaqTracker.getInstance().updateDaqInfos()
         nObservedFiles = len(self.observedFileMap)
+        if not nObservedFiles:
+            return
+        
         self.logger.debug('There are %s observed files waiting to be processed.' % (nObservedFiles))
         scheduledDict = {}
         skippedDict = {}
@@ -136,7 +139,9 @@ class FileSystemObserver(threading.Thread,Singleton):
 
             nCompletedFiles = daqInfo.get('nCompletedFiles', 0)
             nQueuedFiles = self.fileProcessingManager.getNumberOfQueuedFilesByOwner(daqId)
-            if nQueuedFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
+            nScheduledFiles = scheduledDict.get(daqId,0)
+            #self.logger.debug('There are %s scheduled and %s already queued files for DAQ id %s.' % (nScheduledFiles, nQueuedFiles, daqId))
+            if nQueuedFiles+nScheduledFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
                 # We do not need to add more files for processing for this DAQ
                 #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nQueuedFiles, daqId))
                 skippedDict[daqId] = skippedDict.get(daqId,0)+1
-- 
GitLab