diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt index 1ec6810c512f3177241a861da83d4c7c9bb3e298..89b75fb7f84aa0a0e0c8a9b06b458034eee1b87c 100644 --- a/doc/RELEASE_NOTES.txt +++ b/doc/RELEASE_NOTES.txt @@ -2,6 +2,8 @@ Release 0.15 () ============================= - Resolved issue with incorrect accounting of processing errors for DAQs +- Enhanced DAQ processing algorithm to avoid resource starvation between + simultaneous DAQs and uploads Release 0.14 (10/14/2016) ============================= diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index 660568de591e9cd75e56ed7c613d39475d8a1a23..0dc1c9f2fa88454163969105746551d41d651b65 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -11,7 +11,7 @@ from dm.common.utility.timeUtility import TimeUtility class DaqInfo(DmObject): - DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ] + DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ] def __init__(self, dict={}): @@ -61,6 +61,9 @@ class DaqInfo(DmObject): nProcessingErrors = self.get('nProcessingErrors', 0) processingErrors = self.get('processingErrors', {}) nCompletedFiles = nProcessedFiles+nProcessingErrors + self['nCompletedFiles'] = nCompletedFiles + nWaitingFiles = nFiles-nCompletedFiles + self['nWaitingFiles'] = nWaitingFiles startTime = self.get('startTime', now) runTime = now - startTime diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index fc94a2081d3493b44a2ec202708dcd99f5388a89..4c04ffa67304c1aead1098814e54d878e5bf45ce 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -10,7 +10,7 @@ from dm.common.utility.timeUtility import TimeUtility class UploadInfo(DmObject): - DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ] + DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ] def __init__(self, dict={}): DmObject.__init__(self, dict) @@ -69,6 +69,8 @@ class UploadInfo(DmObject): processingErrors = self.get('processingErrors', {}) nCompletedFiles = nProcessedFiles+nProcessingErrors self['nCompletedFiles'] = nCompletedFiles + nWaitingFiles = nFiles-nCompletedFiles-nCancelledFiles + self['nWaitingFiles'] = nWaitingFiles startTime = self.get('startTime', now) runTime = now - startTime diff --git a/src/python/dm/daq_web_service/service/impl/daqTracker.py b/src/python/dm/daq_web_service/service/impl/daqTracker.py index 2f8f17d6a23b3c7a4804ddc748d88ca744c5c951..72d9f841ef89baca2257e9ab3cb8ba47ebb9d05a 100755 --- a/src/python/dm/daq_web_service/service/impl/daqTracker.py +++ b/src/python/dm/daq_web_service/service/impl/daqTracker.py @@ -71,6 +71,13 @@ class DaqTracker(ObjectTracker): def getDaqInfo(self, id): return self.get(id) + def updateDaqInfos(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING): + + daqInfoList = self.getAll() + for daqInfo in daqInfoList: + if daqInfo.get('status', '') == status: + daqInfo.updateStatus() + def getDaqInfos(self, status=None): daqInfoList = self.getAll() if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY: diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 6555ada21ddad466a657c0930ee076e5c1b7b59c..bafd9fa6e4be42e0f7b4e6f1c159a9b038a46a24 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -24,6 +24,7 @@ class FileSystemObserver(threading.Thread,Singleton): MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds' FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds' FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent' + DAQ_CHUNK_SIZE_IN_FILES = 500 # Singleton. __instanceLock = threading.RLock() @@ -119,7 +120,17 @@ class FileSystemObserver(threading.Thread,Singleton): def checkObservedFilesForProcessing(self): now = time.time() filePathsForProcessing = [] + # We use number of waiting files to determine whether + # more files should be added for processing, so we need to + # update all daq infos before going over observed files + DaqTracker.getInstance().updateDaqInfos() for (filePath,observedFile) in self.observedFileMap.items(): + daqInfo = observedFile['daqInfo'] + nWaitingFiles = daqInfo.get('nWaitingFiles', 0) + if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES: + # We do not need to add more files for processing for this DAQ + continue + timestamp = observedFile.get('lastUpdateTime') deltaT = now - timestamp if deltaT > self.minFileProcessingDelayInSeconds: