From 038b8fb02e15b41aa0c9de95d8b9e85701ecf7e2 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 14 Apr 2015 15:18:30 +0000 Subject: [PATCH] convert file system observer to use threading utility synchronization --- .../service/impl/fileSystemObserver.py | 139 +++++++----------- 1 file changed, 56 insertions(+), 83 deletions(-) diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index e6f8ed2e..be8245f6 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -9,6 +9,7 @@ from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.configurationManager import ConfigurationManager from dm.common.objects.observedFile import ObservedFile from dm.common.utility.singleton import Singleton +from dm.common.utility.threadingUtility import ThreadingUtility from dmFileSystemEventHandler import DmFileSystemEventHandler from fileProcessingManager import FileProcessingManager @@ -54,75 +55,57 @@ class FileSystemObserver(threading.Thread,Singleton): self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY)) self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY)) + @ThreadingUtility.synchronize def startObservingPath(self, daqPath, experiment): - self.lock.acquire() - try: - self.logger.debug('Starting observer for %s' % daqPath) - eventHandler = DmFileSystemEventHandler(self, daqPath, experiment) - observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True) - self.observedWatchDict[daqPath] = observedWatch - finally: - self.lock.release() + self.logger.debug('Starting observer for %s' % daqPath) + eventHandler = DmFileSystemEventHandler(self, daqPath, experiment) + observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True) + self.observedWatchDict[daqPath] = observedWatch + @ThreadingUtility.synchronize def stopObservingPath(self, daqPath, experiment): - self.lock.acquire() - try: - observedWatch = self.observedWatchDict.get(daqPath) - if observedWatch: - self.logger.debug('Stopping observer for %s' % daqPath) - self.observer.unschedule(observedWatch) - del self.observedWatchDict[daqPath] - else: - self.logger.debug('Observer for %s is not active' % daqPath) - finally: - self.lock.release() + observedWatch = self.observedWatchDict.get(daqPath) + if observedWatch: + self.logger.debug('Stopping observer for %s' % daqPath) + self.observer.unschedule(observedWatch) + del self.observedWatchDict[daqPath] + else: + self.logger.debug('Observer for %s is not active' % daqPath) + @ThreadingUtility.synchronize def observedFileUpdated(self, filePath, daqPath, experiment): - self.lock.acquire() - try: - observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment)) - observedFile.setLastUpdatedTimestampToNow() - self.observedFileMap[filePath] = observedFile - self.logger.debug('Observed file updated: %s', observedFile) - finally: - self.lock.release() + observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment)) + observedFile.setLastUpdatedTimestampToNow() + self.observedFileMap[filePath] = observedFile + self.logger.debug('Observed file updated: %s', observedFile) + @ThreadingUtility.synchronize def checkObservedFilesForProcessing(self): - self.lock.acquire() - try: - now = time.time() - filePathsForProcessing = [] - for (filePath,observedFile) in self.observedFileMap.items(): - timestamp = observedFile.get('lastUpdateTimestamp') - deltaT = now - timestamp - if deltaT > self.minFileProcessingDelayInSeconds: - self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) - filePathsForProcessing.append(filePath) - return filePathsForProcessing - finally: - self.lock.release() + now = time.time() + filePathsForProcessing = [] + for (filePath,observedFile) in self.observedFileMap.items(): + timestamp = observedFile.get('lastUpdateTimestamp') + deltaT = now - timestamp + if deltaT > self.minFileProcessingDelayInSeconds: + self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) + filePathsForProcessing.append(filePath) + return filePathsForProcessing + @ThreadingUtility.synchronize def processObservedFile(self, filePath): - self.lock.acquire() - try: - self.logger.debug('Processing file %s' % filePath) - observedFile = self.observedFileMap.get(filePath) - if observedFile is not None: - del self.observedFileMap[filePath] - self.fileProcessingManager.processObservedFile(observedFile) - finally: - self.lock.release() + self.logger.debug('Processing file %s' % filePath) + observedFile = self.observedFileMap.get(filePath) + if observedFile is not None: + del self.observedFileMap[filePath] + self.fileProcessingManager.processObservedFile(observedFile) + @ThreadingUtility.synchronize def start(self): - self.lock.acquire() - try: - self.logger.debug('Starting file observer thread') - threading.Thread.start(self) + self.logger.debug('Starting file observer thread') + threading.Thread.start(self) - self.logger.debug('Starting watchdog observer') - self.observer.start() - finally: - self.lock.release() + self.logger.debug('Starting watchdog observer') + self.observer.start() def run(self): self.logger.debug('Starting thread: %s' % self.getName()) @@ -140,36 +123,26 @@ class FileSystemObserver(threading.Thread,Singleton): self.logger.exception(ex) self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds) + @ThreadingUtility.synchronize def stop(self): - self.lock.acquire() - try: - self.logger.debug('Stopping watchdog observer') - self.observer.stop() - self.observer.join() - - self.logger.debug('Stopping file observer thread') - self.exitFlag = True - self.eventFlag.set() - self.logger.debug('Event is set, joining thread') - threading.Thread.join(self) - self.logger.debug('Module stopped') - finally: - self.lock.release() - - + self.logger.debug('Stopping watchdog observer') + self.observer.stop() + self.observer.join() + + self.logger.debug('Stopping file observer thread') + self.exitFlag = True + self.eventFlag.set() + self.logger.debug('Event is set, joining thread') + threading.Thread.join(self) + self.logger.debug('Module stopped') + + @ThreadingUtility.synchronize def setEvent(self): - self.lock.acquire() - try: - self.eventFlag.set() - finally: - self.lock.release() + self.eventFlag.set() + @ThreadingUtility.synchronize def clearEvent(self): - self.lock.acquire() - try: - self.eventFlag.clear() - finally: - self.lock.release() + self.eventFlag.clear() #################################################################### # Testing -- GitLab