Skip to content
Snippets Groups Projects
Commit f31f93fc authored by sveseli's avatar sveseli
Browse files

refactor observer to use agents for monitoring file systems

parent 68ab5154
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
class FileSystemObserverAgent:
def __init__(self):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.fileSystemObserver = None
def setFileSystemObserver(self, fileSystemObserver):
self.fileSystemObserver = fileSystemObserver
def startObservingPath(self, dataDirectory, experiment):
pass
def stopObservingPath(self, dataDirectory, experiment):
pass
def start(self):
pass
def stop(self):
pass
#!/usr/bin/env python
from threading import Timer
from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.utility.ftpUtility import FtpUtility
class FtpFileSystemObserverAgent(FileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
FileSystemObserverAgent.__init__(self)
self.host = host
self.port = port
self.username = username
self.password = password
self.pollingPeriod = pollingPeriod
self.observedDirDict = {}
self.isDone = False
def getFiles(self, dataDirectory):
ftpUtility = FtpUtility(self.host, self.port, self.username, self.password)
return ftpUtility.getFiles(dataDirectory, {})
def updateFile(self, filePath, dataDirectory, experiment):
if self.fileSystemObserver:
self.logger.debug('Processing file path: %s' % filePath)
self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment)
def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment):
for filePath in fileDict.keys():
if not oldFileDict.has_key(filePath):
# new file, must be updated
self.logger.debug('New file path detected: %s' % filePath)
self.updateFile(filePath, dataDirectory, experiment)
else:
# old file, check timestamp
oldFileInfo = oldFileDict.get(filePath)
oldModifyTime = oldFileInfo.get('Modify', '')
fileInfo = fileDict.get(filePath)
modifyTime = fileInfo.get('Modify')
if modifyTime != oldModifyTime:
# file has been modified, need to process it
self.logger.debug('Modified file path detected: %s' % filePath)
self.updateFile(filePath, dataDirectory, experiment)
def pollFileSystem(self, dataDirectory, experiment):
try:
fileDict = self.getFiles(dataDirectory)
observedDirInfo = self.observedDirDict.get(dataDirectory)
if not observedDirInfo:
self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
return
oldFileDict = observedDirInfo.get('files')
observedDirInfo['files'] = fileDict
self.processFiles(fileDict, oldFileDict, dataDirectory, experiment)
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
except Exception, ex:
self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex))
def startPollingTimer(self, observedDirInfo, dataDirectory, experiment):
if self.isDone:
return
t = Timer(self.pollingPeriod, self.pollFileSystem, [dataDirectory, experiment])
observedDirInfo['pollTimer'] = t
t.start()
def startObservingPath(self, dataDirectory, experiment):
observedDirInfo = self.observedDirDict.get(dataDirectory)
if observedDirInfo:
self.logger.debug('Observer for %s is already active' % dataDirectory)
return
self.logger.debug('Starting observer for %s' % dataDirectory)
fileDict = self.getFiles(dataDirectory)
observedDirInfo = self.observedDirDict.get(dataDirectory, {})
observedDirInfo['files'] = fileDict
observedDirInfo['experiment'] = experiment
self.observedDirDict[dataDirectory] = observedDirInfo
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
def stopObservingPath(self, dataDirectory, experiment):
observedDirInfo = self.observedDirDict.get(dataDirectory)
if not observedDirInfo:
self.logger.debug('Observer for %s is not active' % dataDirectory)
return
self.logger.debug('Stopping observer for %s' % dataDirectory)
t = observedDirInfo.get('pollTimer')
if t:
t.cancel()
del self.observedDirDict[dataDirectory]
def start(self):
self.logger.debug('Starting ftp observer agent')
def stop(self):
self.logger.debug('Stopping ftp observer agent')
self.isDone = True
for (dataDirectory,observedDirInfo) in self.observedDirDict.items():
experiment = observedDirInfo.get('experiment')
self.stopObservingPath(dataDirectory, experiment)
####################################################################
# Testing
if __name__ == '__main__':
import time
agent = FtpFileSystemObserverAgent('zagreb', 2811)
print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test')
agent.startObservingPath('/tmp/test', 'e1')
time.sleep(100)
agent.stopObservingPath('/tmp/test', 'e1')
#!/usr/bin/env python
from watchdog.observers.polling import PollingObserver
from fileSystemObserverAgent import FileSystemObserverAgent
from dmFileSystemEventHandler import DmFileSystemEventHandler
class WatchdogFileSystemObserverAgent(FileSystemObserverAgent):
def __init__(self):
FileSystemObserverAgent.__init__(self)
self.observer = PollingObserver()
self.observedWatchDict = {}
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Starting observer for %s' % dataDirectory)
eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment)
observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
self.observedWatchDict[dataDirectory] = observedWatch
def stopObservingPath(self, dataDirectory, experiment):
observedWatch = self.observedWatchDict.get(dataDirectory)
if observedWatch:
self.logger.debug('Stopping observer for %s' % dataDirectory)
self.observer.unschedule(observedWatch)
del self.observedWatchDict[dataDirectory]
else:
self.logger.debug('Observer for %s is not active' % dataDirectory)
def start(self):
self.logger.debug('Starting watchdog observer agent')
self.observer.start()
def stop(self):
self.logger.debug('Stopping watchdog observer agent')
self.observer.stop()
self.observer.join()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment