Skip to content
Snippets Groups Projects
pollingFileSystemObserverAgent.py 4.95 KiB
Newer Older
#!/usr/bin/env python

from threading import Timer
from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.utility.osUtility import OsUtility

class PollingFileSystemObserverAgent(FileSystemObserverAgent):
    """File system observer agent that detects changes by periodic polling.

    Each observed directory is re-scanned on its own one-shot
    ``threading.Timer``, re-armed after every poll. Files that are new since
    the previous scan, or whose ``fileModificationTime`` changed, are reported
    through ``self.fileSystemObserver.fileUpdated()``. File removals are not
    reported.

    NOTE(review): ``self.logger`` and ``self.fileSystemObserver`` are not
    assigned in this class; presumably ``FileSystemObserverAgent`` provides
    them — confirm.
    """

    DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
    # Extra delay added to the next poll of a directory whose last poll failed.
    DEFAULT_RETRY_PERIOD_IN_SECONDS = 60

    def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
        """Create the agent.

        :param pollingPeriod: seconds between successive polls of a directory.
        """
        FileSystemObserverAgent.__init__(self)
        self.pollingPeriod = pollingPeriod
        # Retry delay produced by the most recent poll of any directory.
        # Kept for backward compatibility only; scheduling uses the
        # per-directory value stored in observedDirDict[dir]['retryDelay'].
        self.retryDelay = 0
        # dataDirectory -> {'files', 'experiment', 'retryDelay', 'pollTimer'}
        self.observedDirDict = {}
        # Set by stop(); prevents any further poll timers from being armed.
        self.isDone = False

    def getFiles(self, dataDirectory):
        """Return the current file listing of ``dataDirectory``.

        NOTE(review): the result comes from ``OsUtility.findFilesAsDict``;
        presumably a dict keyed by file path whose values are dicts holding
        at least ``'fileModificationTime'`` (that is how processFiles reads
        it) — confirm.
        """
        self.logger.debug('Retrieving files for directory: %s' % (dataDirectory))
        return OsUtility.findFilesAsDict(dataDirectory, {})

    def updateFile(self, filePath, dataDirectory, experiment):
        """Notify the registered observer (if any) that ``filePath`` changed."""
        if self.fileSystemObserver:
            self.logger.debug('Processing file path: %s' % filePath)
            self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment)

    def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment):
        """Report files that are new or modified relative to ``oldFileDict``.

        :param fileDict: current listing (file path -> file info dict).
        :param oldFileDict: previous listing of the same shape; ``None`` is
            treated as an empty listing, so every current file is new.
        :param dataDirectory: directory being observed (passed through).
        :param experiment: experiment the directory belongs to (passed through).
        """
        if oldFileDict is None:
            oldFileDict = {}
        for filePath, fileInfo in fileDict.items():
            if filePath not in oldFileDict:
                # new file, must be updated
                self.logger.debug('New file path detected: %s' % filePath)
                self.updateFile(filePath, dataDirectory, experiment)
                continue
            # known file: report it only if its modification time changed
            oldModifyTime = oldFileDict[filePath].get('fileModificationTime', '')
            modifyTime = fileInfo.get('fileModificationTime')
            if modifyTime != oldModifyTime:
                self.logger.debug('Modified file path detected: %s' % filePath)
                self.updateFile(filePath, dataDirectory, experiment)

    def pollFileSystem(self, dataDirectory, experiment):
        """Re-scan ``dataDirectory`` once, report changes, and re-arm its timer.

        Runs on a Timer thread. Any exception from scanning or from the
        observer is logged, and the next poll of THIS directory only is
        postponed by DEFAULT_RETRY_PERIOD_IN_SECONDS.
        """
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if not observedDirInfo:
            self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
            return
        try:
            oldFileDict = observedDirInfo.get('files')
            fileDict = self.getFiles(dataDirectory)
            # NOTE(review): stored before processing, so files not yet
            # reported when processFiles raises are not reported on later
            # polls unless they change again.
            observedDirInfo['files'] = fileDict
            self.processFiles(fileDict, oldFileDict, dataDirectory, experiment)
            retryDelay = 0
        except Exception as ex:
            self.logger.error('Could not poll directory %s: %s' % (dataDirectory, ex))
            retryDelay = self.DEFAULT_RETRY_PERIOD_IN_SECONDS
            self.logger.debug('Next polling for directory %s will be delayed by: %s seconds' % (dataDirectory, retryDelay))
        # Retry delay is tracked per directory so one failing directory does
        # not delay (or un-delay) the others.
        observedDirInfo['retryDelay'] = retryDelay
        self.retryDelay = retryDelay
        # If the directory was unregistered (or unregistered and registered
        # again) while this poll ran, do not re-arm a timer for the stale entry.
        if self.observedDirDict.get(dataDirectory) is not observedDirInfo:
            self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
            return
        self.startPollingTimer(observedDirInfo, dataDirectory, experiment)

    def startPollingTimer(self, observedDirInfo, dataDirectory, experiment):
        """Arm a one-shot Timer that calls pollFileSystem for ``dataDirectory``.

        The delay is the polling period plus this directory's retry delay.
        The timer is stored in ``observedDirInfo['pollTimer']`` so that
        stopObservingPath() can cancel it. Does nothing after stop().
        """
        if self.isDone:
            return
        delay = self.pollingPeriod + observedDirInfo.get('retryDelay', 0)
        pollTimer = Timer(delay, self.pollFileSystem, [dataDirectory, experiment])
        observedDirInfo['pollTimer'] = pollTimer
        pollTimer.start()

    def startObservingPath(self, dataDirectory, experiment):
        """Start polling ``dataDirectory`` on behalf of ``experiment``.

        The current listing is taken as the baseline, so only files created
        or modified after this call are reported. A no-op if the directory is
        already observed. Exceptions from the initial scan propagate.
        """
        if self.observedDirDict.get(dataDirectory):
            self.logger.debug('Observer for %s is already active' % dataDirectory)
            return
        self.logger.debug('Starting observer for %s' % dataDirectory)
        observedDirInfo = {
            'files': self.getFiles(dataDirectory),
            'experiment': experiment,
            'retryDelay': 0,
        }
        self.observedDirDict[dataDirectory] = observedDirInfo
        self.startPollingTimer(observedDirInfo, dataDirectory, experiment)

    def stopObservingPath(self, dataDirectory, experiment):
        """Cancel the pending poll timer for ``dataDirectory`` and forget it.

        ``experiment`` is accepted for interface symmetry but not used.
        A poll already running on a Timer thread is not interrupted, but it
        will not re-arm because the directory entry is gone.
        """
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if not observedDirInfo:
            self.logger.debug('Observer for %s is not active' % dataDirectory)
            return

        self.logger.debug('Stopping observer for %s' % dataDirectory)
        pollTimer = observedDirInfo.get('pollTimer')
        if pollTimer:
            pollTimer.cancel()
        del self.observedDirDict[dataDirectory]

    def start(self):
        """Mark the agent as running; timers are armed per observed directory."""
        self.logger.debug('Starting polling file system observer agent')
        # Allow polling again if the agent was previously stopped.
        self.isDone = False

    def stop(self):
        """Stop the agent: cancel all pending poll timers and forget all directories."""
        self.logger.debug('Stopping polling file system observer agent')
        self.isDone = True
        # Iterate over a snapshot: stopObservingPath() deletes dict entries.
        for dataDirectory, observedDirInfo in list(self.observedDirDict.items()):
            self.stopObservingPath(dataDirectory, observedDirInfo.get('experiment'))

####################################################################
# Testing

if __name__ == '__main__':
    # Manual smoke test: observe /tmp/test for 100 seconds.
    import time
    agent = PollingFileSystemObserverAgent()
    # Single-argument print() produces identical output on Python 2 and 3.
    print('ORIGINAL FILES:  %s' % (agent.getFiles('/tmp/test'),))
    agent.startObservingPath('/tmp/test', 'e1')
    try:
        time.sleep(100)
    finally:
        # Cancel pending poll timers even if the sleep is interrupted
        # (e.g. Ctrl-C); the Timer threads are non-daemon and would
        # otherwise keep re-arming and keep the process alive.
        agent.stop()