#!/usr/bin/env python
# fileSystemObserver.py

import threading
import time

from watchdog.observers.polling import PollingObserver

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager

from dmFileSystemEventHandler import DmFileSystemEventHandler

class FileSystemObserver(threading.Thread,Singleton):
    """Thread that watches data directories and hands settled files off for processing.

    A watchdog ``PollingObserver`` is scheduled on each observed directory with a
    ``DmFileSystemEventHandler`` constructed around this object (presumably the
    handler reports changes through ``fileUpdated`` -- confirm in
    dmFileSystemEventHandler). Updated files are kept in ``observedFileMap``; the
    ``run`` loop periodically selects files whose last update is older than the
    configured minimum delay and passes them to ``FileProcessingManager``.
    """

    CONFIG_SECTION_NAME = 'FileSystemObserver'
    MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
    FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the singleton on first construction; later calls are no-ops."""
        with FileSystemObserver.__instanceLock:
            if FileSystemObserver.__instance is not None:
                # Already initialized: leave the existing instance's state alone.
                return
            FileSystemObserver.__instance = self
            threading.Thread.__init__(self)
            self.setName('FileSystemObserverThread')
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            # Set by stop() to wake the run loop before its wait timeout expires.
            self.eventFlag = threading.Event()
            self.exitFlag = False

            # filePath -> ObservedFile awaiting processing.
            self.observedFileMap = {}
            self.observer = PollingObserver()
            # dataDirectory -> watch handle returned by observer.schedule().
            self.observedWatchDict = {}
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """Read the processing delay and wait timeout from the configuration."""
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileSystemObserver.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s', configItems)
        self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
        self.logger.debug('Minimum file processing delay: %s seconds', self.minFileProcessingDelayInSeconds)
        self.fileSystemEventTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
        self.logger.debug('File system event timeout: %s seconds', self.fileSystemEventTimeoutInSeconds)

    @ThreadingUtility.synchronize
    def startObservingPath(self, dataDirectory, experiment):
        """Schedule a recursive watch on dataDirectory for the given experiment."""
        self.logger.debug('Starting observer for %s', dataDirectory)
        eventHandler = DmFileSystemEventHandler(self, dataDirectory, experiment)
        # NOTE(review): calling this twice for the same directory replaces the
        # stored handle; confirm whether duplicate scheduling should be guarded.
        observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
        self.observedWatchDict[dataDirectory] = observedWatch

    @ThreadingUtility.synchronize
    def stopObservingPath(self, dataDirectory, experiment):
        """Unschedule the watch on dataDirectory, if one is active."""
        observedWatch = self.observedWatchDict.get(dataDirectory)
        if observedWatch:
            self.logger.debug('Stopping observer for %s', dataDirectory)
            self.observer.unschedule(observedWatch)
            del self.observedWatchDict[dataDirectory]
        else:
            self.logger.debug('Observer for %s is not active', dataDirectory)

    @ThreadingUtility.synchronize
    def fileUpdated(self, filePath, dataDirectory, experiment):
        """Record that filePath changed, creating its ObservedFile on first sight."""
        observedFile = self.observedFileMap.get(filePath)
        if observedFile is None:
            # Only build a new ObservedFile when the path is not already tracked.
            observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            self.observedFileMap[filePath] = observedFile
        observedFile.setLastUpdatedTimestampToNow()
        self.logger.debug('Observed file updated: %s', observedFile)

    @ThreadingUtility.synchronize
    def checkObservedFilesForProcessing(self):
        """Return paths whose last update is older than the minimum processing delay."""
        now = time.time()
        filePathsForProcessing = []
        for filePath, observedFile in self.observedFileMap.items():
            # NOTE(review): confirm setLastUpdatedTimestampToNow() stores under the
            # 'lastUpdateTimestamp' key; if get() returns None the subtraction
            # raises TypeError (logged by run()).
            timestamp = observedFile.get('lastUpdateTimestamp')
            deltaT = now - timestamp
            if deltaT > self.minFileProcessingDelayInSeconds:
                self.logger.debug('File %s was last modified %s seconds ago, will process it.', filePath, deltaT)
                filePathsForProcessing.append(filePath)
        return filePathsForProcessing

    @ThreadingUtility.synchronize
    def processFile(self, filePath):
        """Remove filePath from the observed map and hand it to FileProcessingManager."""
        self.logger.debug('Processing file %s', filePath)
        observedFile = self.observedFileMap.pop(filePath, None)
        if observedFile is not None:
            self.fileProcessingManager.processFile(observedFile)

    def start(self):
        """Start this processing thread, then the watchdog observer."""
        self.logger.debug('Starting file observer thread')
        threading.Thread.start(self)
        self.logger.debug('Starting watchdog observer')
        self.observer.start()

    def run(self):
        """Periodically process settled files until exitFlag is set."""
        self.logger.debug('Starting thread: %s', self.getName())
        while True:
            if self.exitFlag:
                self.logger.debug('Exit flag set, %s done', self.getName())
                break
            try:
                self.logger.debug('Checking observed files')

                filePathsForProcessing = self.checkObservedFilesForProcessing()
                for filePath in filePathsForProcessing:
                    self.processFile(filePath)
            except Exception as ex:
                # Keep the loop alive; a failure in one pass is logged and retried.
                self.logger.exception(ex)

            # Sleeps up to the configured timeout; stop() sets the event to wake early.
            self.eventFlag.wait(timeout=self.fileSystemEventTimeoutInSeconds)

    def stop(self):
        """Stop the watchdog observer, then signal and join the processing thread.

        Intentionally not decorated with ThreadingUtility.synchronize: if that
        decorator takes the lock shared with checkObservedFilesForProcessing and
        processFile (presumably self.lock -- confirm), holding it across join()
        could deadlock against the run loop.
        """
        self.logger.debug('Stopping watchdog observer')
        self.observer.stop()
        self.observer.join()

        self.logger.debug('Stopping file observer thread')
        # exitFlag must be set before waking the loop so run() exits on its next check.
        self.exitFlag = True
        self.eventFlag.set()
        self.logger.debug('Event is set, joining thread')
        threading.Thread.join(self)
        self.logger.debug('Module stopped')

    # NOTE(review): the scraped source ended the class with a dangling
    # @ThreadingUtility.synchronize whose method body was lost; restore that
    # method from version control if other modules call it.

####################################################################
# Testing

if __name__ == '__main__':
    # Manual smoke test: run the observer for 30 seconds, then shut it down.
    fp = FileSystemObserver.getInstance()
    fp.start()
    try:
        time.sleep(30)
    finally:
        # Always stop, even on Ctrl-C: the observer thread is never marked as a
        # daemon, so skipping stop() would leave the process running.
        fp.stop()