diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserverAgent.py new file mode 100644 index 0000000000000000000000000000000000000000..3603f19dfad70bc8b9ceb7ddd1f5725e35428aa0 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserverAgent.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +from dm.common.utility.loggingManager import LoggingManager + +class FileSystemObserverAgent: + + def __init__(self): + self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + self.fileSystemObserver = None + + def setFileSystemObserver(self, fileSystemObserver): + self.fileSystemObserver = fileSystemObserver + + def startObservingPath(self, dataDirectory, experiment): + pass + + def stopObservingPath(self, dataDirectory, experiment): + pass + + def start(self): + pass + + def stop(self): + pass + diff --git a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py new file mode 100755 index 0000000000000000000000000000000000000000..d7580914ff1a39e4e0a6574830c17cdbd4e3ba87 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python + +from threading import Timer +from fileSystemObserverAgent import FileSystemObserverAgent +from dm.common.utility.ftpUtility import FtpUtility + +class FtpFileSystemObserverAgent(FileSystemObserverAgent): + + DEFAULT_POLLING_PERIOD_IN_SECONDS = 5 + + def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS): + FileSystemObserverAgent.__init__(self) + self.host = host + self.port = port + self.username = username + self.password = password + self.pollingPeriod = pollingPeriod + self.observedDirDict = {} + self.isDone = False + + def getFiles(self, dataDirectory): + ftpUtility = FtpUtility(self.host, self.port, self.username, self.password) + return ftpUtility.getFiles(dataDirectory, {}) + + def updateFile(self, filePath, dataDirectory, experiment): + if self.fileSystemObserver: + self.logger.debug('Processing file path: %s' % filePath) + self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment) + + def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment): + for filePath in fileDict.keys(): + if not oldFileDict.has_key(filePath): + # new file, must be updated + self.logger.debug('New file path detected: %s' % filePath) + self.updateFile(filePath, dataDirectory, experiment) + else: + # old file, check timestamp + oldFileInfo = oldFileDict.get(filePath) + oldModifyTime = oldFileInfo.get('Modify', '') + fileInfo = fileDict.get(filePath) + modifyTime = fileInfo.get('Modify') + if modifyTime != oldModifyTime: + # file has been modified, need to process it + self.logger.debug('Modified file path detected: %s' % filePath) + self.updateFile(filePath, dataDirectory, experiment) + + + def pollFileSystem(self, dataDirectory, experiment): + try: + fileDict = self.getFiles(dataDirectory) + observedDirInfo = self.observedDirDict.get(dataDirectory) + if not observedDirInfo: + self.logger.debug('Polling cancelled for directory: %s' % dataDirectory) + return + oldFileDict = observedDirInfo.get('files') + observedDirInfo['files'] = fileDict + self.processFiles(fileDict, oldFileDict, dataDirectory, experiment) + self.startPollingTimer(observedDirInfo, dataDirectory, experiment) + except Exception, ex: + self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex)) + + def startPollingTimer(self, observedDirInfo, dataDirectory, experiment): + if self.isDone: + return + + t = Timer(self.pollingPeriod, self.pollFileSystem, [dataDirectory, experiment]) + observedDirInfo['pollTimer'] = t + t.start() + + def startObservingPath(self, dataDirectory, experiment): + observedDirInfo = self.observedDirDict.get(dataDirectory) + if observedDirInfo: + self.logger.debug('Observer for %s is already active' % dataDirectory) + return + self.logger.debug('Starting observer for %s' % dataDirectory) + fileDict = self.getFiles(dataDirectory) + observedDirInfo = self.observedDirDict.get(dataDirectory, {}) + observedDirInfo['files'] = fileDict + observedDirInfo['experiment'] = experiment + self.observedDirDict[dataDirectory] = observedDirInfo + self.startPollingTimer(observedDirInfo, dataDirectory, experiment) + + def stopObservingPath(self, dataDirectory, experiment): + observedDirInfo = self.observedDirDict.get(dataDirectory) + if not observedDirInfo: + self.logger.debug('Observer for %s is not active' % dataDirectory) + return + + self.logger.debug('Stopping observer for %s' % dataDirectory) + t = observedDirInfo.get('pollTimer') + if t: + t.cancel() + del self.observedDirDict[dataDirectory] + + def start(self): + self.logger.debug('Starting ftp observer agent') + + def stop(self): + self.logger.debug('Stopping ftp observer agent') + self.isDone = True + for (dataDirectory,observedDirInfo) in self.observedDirDict.items(): + experiment = observedDirInfo.get('experiment') + self.stopObservingPath(dataDirectory, experiment) + +#################################################################### +# Testing + +if __name__ == '__main__': + import time + agent = FtpFileSystemObserverAgent('zagreb', 2811) + print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test') + agent.startObservingPath('/tmp/test', 'e1') + time.sleep(100) + agent.stopObservingPath('/tmp/test', 'e1') + diff --git a/src/python/dm/daq_web_service/service/impl/watchdogFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/watchdogFileSystemObserverAgent.py new file mode 100644 index 0000000000000000000000000000000000000000..81def9895419d17a872f07c27e3b035bf7a1c7d0 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/watchdogFileSystemObserverAgent.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +from watchdog.observers.polling import PollingObserver +from fileSystemObserverAgent import FileSystemObserverAgent +from dmFileSystemEventHandler import DmFileSystemEventHandler + +class WatchdogFileSystemObserverAgent(FileSystemObserverAgent): + + def __init__(self): + FileSystemObserverAgent.__init__(self) + self.observer = PollingObserver() + self.observedWatchDict = {} + + def startObservingPath(self, dataDirectory, experiment): + self.logger.debug('Starting observer for %s' % dataDirectory) + eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment) + observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True) + self.observedWatchDict[dataDirectory] = observedWatch + + def stopObservingPath(self, dataDirectory, experiment): + observedWatch = self.observedWatchDict.get(dataDirectory) + if observedWatch: + self.logger.debug('Stopping observer for %s' % dataDirectory) + self.observer.unschedule(observedWatch) + del self.observedWatchDict[dataDirectory] + else: + self.logger.debug('Observer for %s is not active' % dataDirectory) + + def start(self): + self.logger.debug('Starting watchdog observer agent') + self.observer.start() + + def stop(self): + self.logger.debug('Stopping watchdog observer agent') + self.observer.stop() + self.observer.join() +