#!/usr/bin/env python

import os
import threading
import time

from watchdog.observers.polling import PollingObserver

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager

from dmFileSystemEventHandler import DmFileSystemEventHandler
from daqTracker import DaqTracker

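# FileSystemObserver runs as a singleton background thread: it receives file
# events from a pluggable observer agent, tracks them per DAQ, and hands files
# to the FileProcessingManager once they have been idle for the configured
# minimum delay.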
class FileSystemObserver(threading.Thread,Singleton):

    CONFIG_SECTION_NAME = 'FileSystemObserver'
    MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
    FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
    FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'
    # Per-DAQ limit on files queued/scheduled at one time; referenced below as
    # self.DAQ_CHUNK_SIZE_IN_FILES. The original value was not shown in this
    # listing, 500 is an assumed default.
    DAQ_CHUNK_SIZE_IN_FILES = 500

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        FileSystemObserver.__instanceLock.acquire()
        try:
            if FileSystemObserver.__instance:
                return
            FileSystemObserver.__instance = self
            threading.Thread.__init__(self)
            self.setName('FileSystemObserverThread')
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.eventFlag = threading.Event()
            self.exitFlag = False

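            # Map of file path -> ObservedFile for files seen but not yet queued for processing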
            self.observedFileMap = {}
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')
        finally:
            FileSystemObserver.__instanceLock.release()

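    # Reads the FileSystemObserver configuration section and instantiates the
    # configured observer agent class via ObjectUtility.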
    def __configure(self):
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileSystemObserver.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
        self.logger.debug('Minimum file processing delay: %s seconds' % self.minFileProcessingDelayInSeconds)
        self.fileSystemEventTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
        self.logger.debug('File system event timeout: %s seconds' % self.fileSystemEventTimeoutInSeconds)
        agentClass = cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_OBSERVER_AGENT_KEY)
        (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(agentClass)
        self.logger.debug('Creating file system observer agent instance of class %s' % className)
        self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
        self.fileSystemObserverAgent.setFileSystemObserver(self)

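    # The synchronized methods below delegate directory and path operations to
    # the configured observer agent.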
    @ThreadingUtility.synchronize
    def createDirectory(self, dataDirectory):
        self.fileSystemObserverAgent.createDirectory(dataDirectory)

    @ThreadingUtility.synchronize
    def getFiles(self, dataDirectory):
        self.logger.debug('Agent is retrieving files for %s' % dataDirectory)
        return self.fileSystemObserverAgent.getFiles(dataDirectory)

    @ThreadingUtility.synchronize
    def startObservingPath(self, dataDirectory, experiment):
        self.logger.debug('Agent is starting observer for %s' % dataDirectory)
        self.fileSystemObserverAgent.startObservingPath(dataDirectory, experiment)

    @ThreadingUtility.synchronize
    def stopObservingPath(self, dataDirectory, experiment):
        self.logger.debug('Agent is stopping observer for %s' % dataDirectory)
        self.fileSystemObserverAgent.stopObservingPath(dataDirectory, experiment)

    @ThreadingUtility.synchronize
    def fileUpdated(self, filePath, dataDirectory, experiment):
        daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory)
        experimentName = experiment.get('name')

        # No daq info, ignore
        if not daqInfo:
            self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectory, experimentName, filePath))
            return

        # Do not process hidden files unless requested
        if not ValueUtility.toBoolean(daqInfo.get('processHiddenFiles')):
            fileName = os.path.basename(filePath)
            if fileName.startswith('.'):
                self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                return

        daqId = daqInfo['id']
        observedFile = self.observedFileMap.get(filePath)
        if not observedFile:
            # First event for this path: create and register a new observed file
            destDirectory = daqInfo.get('destDirectory')
            observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
            observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
            observedFile['statusMonitor'] = daqInfo
            observedFile['owner'] = daqId
            self.observedFileMap[filePath] = observedFile
            self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId))
            daqInfo.fileAdded(filePath)
        # Refresh the timestamp used to decide when the file is ready for processing
        observedFile['lastUpdateTime'] = time.time()
        self.logger.debug('Observed file updated: %s (daq id: %s)' % (filePath, daqId))

    @ThreadingUtility.synchronize
    def checkObservedFilesForProcessing(self):
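        # Determine which observed files have been idle for at least
        # minFileProcessingDelayInSeconds and can be queued, while limiting the
        # number of in-flight files per DAQ to DAQ_CHUNK_SIZE_IN_FILES.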
        now = time.time()
        filePathsForProcessing = []
        # We use number of waiting files to determine whether
        # more files should be added for processing, so we need to
        # update all daq infos before going over observed files
        DaqTracker.getInstance().updateDaqInfos()
        nObservedFiles = len(self.observedFileMap)
        if not nObservedFiles:
            return filePathsForProcessing

        self.logger.debug('There are %s observed files waiting to be processed.' % (nObservedFiles))
        scheduledDict = {}
        skippedDict = {}
        for (filePath,observedFile) in self.observedFileMap.items():
            daqId = observedFile['daqInfo']['id']
            daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)

            nCompletedFiles = daqInfo.get('nCompletedFiles', 0)
            nQueuedFiles = self.fileProcessingManager.getNumberOfQueuedFilesByOwner(daqId)
            nScheduledFiles = scheduledDict.get(daqId,0)
            #self.logger.debug('There are %s scheduled and %s already queued files for DAQ id %s.' % (nScheduledFiles, nQueuedFiles, daqId))
            if nQueuedFiles+nScheduledFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
                # We do not need to add more files for processing for this DAQ
                #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nQueuedFiles, daqId))
                skippedDict[daqId] = skippedDict.get(daqId,0)+1
                continue
            timestamp = observedFile.get('lastUpdateTime')
            deltaT = now - timestamp
            if deltaT > self.minFileProcessingDelayInSeconds:
                self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
                filePathsForProcessing.append(filePath)
                scheduledDict[daqId] = scheduledDict.get(daqId,0)+1
            else:
                skippedDict[daqId] = skippedDict.get(daqId,0)+1
        for (daqId,nSkipped) in skippedDict.items():
            self.logger.debug('Skipped for processing %s observed files for DAQ id %s.' % (nSkipped, daqId))
        for (daqId,nScheduled) in scheduledDict.items():
            self.logger.debug('Scheduled for processing %s observed files for DAQ id %s.' % (nScheduled, daqId))
        return filePathsForProcessing

    @ThreadingUtility.synchronize
    def processFile(self, filePath):
        self.logger.debug('Processing file %s' % filePath)
        observedFile = self.observedFileMap.get(filePath)
        if observedFile is not None:
            del self.observedFileMap[filePath]
            self.fileProcessingManager.queueFile(observedFile)
        else:
            self.logger.warn('No observed file found for path %s' % filePath)

    @ThreadingUtility.synchronize
    def start(self):
        self.logger.debug('Starting file observer thread')
        threading.Thread.start(self)
        self.logger.debug('Starting file observer agent')
        self.fileSystemObserverAgent.start()

    def run(self):
        self.logger.debug('Starting thread: %s' % self.getName())
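        # Main loop: periodically check observed files and queue those that are
        # ready; eventFlag allows an early wakeup (e.g. when stop() is called).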
        while True:
            if self.exitFlag:
                self.logger.debug('Exit flag set, %s done' % self.getName())
                break
            try:
                filePathsForProcessing = self.checkObservedFilesForProcessing()
                if filePathsForProcessing:
                    self.logger.debug('Will queue %s new files for processing' % (len(filePathsForProcessing)))
                for filePath in filePathsForProcessing:
                    self.processFile(filePath)
            except Exception as ex:
                self.logger.exception(ex)
            self.eventFlag.wait(timeout=self.fileSystemEventTimeoutInSeconds)

    @ThreadingUtility.synchronize
    def stop(self):
        self.logger.debug('Stopping file observer agent')
        self.fileSystemObserverAgent.stop()

        self.logger.debug('Stopping file observer thread')
        self.exitFlag = True
        self.eventFlag.set()
        self.logger.debug('Event is set, joining thread')
        threading.Thread.join(self)
        self.logger.debug('Module stopped')

####################################################################
# Testing

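# Minimal smoke test; assumes a valid DM configuration (and the services the
# managers depend on) is available so the singletons can initialize.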
if __name__ == '__main__':
    fp = FileSystemObserver.getInstance()
    fp.start()
    time.sleep(30)
    fp.stop()