Skip to content
Snippets Groups Projects
Commit dbec81cf authored by sveseli's avatar sveseli
Browse files

fixes for daq scheduling

parent 5763b4d5
No related branches found
No related tags found
No related merge requests found
__version__ = "1.1 (2017.04.12)"
__version__ = "1.1 (2017.04.18)"
......@@ -132,6 +132,8 @@ class FileProcessingManager(threading.Thread,Singleton):
def queueFile(self, fileInfo, processingWaitTime=0):
owner = fileInfo.get('owner')
self.fileProcessingQueue.push(fileInfo, itemProcessingWaitTime=processingWaitTime, owner=owner)
if not owner:
self.logger.warn('Owner is not set for file %s' % fileInfo)
if not processingWaitTime:
self.eventFlag.set()
......
......@@ -235,7 +235,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
nProcessedFiles += 1
except Exception, ex:
self.logger.error('Processing error: %s', ex)
self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
self.logger.debug('Done preparing upload %s; it has a total of %s files (%s completed and %s queued)' % (uploadId, nFiles, nCompletedFiles, nQueuedFiles))
def getUploadInfo(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
......
......@@ -127,6 +127,9 @@ class FileSystemObserver(threading.Thread,Singleton):
# update all daq infos before going over observed files
DaqTracker.getInstance().updateDaqInfos()
nObservedFiles = len(self.observedFileMap)
if not nObservedFiles:
return
self.logger.debug('There are %s observed files waiting to be processed.' % (nObservedFiles))
scheduledDict = {}
skippedDict = {}
......@@ -136,7 +139,9 @@ class FileSystemObserver(threading.Thread,Singleton):
nCompletedFiles = daqInfo.get('nCompletedFiles', 0)
nQueuedFiles = self.fileProcessingManager.getNumberOfQueuedFilesByOwner(daqId)
if nQueuedFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
nScheduledFiles = scheduledDict.get(daqId,0)
#self.logger.debug('There are %s scheduled and %s already queued files for DAQ id %s.' % (nScheduledFiles, nQueuedFiles, daqId))
if nQueuedFiles+nScheduledFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
# We do not need to add more files for processing for this DAQ
#self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nQueuedFiles, daqId))
skippedDict[daqId] = skippedDict.get(daqId,0)+1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment