From 0befd799f4c52e22cdf8af14c543569fd6fd1454 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Sun, 5 Mar 2017 05:47:44 +0000 Subject: [PATCH] fix daq queuing issue --- .../service/impl/fileSystemObserver.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 75e71bf6..279bdf83 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -48,6 +48,7 @@ class FileSystemObserver(threading.Thread,Singleton): self.observedFileMap = {} self.__configure() self.fileProcessingManager = FileProcessingManager.getInstance() + self.nProcessedFilesDict = {} self.logger.debug('Initialization complete') finally: FileSystemObserver.__instanceLock.release() @@ -125,11 +126,14 @@ class FileSystemObserver(threading.Thread,Singleton): # more files should be added for processing, so we need to # update all daq infos before going over observed files DaqTracker.getInstance().updateDaqInfos() - nWaitingFilesDict = {} for (filePath,observedFile) in self.observedFileMap.items(): - daqInfo = observedFile['daqInfo'] - daqId = daqInfo['id'] - nWaitingFiles = nWaitingFilesDict.get(daqId, daqInfo.get('nWaitingFiles', 0)) + daqId = observedFile['daqInfo']['id'] + daqInfo = DaqTracker.getInstance().getDaqInfo(daqId) + + nProcessedFiles = self.nProcessedFilesDict.get(daqId, 0) + nCompletedFiles = daqInfo.get('nCompletedFiles', 0) + nWaitingFiles = nProcessedFiles - nCompletedFiles + if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES: # We do not need to add more files for processing for this DAQ #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id'])) @@ -140,7 +144,7 @@ class FileSystemObserver(threading.Thread,Singleton): if deltaT > self.minFileProcessingDelayInSeconds: self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) filePathsForProcessing.append(filePath) - nWaitingFilesDict[daqId] = nWaitingFiles+1 + self.nProcessedFilesDict[daqId] = nProcessedFiles+1 return filePathsForProcessing @ThreadingUtility.synchronize -- GitLab