diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 75e71bf655afece8cbd72656c6b1c42a0edb36c9..279bdf83f13678cb73cced2ccf17bbc1e73dba98 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -48,6 +48,7 @@ class FileSystemObserver(threading.Thread,Singleton): self.observedFileMap = {} self.__configure() self.fileProcessingManager = FileProcessingManager.getInstance() + self.nProcessedFilesDict = {} self.logger.debug('Initialization complete') finally: FileSystemObserver.__instanceLock.release() @@ -125,11 +126,14 @@ class FileSystemObserver(threading.Thread,Singleton): # more files should be added for processing, so we need to # update all daq infos before going over observed files DaqTracker.getInstance().updateDaqInfos() - nWaitingFilesDict = {} for (filePath,observedFile) in self.observedFileMap.items(): - daqInfo = observedFile['daqInfo'] - daqId = daqInfo['id'] - nWaitingFiles = nWaitingFilesDict.get(daqId, daqInfo.get('nWaitingFiles', 0)) + daqId = observedFile['daqInfo']['id'] + daqInfo = DaqTracker.getInstance().getDaqInfo(daqId) + + nProcessedFiles = self.nProcessedFilesDict.get(daqId, 0) + nCompletedFiles = daqInfo.get('nCompletedFiles', 0) + nWaitingFiles = nProcessedFiles - nCompletedFiles + if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES: # We do not need to add more files for processing for this DAQ #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id'])) @@ -140,7 +144,7 @@ class FileSystemObserver(threading.Thread,Singleton): if deltaT > self.minFileProcessingDelayInSeconds: self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) filePathsForProcessing.append(filePath) - nWaitingFilesDict[daqId] = nWaitingFiles+1 + self.nProcessedFilesDict[daqId] = nProcessedFiles+1 return filePathsForProcessing @ThreadingUtility.synchronize