diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt index 7a4b673e5892152e7a8288bd8e0007b8a85b9d03..5214a129dc9bf9691be1d8f8a34d611bd7429aa4 100644 --- a/doc/RELEASE_NOTES.txt +++ b/doc/RELEASE_NOTES.txt @@ -1,3 +1,8 @@ +Release 1.2 (04/18/2018) +============================= + +- Modified scheduling algorithm for DAQs/uploads to simplify status monitoring + Release 1.1 (03/10/2017) ============================= diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index 607980c3a13ccd92d5e68cece8913e4d4faef6a8..200be975bbcfee9f7c29252593f9a533f65f1b8f 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -19,6 +19,12 @@ class DaqInfo(DmObject): self.lock = threading.RLock() self.originalKeys = dict.keys() + def getMonitorType(self): + return 'DAQ' + + def getMonitorSignature(self): + return 'DAQ %s' % self.get('id', '') + def fileAdded(self, filePath): self.lock.acquire() try: diff --git a/src/python/dm/common/objects/observedFile.py b/src/python/dm/common/objects/observedFile.py index ffde9efca89516a3bf9d7aebacfc07381dcc9510..1d452ff4d2cab292043254c68fbde3ad36475f92 100755 --- a/src/python/dm/common/objects/observedFile.py +++ b/src/python/dm/common/objects/observedFile.py @@ -26,6 +26,7 @@ class ObservedFile(DmObject): self['experimentName'] = experiment.get('name') self['storageHost'] = experiment.get('storageHost') self['storageDirectory'] = experiment.get('storageDirectory') + def setLastUpdateTimeToNow(self): self['lastUpdateTime'] = time.time() diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index 20c12f4840a97b4b16ddd12d645b462b0be047ab..c57393f80f6700ff546cec36a68c6be9de697411 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -16,6 +16,12 @@ class UploadInfo(DmObject): DmObject.__init__(self, dict) self.lock = threading.RLock() + def 
getMonitorType(self): + return 'Upload' + + def getMonitorSignature(self): + return 'Upload %s' % self.get('id', '') + def fileProcessed(self, filePath, processingEndTime): self.lock.acquire() try: diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index 43646748dce45e4ba1a25fcd06892918fa421b1c..b55001c2acd65854b8451bc73dc3ad795e5405e3 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -129,9 +129,20 @@ class FileProcessingManager(threading.Thread,Singleton): self.logger.debug('There are %s processing candidates remaining' % (len(filePathsDict))) return checkedFilePathsDict - def processFile(self, fileInfo): - self.fileProcessingQueue.push(fileInfo) - self.eventFlag.set() + def queueFile(self, fileInfo, processingWaitTime=0): + owner = fileInfo.get('owner') + self.fileProcessingQueue.push(fileInfo, itemProcessingWaitTime=processingWaitTime, owner=owner) + if not processingWaitTime: + self.eventFlag.set() + + def getQueuedFile(self): + return self.fileProcessingQueue.pop() + + def getNumberOfQueuedFilesByOwner(self, owner): + return self.fileProcessingQueue.getQueuedItemCountByOwner(owner) + + def getNumberOfQueuedFiles(self): + return self.fileProcessingQueue.getLength() def appendFileProcessor(self, fileProcessor): key = fileProcessor.__class__.__name__ @@ -146,7 +157,7 @@ class FileProcessingManager(threading.Thread,Singleton): self.logger.debug('Starting file processing threads') for i in range(0, self.nProcessingThreads): tName = 'FileProcessingThread-%s' % i - t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue) + t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList) t.start() self.fileProcessingThreadDict[tName] = t finally: diff --git a/src/python/dm/common/processing/fileProcessingThread.py 
b/src/python/dm/common/processing/fileProcessingThread.py index 8ca4c999f5988a9d6fe6feb6440c57dfa65d3004..6d598cc164ed5be5020669ce6b671f11500c0f9d 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -10,7 +10,7 @@ class FileProcessingThread(threading.Thread): THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0 - def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue): + def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList): threading.Thread.__init__(self) self.setName(name) @@ -18,7 +18,6 @@ class FileProcessingThread(threading.Thread): self.fileProcessingManager = fileProcessingManager self.fileProcessorDict = fileProcessorDict self.fileProcessorKeyList = fileProcessorKeyList - self.fileProcessingQueue = fileProcessingQueue self.logger = LoggingManager.getInstance().getLogger(name) def processFile(self, fileInfo): @@ -32,7 +31,7 @@ class FileProcessingThread(threading.Thread): try: statusMonitor = fileInfo.get('statusMonitor') - statusMonitorId = '' + statusMonitorSignature = '' if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING: self.logger.debug('File %s processing is cancelled' % (filePath)) endProcessingTime = time.time() @@ -40,8 +39,8 @@ class FileProcessingThread(threading.Thread): statusMonitor.updateStatus() return if statusMonitor: - statusMonitorId = statusMonitor.get('id', '') - self.logger.debug('Starting to process file %s (upload or DAQ id: %s)' % (filePath, statusMonitorId)) + statusMonitorSignature = statusMonitor.getMonitorSignature() + self.logger.debug('%s: starting to process file %s' % (statusMonitorSignature, filePath)) startProcessingTime = fileInfo.get('startProcessingTime', time.time()) fileInfo['startProcessingTime'] = startProcessingTime nProcessors = len(self.fileProcessorKeyList) @@ -68,12 +67,12 @@ class 
FileProcessingThread(threading.Thread): self.logger.debug('%s processed file at path %s ' % (processorName, filePath)) if processorNumber == nProcessors: endProcessingTime = time.time() - self.logger.debug('File %s processing is complete (upload or DAQ id: %s)' % (filePath, statusMonitorId)) + self.logger.debug('%s: file %s processing is complete' % (statusMonitorSignature, filePath)) if statusMonitor: statusMonitor.fileProcessed(filePath, endProcessingTime) statusMonitor.updateStatus() nProcessedFiles = statusMonitor.get('nProcessedFiles', 0) - self.logger.debug('Upload or DAQ id %s has processed %s files so far)' % (statusMonitorId, nProcessedFiles)) + self.logger.debug('%s: processed %s files so far' % (statusMonitorSignature, nProcessedFiles)) except Exception, ex: self.logger.exception(ex) processingError = '%s processing error: %s' % (processorName, str(ex)) @@ -104,7 +103,7 @@ class FileProcessingThread(threading.Thread): else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod)) - self.fileProcessingQueue.push(fileInfo, retryWaitPeriod) + self.fileProcessingManager.queueFile(fileInfo, retryWaitPeriod) # Do not process this file further until # this plugin is done break @@ -122,10 +121,10 @@ class FileProcessingThread(threading.Thread): while True: try: - fileInfo = self.fileProcessingQueue.pop() + fileInfo = self.fileProcessingManager.getQueuedFile() if fileInfo is None: break - self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength()) + self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingManager.getNumberOfQueuedFiles()) self.processFile(fileInfo) except Exception, ex: self.logger.exception(ex) diff --git a/src/python/dm/common/utility/timeBasedProcessingQueue.py b/src/python/dm/common/utility/timeBasedProcessingQueue.py index 
3850638aaba1f8cb8179877b69b07925c0103aaa..ac0e917e680b295f4975b5fb9fbb4e55d6fb9931 100755 --- a/src/python/dm/common/utility/timeBasedProcessingQueue.py +++ b/src/python/dm/common/utility/timeBasedProcessingQueue.py @@ -12,8 +12,9 @@ class TimeBasedProcessingQueue: self.lock = threading.RLock() self.queue = [] self.itemPopTimeList = [] + self.queuedItemCountMapByOwner = {} - def push(self, item, itemProcessingWaitTime=0): + def push(self, item, itemProcessingWaitTime=0, owner=None): self.lock.acquire() try: earliestPopTime = time.time() + itemProcessingWaitTime @@ -23,7 +24,9 @@ class TimeBasedProcessingQueue: break popIndex += 1 self.itemPopTimeList.insert(popIndex, earliestPopTime) - self.queue.insert(popIndex,item) + self.queue.insert(popIndex,(item,owner)) + if owner: + self.queuedItemCountMapByOwner[owner] = self.queuedItemCountMapByOwner.get(owner, 0) + 1 finally: self.lock.release() @@ -35,8 +38,12 @@ class TimeBasedProcessingQueue: if len(self.queue): if self.itemPopTimeList[0] <= time.time(): del self.itemPopTimeList[0] - item = self.queue[0] + (item,owner) = self.queue[0] del self.queue[0] + if owner: + self.queuedItemCountMapByOwner[owner] = self.queuedItemCountMapByOwner.get(owner) - 1 + if self.queuedItemCountMapByOwner[owner] <= 0: + del self.queuedItemCountMapByOwner[owner] return item finally: self.lock.release() @@ -47,6 +54,11 @@ class TimeBasedProcessingQueue: def isEmpty(self): return len(self.queue) == 0 + def getQueuedItemCountByOwner(self, owner): + if owner: + return self.queuedItemCountMapByOwner.get(owner, 0) + return 0 + #################################################################### # Testing diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index 9d32435f91b3dc88a69f7feabd1bbae9563caeaa..e7e032040d2fd2ed65658c7a6f73ef196367ca08 100755 --- 
a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -216,11 +216,12 @@ class ExperimentSessionControllerImpl(DmObjectManager): self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles)) return nCompletedFiles = uploadInfo.get('nCompletedFiles', 0) - nWaitingFiles = nProcessedFiles - nCompletedFiles - if nWaitingFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES: + nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId) + + if nQueuedFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES: # We need to add more files for upload break - self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)) + self.logger.debug('Upload %s has %s completed files, %s queued files waiting for upload, will not add any more for %s seconds; upload timer has processed %s out of %s files' % (uploadId, nCompletedFiles, nQueuedFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS, nProcessedFiles, nFiles)) time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS) fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory) @@ -229,7 +230,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): fileInfo['uploadId'] = uploadId fileInfo['statusMonitor'] = uploadInfo fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', []) - fileProcessingManager.processFile(fileInfo) + fileInfo['owner'] = uploadId + fileProcessingManager.queueFile(fileInfo) nProcessedFiles += 1 except Exception, ex: self.logger.error('Processing error: %s', ex) diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 279bdf83f13678cb73cced2ccf17bbc1e73dba98..1cb0f9435c8bcbe3fae29e8245c466c4a8d04aa2 100755 --- 
a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -48,7 +48,6 @@ class FileSystemObserver(threading.Thread,Singleton): self.observedFileMap = {} self.__configure() self.fileProcessingManager = FileProcessingManager.getInstance() - self.nProcessedFilesDict = {} self.logger.debug('Initialization complete') finally: FileSystemObserver.__instanceLock.release() @@ -111,6 +110,7 @@ class FileSystemObserver(threading.Thread,Singleton): observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory) observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() observedFile['statusMonitor'] = daqInfo + observedFile['owner'] = daqId self.observedFileMap[filePath] = observedFile self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId)) daqInfo.fileAdded(filePath) @@ -126,17 +126,20 @@ class FileSystemObserver(threading.Thread,Singleton): # more files should be added for processing, so we need to # update all daq infos before going over observed files DaqTracker.getInstance().updateDaqInfos() + nObservedFiles = len(self.observedFileMap) + self.logger.debug('There are %s observed files waiting to be processed.' 
% (nObservedFiles)) + scheduledDict = {} + skippedDict = {} for (filePath,observedFile) in self.observedFileMap.items(): daqId = observedFile['daqInfo']['id'] daqInfo = DaqTracker.getInstance().getDaqInfo(daqId) - nProcessedFiles = self.nProcessedFilesDict.get(daqId, 0) nCompletedFiles = daqInfo.get('nCompletedFiles', 0) - nWaitingFiles = nProcessedFiles - nCompletedFiles - - if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES: + nQueuedFiles = self.fileProcessingManager.getNumberOfQueuedFilesByOwner(daqId) + if nQueuedFiles >= self.DAQ_CHUNK_SIZE_IN_FILES: # We do not need to add more files for processing for this DAQ - #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id'])) + #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nQueuedFiles, daqId)) + skippedDict[daqId] = skippedDict.get(daqId,0)+1 continue timestamp = observedFile.get('lastUpdateTime') @@ -144,7 +147,13 @@ if deltaT > self.minFileProcessingDelayInSeconds: self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) filePathsForProcessing.append(filePath) - self.nProcessedFilesDict[daqId] = nProcessedFiles+1 + scheduledDict[daqId] = scheduledDict.get(daqId,0)+1 + else: + skippedDict[daqId] = skippedDict.get(daqId,0)+1 + for (daqId,nSkipped) in skippedDict.items(): + self.logger.debug('Skipped for processing %s observed files for DAQ id %s.' % (nSkipped, daqId)) + for (daqId,nScheduled) in scheduledDict.items(): + self.logger.debug('Scheduled for processing %s observed files for DAQ id %s.' 
% (nScheduled, daqId)) return filePathsForProcessing @ThreadingUtility.synchronize @@ -153,7 +162,9 @@ class FileSystemObserver(threading.Thread,Singleton): observedFile = self.observedFileMap.get(filePath) if observedFile is not None: del self.observedFileMap[filePath] - self.fileProcessingManager.processFile(observedFile) + self.fileProcessingManager.queueFile(observedFile) + else: + self.logger.warn('No observed file found for path %s' % filePath) @ThreadingUtility.synchronize def start(self):