Skip to content
Snippets Groups Projects
Commit 1beadf0c authored by sveseli's avatar sveseli
Browse files

enhanced daq processing algorithm to avoid resource starvation between...

enhanced daq processing algorithm to avoid resource starvation between simultaneous DAQs and uploads
parent db74c565
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,8 @@ Release 0.15 ()
=============================
- Resolved issue with incorrect accounting of processing errors for DAQs
- Enhanced DAQ processing algorithm to avoid resource starvation between
simultaneous DAQs and uploads
Release 0.14 (10/14/2016)
=============================
......
......@@ -11,7 +11,7 @@ from dm.common.utility.timeUtility import TimeUtility
class DaqInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
def __init__(self, dict={}):
......@@ -61,6 +61,9 @@ class DaqInfo(DmObject):
nProcessingErrors = self.get('nProcessingErrors', 0)
processingErrors = self.get('processingErrors', {})
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nCompletedFiles'] = nCompletedFiles
nWaitingFiles = nFiles-nCompletedFiles
self['nWaitingFiles'] = nWaitingFiles
startTime = self.get('startTime', now)
runTime = now - startTime
......
......@@ -10,7 +10,7 @@ from dm.common.utility.timeUtility import TimeUtility
class UploadInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
......@@ -69,6 +69,8 @@ class UploadInfo(DmObject):
processingErrors = self.get('processingErrors', {})
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nCompletedFiles'] = nCompletedFiles
nWaitingFiles = nFiles-nCompletedFiles-nCancelledFiles
self['nWaitingFiles'] = nWaitingFiles
startTime = self.get('startTime', now)
runTime = now - startTime
......
......@@ -71,6 +71,13 @@ class DaqTracker(ObjectTracker):
def getDaqInfo(self, id):
    """Return the tracked DAQ info object stored under *id*.

    Thin lookup wrapper around the tracker's generic get(); the return
    value is whatever get() yields for an unknown id (presumably None —
    TODO confirm against ObjectTracker).
    """
    daqInfo = self.get(id)
    return daqInfo
def updateDaqInfos(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING):
    """Refresh derived fields on every tracked DAQ whose status matches.

    Selects all DAQ info objects whose 'status' entry equals *status*
    (defaults to the RUNNING processing status) and calls updateStatus()
    on each, so counters such as nWaitingFiles are recomputed before use.
    """
    matching = [daqInfo for daqInfo in self.getAll()
                if daqInfo.get('status', '') == status]
    for daqInfo in matching:
        daqInfo.updateStatus()
def getDaqInfos(self, status=None):
daqInfoList = self.getAll()
if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY:
......
......@@ -24,6 +24,7 @@ class FileSystemObserver(threading.Thread,Singleton):
MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'
DAQ_CHUNK_SIZE_IN_FILES = 500
# Singleton.
__instanceLock = threading.RLock()
......@@ -119,7 +120,17 @@ class FileSystemObserver(threading.Thread,Singleton):
def checkObservedFilesForProcessing(self):
now = time.time()
filePathsForProcessing = []
# We use number of waiting files to determine whether
# more files should be added for processing, so we need to
# update all daq infos before going over observed files
DaqTracker.getInstance().updateDaqInfos()
for (filePath,observedFile) in self.observedFileMap.items():
daqInfo = observedFile['daqInfo']
nWaitingFiles = daqInfo.get('nWaitingFiles', 0)
if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
# We do not need to add more files for processing for this DAQ
continue
timestamp = observedFile.get('lastUpdateTime')
deltaT = now - timestamp
if deltaT > self.minFileProcessingDelayInSeconds:
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment