Skip to content
Snippets Groups Projects
Commit 9be69efd authored by sveseli's avatar sveseli
Browse files

reworked accounting and status monitoring for daqs and uploads

parent f00bf950
No related branches found
No related tags found
No related merge requests found
Release 1.2 (04/18/2018)
=============================
- Modified scheduling algorithm for DAQs/uploads to simplify status monitoring
Release 1.1 (03/10/2017) Release 1.1 (03/10/2017)
============================= =============================
......
...@@ -19,6 +19,12 @@ class DaqInfo(DmObject): ...@@ -19,6 +19,12 @@ class DaqInfo(DmObject):
self.lock = threading.RLock() self.lock = threading.RLock()
self.originalKeys = dict.keys() self.originalKeys = dict.keys()
def getMonitorType(self):
return 'DAQ'
def getMonitorSignature(self):
return 'DAQ %s' % self.get('id', '')
def fileAdded(self, filePath): def fileAdded(self, filePath):
self.lock.acquire() self.lock.acquire()
try: try:
......
...@@ -26,6 +26,7 @@ class ObservedFile(DmObject): ...@@ -26,6 +26,7 @@ class ObservedFile(DmObject):
self['experimentName'] = experiment.get('name') self['experimentName'] = experiment.get('name')
self['storageHost'] = experiment.get('storageHost') self['storageHost'] = experiment.get('storageHost')
self['storageDirectory'] = experiment.get('storageDirectory') self['storageDirectory'] = experiment.get('storageDirectory')
def setLastUpdateTimeToNow(self): def setLastUpdateTimeToNow(self):
self['lastUpdateTime'] = time.time() self['lastUpdateTime'] = time.time()
......
...@@ -16,6 +16,12 @@ class UploadInfo(DmObject): ...@@ -16,6 +16,12 @@ class UploadInfo(DmObject):
DmObject.__init__(self, dict) DmObject.__init__(self, dict)
self.lock = threading.RLock() self.lock = threading.RLock()
def getMonitorType(self):
return 'Upload'
def getMonitorSignature(self):
return 'Upload %s' % self.get('id', '')
def fileProcessed(self, filePath, processingEndTime): def fileProcessed(self, filePath, processingEndTime):
self.lock.acquire() self.lock.acquire()
try: try:
......
...@@ -129,9 +129,20 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -129,9 +129,20 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('There are %s processing candidates remaining' % (len(filePathsDict))) self.logger.debug('There are %s processing candidates remaining' % (len(filePathsDict)))
return checkedFilePathsDict return checkedFilePathsDict
def processFile(self, fileInfo): def queueFile(self, fileInfo, processingWaitTime=0):
self.fileProcessingQueue.push(fileInfo) owner = fileInfo.get('owner')
self.eventFlag.set() self.fileProcessingQueue.push(fileInfo, itemProcessingWaitTime=processingWaitTime, owner=owner)
if not processingWaitTime:
self.eventFlag.set()
def getQueuedFile(self):
return self.fileProcessingQueue.pop()
def getNumberOfQueuedFilesByOwner(self, owner):
return self.fileProcessingQueue.getQueuedItemCountByOwner(owner)
def getNumberOfQueuedFiles(self):
return self.fileProcessingQueue.getLength()
def appendFileProcessor(self, fileProcessor): def appendFileProcessor(self, fileProcessor):
key = fileProcessor.__class__.__name__ key = fileProcessor.__class__.__name__
...@@ -146,7 +157,7 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -146,7 +157,7 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Starting file processing threads') self.logger.debug('Starting file processing threads')
for i in range(0, self.nProcessingThreads): for i in range(0, self.nProcessingThreads):
tName = 'FileProcessingThread-%s' % i tName = 'FileProcessingThread-%s' % i
t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue) t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList)
t.start() t.start()
self.fileProcessingThreadDict[tName] = t self.fileProcessingThreadDict[tName] = t
finally: finally:
......
...@@ -10,7 +10,7 @@ class FileProcessingThread(threading.Thread): ...@@ -10,7 +10,7 @@ class FileProcessingThread(threading.Thread):
THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0 THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0
def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue): def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.setName(name) self.setName(name)
...@@ -18,7 +18,6 @@ class FileProcessingThread(threading.Thread): ...@@ -18,7 +18,6 @@ class FileProcessingThread(threading.Thread):
self.fileProcessingManager = fileProcessingManager self.fileProcessingManager = fileProcessingManager
self.fileProcessorDict = fileProcessorDict self.fileProcessorDict = fileProcessorDict
self.fileProcessorKeyList = fileProcessorKeyList self.fileProcessorKeyList = fileProcessorKeyList
self.fileProcessingQueue = fileProcessingQueue
self.logger = LoggingManager.getInstance().getLogger(name) self.logger = LoggingManager.getInstance().getLogger(name)
def processFile(self, fileInfo): def processFile(self, fileInfo):
...@@ -32,7 +31,7 @@ class FileProcessingThread(threading.Thread): ...@@ -32,7 +31,7 @@ class FileProcessingThread(threading.Thread):
try: try:
statusMonitor = fileInfo.get('statusMonitor') statusMonitor = fileInfo.get('statusMonitor')
statusMonitorId = '' statusMonitorSignature = ''
if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING: if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
self.logger.debug('File %s processing is cancelled' % (filePath)) self.logger.debug('File %s processing is cancelled' % (filePath))
endProcessingTime = time.time() endProcessingTime = time.time()
...@@ -40,8 +39,8 @@ class FileProcessingThread(threading.Thread): ...@@ -40,8 +39,8 @@ class FileProcessingThread(threading.Thread):
statusMonitor.updateStatus() statusMonitor.updateStatus()
return return
if statusMonitor: if statusMonitor:
statusMonitorId = statusMonitor.get('id', '') statusMonitorSignature = statusMonitor.getMonitorSignature()
self.logger.debug('Starting to process file %s (upload or DAQ id: %s)' % (filePath, statusMonitorId)) self.logger.debug('%s: starting to process file %s)' % (statusMonitorSignature, filePath))
startProcessingTime = fileInfo.get('startProcessingTime', time.time()) startProcessingTime = fileInfo.get('startProcessingTime', time.time())
fileInfo['startProcessingTime'] = startProcessingTime fileInfo['startProcessingTime'] = startProcessingTime
nProcessors = len(self.fileProcessorKeyList) nProcessors = len(self.fileProcessorKeyList)
...@@ -68,12 +67,12 @@ class FileProcessingThread(threading.Thread): ...@@ -68,12 +67,12 @@ class FileProcessingThread(threading.Thread):
self.logger.debug('%s processed file at path %s ' % (processorName, filePath)) self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
if processorNumber == nProcessors: if processorNumber == nProcessors:
endProcessingTime = time.time() endProcessingTime = time.time()
self.logger.debug('File %s processing is complete (upload or DAQ id: %s)' % (filePath, statusMonitorId)) self.logger.debug('%s: file %s processing is complete' % (statusMonitorSignature, filePath))
if statusMonitor: if statusMonitor:
statusMonitor.fileProcessed(filePath, endProcessingTime) statusMonitor.fileProcessed(filePath, endProcessingTime)
statusMonitor.updateStatus() statusMonitor.updateStatus()
nProcessedFiles = statusMonitor.get('nProcessedFiles', 0) nProcessedFiles = statusMonitor.get('nProcessedFiles', 0)
self.logger.debug('Upload or DAQ id %s has processed %s files so far)' % (statusMonitorId, nProcessedFiles)) self.logger.debug('%s: processed %s files so far)' % (statusMonitorSignature, nProcessedFiles))
except Exception, ex: except Exception, ex:
self.logger.exception(ex) self.logger.exception(ex)
processingError = '%s processing error: %s' % (processorName, str(ex)) processingError = '%s processing error: %s' % (processorName, str(ex))
...@@ -104,7 +103,7 @@ class FileProcessingThread(threading.Thread): ...@@ -104,7 +103,7 @@ class FileProcessingThread(threading.Thread):
else: else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod)) self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod))
self.fileProcessingQueue.push(fileInfo, retryWaitPeriod) self.fileProcessingManager.queueFile(fileInfo, retryWaitPeriod)
# Do not process this file further until # Do not process this file further until
# this plugin is done # this plugin is done
break break
...@@ -122,10 +121,10 @@ class FileProcessingThread(threading.Thread): ...@@ -122,10 +121,10 @@ class FileProcessingThread(threading.Thread):
while True: while True:
try: try:
fileInfo = self.fileProcessingQueue.pop() fileInfo = self.fileProcessingManager.getQueuedFile()
if fileInfo is None: if fileInfo is None:
break break
self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength()) self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingManager.getNumberOfQueuedFiles())
self.processFile(fileInfo) self.processFile(fileInfo)
except Exception, ex: except Exception, ex:
self.logger.exception(ex) self.logger.exception(ex)
......
...@@ -12,8 +12,9 @@ class TimeBasedProcessingQueue: ...@@ -12,8 +12,9 @@ class TimeBasedProcessingQueue:
self.lock = threading.RLock() self.lock = threading.RLock()
self.queue = [] self.queue = []
self.itemPopTimeList = [] self.itemPopTimeList = []
self.queuedItemCountMapByOwner = {}
def push(self, item, itemProcessingWaitTime=0): def push(self, item, itemProcessingWaitTime=0, owner=None):
self.lock.acquire() self.lock.acquire()
try: try:
earliestPopTime = time.time() + itemProcessingWaitTime earliestPopTime = time.time() + itemProcessingWaitTime
...@@ -23,7 +24,9 @@ class TimeBasedProcessingQueue: ...@@ -23,7 +24,9 @@ class TimeBasedProcessingQueue:
break break
popIndex += 1 popIndex += 1
self.itemPopTimeList.insert(popIndex, earliestPopTime) self.itemPopTimeList.insert(popIndex, earliestPopTime)
self.queue.insert(popIndex,item) self.queue.insert(popIndex,(item,owner))
if owner:
self.queuedItemCountMapByOwner[owner] = self.queuedItemCountMapByOwner.get(owner, 0) + 1
finally: finally:
self.lock.release() self.lock.release()
...@@ -35,8 +38,12 @@ class TimeBasedProcessingQueue: ...@@ -35,8 +38,12 @@ class TimeBasedProcessingQueue:
if len(self.queue): if len(self.queue):
if self.itemPopTimeList[0] <= time.time(): if self.itemPopTimeList[0] <= time.time():
del self.itemPopTimeList[0] del self.itemPopTimeList[0]
item = self.queue[0] (item,owner) = self.queue[0]
del self.queue[0] del self.queue[0]
if owner:
self.queuedItemCountMapByOwner[owner] = self.queuedItemCountMapByOwner.get(owner) - 1
if self.queuedItemCountMapByOwner[owner] <= 0:
del self.queuedItemCountMapByOwner[owner]
return item return item
finally: finally:
self.lock.release() self.lock.release()
...@@ -47,6 +54,11 @@ class TimeBasedProcessingQueue: ...@@ -47,6 +54,11 @@ class TimeBasedProcessingQueue:
def isEmpty(self): def isEmpty(self):
return len(self.queue) == 0 return len(self.queue) == 0
def getQueuedItemCountByOwner(self, owner):
if owner:
return self.queuedItemCountMapByOwner.get(owner, 0)
return 0
#################################################################### ####################################################################
# Testing # Testing
......
...@@ -216,11 +216,12 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -216,11 +216,12 @@ class ExperimentSessionControllerImpl(DmObjectManager):
self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles)) self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles))
return return
nCompletedFiles = uploadInfo.get('nCompletedFiles', 0) nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
nWaitingFiles = nProcessedFiles - nCompletedFiles nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
if nWaitingFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES:
if nQueuedFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES:
# We need to add more files for upload # We need to add more files for upload
break break
self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)) self.logger.debug('Upload %s has %s completed files, %s queued files waiting for upload, will not add any more for %s seconds; upload timer has processed %s out of %s files' % (uploadId, nCompletedFiles, nQueuedFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS, nProcessedFiles, nFiles))
time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS) time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory) fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
...@@ -229,7 +230,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -229,7 +230,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
fileInfo['uploadId'] = uploadId fileInfo['uploadId'] = uploadId
fileInfo['statusMonitor'] = uploadInfo fileInfo['statusMonitor'] = uploadInfo
fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', []) fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
fileProcessingManager.processFile(fileInfo) fileInfo['owner'] = uploadId
fileProcessingManager.queueFile(fileInfo)
nProcessedFiles += 1 nProcessedFiles += 1
except Exception, ex: except Exception, ex:
self.logger.error('Processing error: %s', ex) self.logger.error('Processing error: %s', ex)
......
...@@ -48,7 +48,6 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -48,7 +48,6 @@ class FileSystemObserver(threading.Thread,Singleton):
self.observedFileMap = {} self.observedFileMap = {}
self.__configure() self.__configure()
self.fileProcessingManager = FileProcessingManager.getInstance() self.fileProcessingManager = FileProcessingManager.getInstance()
self.nProcessedFilesDict = {}
self.logger.debug('Initialization complete') self.logger.debug('Initialization complete')
finally: finally:
FileSystemObserver.__instanceLock.release() FileSystemObserver.__instanceLock.release()
...@@ -111,6 +110,7 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -111,6 +110,7 @@ class FileSystemObserver(threading.Thread,Singleton):
observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory) observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
observedFile['statusMonitor'] = daqInfo observedFile['statusMonitor'] = daqInfo
observedFile['owner'] = daqId
self.observedFileMap[filePath] = observedFile self.observedFileMap[filePath] = observedFile
self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId)) self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId))
daqInfo.fileAdded(filePath) daqInfo.fileAdded(filePath)
...@@ -126,17 +126,20 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -126,17 +126,20 @@ class FileSystemObserver(threading.Thread,Singleton):
# more files should be added for processing, so we need to # more files should be added for processing, so we need to
# update all daq infos before going over observed files # update all daq infos before going over observed files
DaqTracker.getInstance().updateDaqInfos() DaqTracker.getInstance().updateDaqInfos()
nObservedFiles = len(self.observedFileMap)
self.logger.debug('There are %s observed files waiting to be processed.' % (nObservedFiles))
scheduledDict = {}
skippedDict = {}
for (filePath,observedFile) in self.observedFileMap.items(): for (filePath,observedFile) in self.observedFileMap.items():
daqId = observedFile['daqInfo']['id'] daqId = observedFile['daqInfo']['id']
daqInfo = DaqTracker.getInstance().getDaqInfo(daqId) daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
nProcessedFiles = self.nProcessedFilesDict.get(daqId, 0)
nCompletedFiles = daqInfo.get('nCompletedFiles', 0) nCompletedFiles = daqInfo.get('nCompletedFiles', 0)
nWaitingFiles = nProcessedFiles - nCompletedFiles nQueuedFiles = self.fileProcessingManager.getNumberOfQueuedFilesByOwner(daqId)
if nQueuedFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
# We do not need to add more files for processing for this DAQ # We do not need to add more files for processing for this DAQ
#self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id'])) #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nQueuedFiles, daqId))
skippedDict[daqId] = skippedDict.get(daqId,0)+1
continue continue
timestamp = observedFile.get('lastUpdateTime') timestamp = observedFile.get('lastUpdateTime')
...@@ -144,7 +147,13 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -144,7 +147,13 @@ class FileSystemObserver(threading.Thread,Singleton):
if deltaT > self.minFileProcessingDelayInSeconds: if deltaT > self.minFileProcessingDelayInSeconds:
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
filePathsForProcessing.append(filePath) filePathsForProcessing.append(filePath)
self.nProcessedFilesDict[daqId] = nProcessedFiles+1 scheduledDict[daqId] = scheduledDict.get(daqId,0)+1
else:
skippedDict[daqId] = skippedDict.get(daqId,0)+1
for (daqId,nSkipped) in skippedDict.items():
self.logger.debug('Skipped for procesing %s observed files for DAQ id %s.' % (nSkipped, daqId))
for (daqId,nScheduled) in scheduledDict.items():
self.logger.debug('Scheduled for processing %s observed files for DAQ id %s.' % (nScheduled, daqId))
return filePathsForProcessing return filePathsForProcessing
@ThreadingUtility.synchronize @ThreadingUtility.synchronize
...@@ -153,7 +162,9 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -153,7 +162,9 @@ class FileSystemObserver(threading.Thread,Singleton):
observedFile = self.observedFileMap.get(filePath) observedFile = self.observedFileMap.get(filePath)
if observedFile is not None: if observedFile is not None:
del self.observedFileMap[filePath] del self.observedFileMap[filePath]
self.fileProcessingManager.processFile(observedFile) self.fileProcessingManager.queueFile(observedFile)
else:
self.logger.warn('No observed file found for path %s' % filePath)
@ThreadingUtility.synchronize @ThreadingUtility.synchronize
def start(self): def start(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment