Skip to content
Snippets Groups Projects
fileSystemObserver.py 6.69 KiB
Newer Older
#!/usr/bin/env python

import threading
import time

from watchdog.observers.polling import PollingObserver

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.singleton import Singleton

from dmFileSystemEventHandler import DmFileSystemEventHandler
from fileProcessingManager import FileProcessingManager

class FileSystemObserver(threading.Thread,Singleton):

    CONFIG_SECTION_NAME = 'FileSystemObserver'
    MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
    FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        FileSystemObserver.__instanceLock.acquire()
        try:
            if FileSystemObserver.__instance:
                return
            FileSystemObserver.__instance = self
            threading.Thread.__init__(self)
            self.setName('FileSystemObserverThread')
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.eventFlag = threading.Event()
            self.exitFlag = False

            self.observedFileMap = {}
            self.observer = PollingObserver()
            self.observedWatchDict = {}
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')
        finally:
            FileSystemObserver.__instanceLock.release()

    def __configure(self):
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileSystemObserver.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
        self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))

    def startObservingPath(self, daqPath, experiment):
            self.logger.debug('Starting observer for %s' % daqPath)
            eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
            observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
            self.observedWatchDict[daqPath] = observedWatch
    def stopObservingPath(self, daqPath, experiment):
            observedWatch = self.observedWatchDict.get(daqPath)
                self.logger.debug('Stopping observer for %s' % daqPath)
                self.observer.unschedule(observedWatch)
                self.logger.debug('Observer for %s is not active' % daqPath)
    def observedFileUpdated(self, filePath, daqPath, experiment):
            observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
            observedFile.setLastUpdatedTimestampToNow()
            self.observedFileMap[filePath] = observedFile
            self.logger.debug('Observed file updated: %s', observedFile)
        finally:
            self.lock.release()
       
    def checkObservedFilesForProcessing(self):
        self.lock.acquire()
        try:
            now = time.time()
            filePathsForProcessing = []
            for (filePath,observedFile) in self.observedFileMap.items():
                timestamp = observedFile.get('lastUpdateTimestamp')
                deltaT = now - timestamp
                if deltaT > self.minFileProcessingDelayInSeconds:
                    self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
                    filePathsForProcessing.append(filePath)
            return filePathsForProcessing
    def processObservedFile(self, filePath):
            self.logger.debug('Processing file %s' % filePath)
            observedFile = self.observedFileMap.get(filePath)
                self.fileProcessingManager.processObservedFile(observedFile)
        finally:
            self.lock.release()
     
    def start(self):
        self.lock.acquire()
        try:
            self.logger.debug('Starting file observer thread')
            threading.Thread.start(self)

            self.logger.debug('Starting watchdog observer')
            self.observer.start()
        finally:
             self.lock.release()

    def run(self):
        self.logger.debug('Starting thread: %s' % self.getName())
        while True:
            if self.exitFlag:
                self.logger.debug('Exit flag set, %s done' % self.getName())
                break
            try:
                self.logger.debug('Checking observed files')

                filePathsForProcessing = self.checkObservedFilesForProcessing()
                for filePath in filePathsForProcessing:
                    self.processObservedFile(filePath)
            except Exception, ex:
                self.logger.exception(ex)
            self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds)

    def stop(self):
        self.lock.acquire()
        try:
            self.logger.debug('Stopping watchdog observer')
            self.observer.stop()
            self.observer.join()

            self.logger.debug('Stopping file observer thread')
            self.exitFlag = True
            self.eventFlag.set()
            self.logger.debug('Event is set, joining thread')
            threading.Thread.join(self)
            self.logger.debug('Module stopped')
        finally:
             self.lock.release()


    def setEvent(self):
        self.lock.acquire()
        try:
             self.eventFlag.set()
        finally:
            self.lock.release()

    def clearEvent(self):
        self.lock.acquire()
        try:
            self.eventFlag.clear()
        finally:
            self.lock.release()

####################################################################
# Testing

if __name__ == '__main__':
    fp = FileSystemObserver.getInstance()
    fp.start()
    time.sleep(30)
    fp.stop()