Skip to content
Snippets Groups Projects
Commit 038b8fb0 authored by sveseli's avatar sveseli
Browse files

convert file system observer to use threading utility synchronization

parent 9d639567
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dmFileSystemEventHandler import DmFileSystemEventHandler
from fileProcessingManager import FileProcessingManager
......@@ -54,75 +55,57 @@ class FileSystemObserver(threading.Thread,Singleton):
self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
@ThreadingUtility.synchronize
def startObservingPath(self, daqPath, experiment):
self.lock.acquire()
try:
self.logger.debug('Starting observer for %s' % daqPath)
eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
self.observedWatchDict[daqPath] = observedWatch
finally:
self.lock.release()
self.logger.debug('Starting observer for %s' % daqPath)
eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
self.observedWatchDict[daqPath] = observedWatch
@ThreadingUtility.synchronize
def stopObservingPath(self, daqPath, experiment):
self.lock.acquire()
try:
observedWatch = self.observedWatchDict.get(daqPath)
if observedWatch:
self.logger.debug('Stopping observer for %s' % daqPath)
self.observer.unschedule(observedWatch)
del self.observedWatchDict[daqPath]
else:
self.logger.debug('Observer for %s is not active' % daqPath)
finally:
self.lock.release()
observedWatch = self.observedWatchDict.get(daqPath)
if observedWatch:
self.logger.debug('Stopping observer for %s' % daqPath)
self.observer.unschedule(observedWatch)
del self.observedWatchDict[daqPath]
else:
self.logger.debug('Observer for %s is not active' % daqPath)
@ThreadingUtility.synchronize
def observedFileUpdated(self, filePath, daqPath, experiment):
self.lock.acquire()
try:
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
observedFile.setLastUpdatedTimestampToNow()
self.observedFileMap[filePath] = observedFile
self.logger.debug('Observed file updated: %s', observedFile)
finally:
self.lock.release()
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
observedFile.setLastUpdatedTimestampToNow()
self.observedFileMap[filePath] = observedFile
self.logger.debug('Observed file updated: %s', observedFile)
@ThreadingUtility.synchronize
def checkObservedFilesForProcessing(self):
self.lock.acquire()
try:
now = time.time()
filePathsForProcessing = []
for (filePath,observedFile) in self.observedFileMap.items():
timestamp = observedFile.get('lastUpdateTimestamp')
deltaT = now - timestamp
if deltaT > self.minFileProcessingDelayInSeconds:
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
filePathsForProcessing.append(filePath)
return filePathsForProcessing
finally:
self.lock.release()
now = time.time()
filePathsForProcessing = []
for (filePath,observedFile) in self.observedFileMap.items():
timestamp = observedFile.get('lastUpdateTimestamp')
deltaT = now - timestamp
if deltaT > self.minFileProcessingDelayInSeconds:
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
filePathsForProcessing.append(filePath)
return filePathsForProcessing
@ThreadingUtility.synchronize
def processObservedFile(self, filePath):
self.lock.acquire()
try:
self.logger.debug('Processing file %s' % filePath)
observedFile = self.observedFileMap.get(filePath)
if observedFile is not None:
del self.observedFileMap[filePath]
self.fileProcessingManager.processObservedFile(observedFile)
finally:
self.lock.release()
self.logger.debug('Processing file %s' % filePath)
observedFile = self.observedFileMap.get(filePath)
if observedFile is not None:
del self.observedFileMap[filePath]
self.fileProcessingManager.processObservedFile(observedFile)
@ThreadingUtility.synchronize
def start(self):
self.lock.acquire()
try:
self.logger.debug('Starting file observer thread')
threading.Thread.start(self)
self.logger.debug('Starting file observer thread')
threading.Thread.start(self)
self.logger.debug('Starting watchdog observer')
self.observer.start()
finally:
self.lock.release()
self.logger.debug('Starting watchdog observer')
self.observer.start()
def run(self):
self.logger.debug('Starting thread: %s' % self.getName())
......@@ -140,36 +123,26 @@ class FileSystemObserver(threading.Thread,Singleton):
self.logger.exception(ex)
self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds)
@ThreadingUtility.synchronize
def stop(self):
self.lock.acquire()
try:
self.logger.debug('Stopping watchdog observer')
self.observer.stop()
self.observer.join()
self.logger.debug('Stopping file observer thread')
self.exitFlag = True
self.eventFlag.set()
self.logger.debug('Event is set, joining thread')
threading.Thread.join(self)
self.logger.debug('Module stopped')
finally:
self.lock.release()
self.logger.debug('Stopping watchdog observer')
self.observer.stop()
self.observer.join()
self.logger.debug('Stopping file observer thread')
self.exitFlag = True
self.eventFlag.set()
self.logger.debug('Event is set, joining thread')
threading.Thread.join(self)
self.logger.debug('Module stopped')
@ThreadingUtility.synchronize
def setEvent(self):
self.lock.acquire()
try:
self.eventFlag.set()
finally:
self.lock.release()
self.eventFlag.set()
@ThreadingUtility.synchronize
def clearEvent(self):
self.lock.acquire()
try:
self.eventFlag.clear()
finally:
self.lock.release()
self.eventFlag.clear()
####################################################################
# Testing
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment