From 355aac0906f6d1ee1c88e92f442fe3404f12cc30 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Wed, 26 Oct 2016 20:52:07 +0000
Subject: [PATCH] fixes for daq processing algorithm

---
 src/python/dm/__init__.py                              |  2 +-
 src/python/dm/common/utility/sftpUtility.py            | 10 ++++++----
 .../daq_web_service/service/impl/fileSystemObserver.py |  8 ++++++--
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/src/python/dm/__init__.py b/src/python/dm/__init__.py
index 7000cfa1..04562a4f 100644
--- a/src/python/dm/__init__.py
+++ b/src/python/dm/__init__.py
@@ -1 +1 @@
-__version__ = "0.14 (2016.10.13)"
+__version__ = "0.15 (2016.10.26)"
diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py
index 3a4829ca..846db26b 100755
--- a/src/python/dm/common/utility/sftpUtility.py
+++ b/src/python/dm/common/utility/sftpUtility.py
@@ -144,11 +144,13 @@ class SftpUtility:
 
 if __name__ == '__main__':
     #sftpUtility = SftpUtility('s1dserv', username='dmadmin', password='theKey12')
-    sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
+    #sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
+    sftpUtility = SftpUtility('xstor-devel')
+    files = sftpUtility.getFiles('/data/testing/id7-test02')
     #files = sftpUtility.getFiles('/export/dm/test')
-    #print files
-    print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt')
-    print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt')
+    print files
+    #print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt')
+    #print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt')
     #print 'Closing connection'
     #sftpUtility.closeConnection()
     #print sftpUtility.statFile('/export/dm/test/testfile01')
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index bafd9fa6..9a0a6d90 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -124,11 +124,14 @@ class FileSystemObserver(threading.Thread,Singleton):
         # more files should be added for processing, so we need to
         # update all daq infos before going over observed files
         DaqTracker.getInstance().updateDaqInfos()
+        nWaitingFilesDict = {}
         for (filePath,observedFile) in self.observedFileMap.items():
             daqInfo = observedFile['daqInfo']
-            nWaitingFiles = daqInfo.get('nWaitingFiles', 0)
+            daqId = daqInfo['id']
+            nWaitingFiles = nWaitingFilesDict.get(daqId, daqInfo.get('nWaitingFiles', 0))
             if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
                 # We do not need to add more files for processing for this DAQ
+                #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id']))
                 continue
 
             timestamp = observedFile.get('lastUpdateTime')
@@ -136,6 +139,7 @@ class FileSystemObserver(threading.Thread,Singleton):
             if deltaT > self.minFileProcessingDelayInSeconds:
                 self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
                 filePathsForProcessing.append(filePath)
+                nWaitingFilesDict[daqId] = nWaitingFiles+1
         return filePathsForProcessing
     
     @ThreadingUtility.synchronize
@@ -163,7 +167,7 @@ class FileSystemObserver(threading.Thread,Singleton):
             try:
                 filePathsForProcessing = self.checkObservedFilesForProcessing()
                 if len(filePathsForProcessing):
-                    self.logger.debug('Checking observed files')
+                    self.logger.debug('Will queue %s new files for processing' % (len(filePathsForProcessing)))
                 for filePath in filePathsForProcessing:
                     self.processFile(filePath)
             except Exception, ex:
-- 
GitLab