#!/usr/bin/env python

"""Polling-based file system observer agent.

Instead of relying on change notifications, this agent periodically re-lists
each observed directory and reports new or modified files to the attached
file system observer.
"""

from threading import Timer

from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.utility.osUtility import OsUtility


class PollingFileSystemObserverAgent(FileSystemObserverAgent):
    """Observe directories by polling them on a fixed period.

    Each observed directory gets its own chain of ``threading.Timer`` objects.
    Every timer run lists the directory, compares the listing with the previous
    one and reports new/modified files through ``updateFile``, then schedules
    the next run.
    """

    DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
    DEFAULT_RETRY_PERIOD_IN_SECONDS = 60

    def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
        """Create the agent.

        :param pollingPeriod: seconds between two consecutive polls of a
            directory (a failed poll adds DEFAULT_RETRY_PERIOD_IN_SECONDS).
        """
        FileSystemObserverAgent.__init__(self)
        self.pollingPeriod = pollingPeriod
        # Retry delay chosen by the most recent poll of any directory. Kept for
        # backward compatibility only; scheduling uses the per-directory
        # 'retryDelay' stored in each observedDirDict entry.
        self.retryDelay = 0
        # dataDirectory -> {'files', 'experiment', 'retryDelay', 'pollTimer'}
        self.observedDirDict = {}
        # Once True, no further poll timers are scheduled.
        self.isDone = False

    def getFiles(self, dataDirectory):
        """Return the current listing of dataDirectory.

        Delegates to OsUtility.findFilesAsDict. The result is presumably a
        dict keyed by file path whose values are per-file info dicts
        (processFiles reads 'fileModificationTime' from them) - confirm
        against OsUtility.
        """
        self.logger.debug('Retrieving files for directory: %s' % (dataDirectory))
        return OsUtility.findFilesAsDict(dataDirectory, {})

    def updateFile(self, filePath, dataDirectory, experiment):
        """Notify the attached observer that filePath is new or modified.

        If no observer is attached (self.fileSystemObserver is falsy) the
        notification is silently dropped.
        """
        if self.fileSystemObserver:
            self.logger.debug('Processing file path: %s' % filePath)
            self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment)

    def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment):
        """Report every file in fileDict that is new or modified vs oldFileDict.

        A file is reported when its path is absent from oldFileDict, or when
        its 'fileModificationTime' differs from the previous listing. Files
        that disappeared since the previous listing are not reported.
        """
        for filePath, fileInfo in fileDict.items():
            if filePath not in oldFileDict:
                # new file, must be updated
                self.logger.debug('New file path detected: %s' % filePath)
                self.updateFile(filePath, dataDirectory, experiment)
                continue
            # Old file: compare timestamps. Both sides must use the same
            # default; otherwise a missing timestamp compares '' != None and
            # the file is re-reported on every poll.
            oldModifyTime = oldFileDict[filePath].get('fileModificationTime')
            modifyTime = fileInfo.get('fileModificationTime')
            if modifyTime != oldModifyTime:
                # file has been modified, need to process it
                self.logger.debug('Modified file path detected: %s' % filePath)
                self.updateFile(filePath, dataDirectory, experiment)

    def pollFileSystem(self, dataDirectory, experiment, observedDirInfo=None):
        """Poll dataDirectory once, report changes and schedule the next poll.

        :param observedDirInfo: the observedDirDict entry whose timer triggered
            this poll. When given and dataDirectory is no longer observed
            through that same entry (stopped, or stopped and restarted), the
            poll is treated as cancelled, so a stale timer chain dies out
            instead of running alongside the new one.

        Any exception raised while listing or processing is logged, and the
        next poll of this directory is delayed by
        DEFAULT_RETRY_PERIOD_IN_SECONDS.
        """
        currentDirInfo = self.observedDirDict.get(dataDirectory)
        if not currentDirInfo or (observedDirInfo is not None
                                  and currentDirInfo is not observedDirInfo):
            self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
            return
        observedDirInfo = currentDirInfo

        try:
            oldFileDict = observedDirInfo.get('files')
            fileDict = self.getFiles(dataDirectory)
            # NOTE: the new listing becomes the baseline before processing, so
            # files not reported because processing failed are not retried.
            observedDirInfo['files'] = fileDict
            self.processFiles(fileDict, oldFileDict, dataDirectory, experiment)
            retryDelay = 0
        except Exception as ex:
            self.logger.error('Could not poll directory %s: %s' % (dataDirectory, ex))
            retryDelay = self.DEFAULT_RETRY_PERIOD_IN_SECONDS
            self.logger.debug('Next polling for directory %s will be delayed by: %s seconds'
                              % (dataDirectory, retryDelay))

        # Track the retry delay per directory so one directory's failure does
        # not affect (and is not reset by) another directory's polls.
        observedDirInfo['retryDelay'] = retryDelay
        self.retryDelay = retryDelay
        self.startPollingTimer(observedDirInfo, dataDirectory, experiment)

    def startPollingTimer(self, observedDirInfo, dataDirectory, experiment):
        """Schedule the next poll of dataDirectory.

        The delay is pollingPeriod plus the directory's current retry delay.
        Nothing is scheduled once the agent is stopped, or when
        observedDirInfo is no longer the active entry for dataDirectory (e.g.
        stopObservingPath ran while a poll was in progress).
        """
        if self.isDone:
            return
        if self.observedDirDict.get(dataDirectory) is not observedDirInfo:
            self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
            return
        delay = self.pollingPeriod + observedDirInfo.get('retryDelay', 0)
        # The owning entry is passed along so the poll can detect that it
        # belongs to a stopped/replaced observer.
        t = Timer(delay, self.pollFileSystem, [dataDirectory, experiment],
                  {'observedDirInfo': observedDirInfo})
        observedDirInfo['pollTimer'] = t
        t.start()

    def startObservingPath(self, dataDirectory, experiment):
        """Start polling dataDirectory on behalf of experiment.

        Does nothing if dataDirectory is already observed. The listing taken
        here is the baseline: files already present are not reported, only
        files added or modified afterwards.
        """
        if self.observedDirDict.get(dataDirectory):
            self.logger.debug('Observer for %s is already active' % dataDirectory)
            return
        self.logger.debug('Starting observer for %s' % dataDirectory)
        observedDirInfo = {
            'files': self.getFiles(dataDirectory),
            'experiment': experiment,
            'retryDelay': 0,
        }
        self.observedDirDict[dataDirectory] = observedDirInfo
        self.startPollingTimer(observedDirInfo, dataDirectory, experiment)

    def stopObservingPath(self, dataDirectory, experiment):
        """Stop polling dataDirectory and discard its state.

        experiment is accepted for interface symmetry; it is not used here.
        """
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if not observedDirInfo:
            self.logger.debug('Observer for %s is not active' % dataDirectory)
            return
        self.logger.debug('Stopping observer for %s' % dataDirectory)
        # Remove the entry before cancelling the timer so that a poll already
        # in progress cannot reschedule itself (startPollingTimer checks that
        # the entry is still current).
        self.observedDirDict.pop(dataDirectory, None)
        t = observedDirInfo.get('pollTimer')
        if t:
            t.cancel()

    def start(self):
        """Start the agent; directories are polled only once observed."""
        self.logger.debug('Starting polling file system observer agent')

    def stop(self):
        """Stop the agent and every active directory observer."""
        self.logger.debug('Stopping polling file system observer agent')
        self.isDone = True
        # Iterate over a snapshot: stopObservingPath removes entries, which
        # would break iteration over a live dict view in Python 3.
        for dataDirectory, observedDirInfo in list(self.observedDirDict.items()):
            experiment = observedDirInfo.get('experiment')
            self.stopObservingPath(dataDirectory, experiment)

####################################################################
# Testing

if __name__ == '__main__':
    import time
    agent = PollingFileSystemObserverAgent()
    print('ORIGINAL FILES: %s' % (agent.getFiles('/tmp/test'),))
    agent.startObservingPath('/tmp/test', 'e1')
    time.sleep(100)
    agent.stopObservingPath('/tmp/test', 'e1')