Forked from
DM / dm-docs
261 commits behind, 664 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ftpFileSystemObserverAgent.py 5.00 KiB
#!/usr/bin/env python
from threading import Timer
from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.utility.ftpUtility import FtpUtility
class FtpFileSystemObserverAgent(FileSystemObserverAgent):
    """Observer agent that detects new or modified files by polling FTP.

    Each observed directory is listed periodically through FtpUtility; the
    listing is compared with the previous one and every new or changed file
    path is reported via self.fileSystemObserver.fileUpdated().

    NOTE(review): self.logger and self.fileSystemObserver are not set in this
    class; presumably they are provided by FileSystemObserverAgent - confirm.
    """

    DEFAULT_POLLING_PERIOD_IN_SECONDS = 5

    def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
        """Store FTP connection settings and initialize observer state.

        host, port         -- defaults used when an observed directory URL
                              does not specify them
        username, password -- credentials handed to FtpUtility
        pollingPeriod      -- delay in seconds between two polls
        """
        FileSystemObserverAgent.__init__(self)
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.pollingPeriod = pollingPeriod
        # dataDirectory -> {'files': ..., 'experiment': ..., 'pollTimer': ...}
        self.observedDirDict = {}
        # Set by stop(); prevents any further timers from being scheduled.
        self.isDone = False

    def getFiles(self, dataDirectory):
        """Return the current FTP listing for dataDirectory.

        dataDirectory is parsed with FtpUtility.parseFtpUrl(), falling back
        to this agent's host and port. The result is whatever
        FtpUtility.getFiles() returns; the rest of this class treats it as a
        dict mapping file path to a file info dict.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultHost=self.host, defaultPort=self.port)
        self.logger.debug('Retrieving files from FTP host: %s, port: %s, directory path: %s' % (host, port, dirPath))
        ftpUtility = FtpUtility(host, port, self.username, self.password)
        return ftpUtility.getFiles(dirPath, {})

    def updateFile(self, filePath, dataDirectory, experiment):
        """Report filePath to the attached observer, if there is one."""
        if self.fileSystemObserver:
            self.logger.debug('Processing file path: %s' % filePath)
            self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment)

    def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment):
        """Report every file that is new or modified since the last listing.

        fileDict    -- current listing (file path -> file info dict)
        oldFileDict -- listing from the previous poll, same structure
        """
        for (filePath, fileInfo) in fileDict.items():
            if filePath not in oldFileDict:
                # new file, must be updated
                self.logger.debug('New file path detected: %s' % filePath)
                self.updateFile(filePath, dataDirectory, experiment)
                continue

            # old file, check timestamp
            # NOTE(review): the two lookups use different defaults ('' vs
            # None), so a file whose info lacks 'fileModificationTime' in
            # both listings always compares as modified. Kept as in the
            # original; confirm whether this is intended.
            oldModifyTime = oldFileDict.get(filePath).get('fileModificationTime', '')
            modifyTime = fileInfo.get('fileModificationTime')
            if modifyTime != oldModifyTime:
                # file has been modified, need to process it
                self.logger.debug('Modified file path detected: %s' % filePath)
                self.updateFile(filePath, dataDirectory, experiment)

    def pollFileSystem(self, dataDirectory, experiment):
        """Run one polling cycle for dataDirectory and schedule the next one.

        Invoked from the Timer created in startPollingTimer(). Errors raised
        while listing or processing files are logged, not propagated.
        """
        try:
            fileDict = self.getFiles(dataDirectory)
            observedDirInfo = self.observedDirDict.get(dataDirectory)
            if not observedDirInfo:
                # Directory was removed by stopObservingPath() meanwhile.
                self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
                return
            oldFileDict = observedDirInfo.get('files')
            observedDirInfo['files'] = fileDict
            self.processFiles(fileDict, oldFileDict, dataDirectory, experiment)
        except Exception as ex:
            self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex))

        # Re-arm the timer even if this cycle failed. Otherwise a single
        # transient error would end polling for good while the directory
        # still looks active in observedDirDict, which makes
        # startObservingPath() refuse to restart it.
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if observedDirInfo:
            self.startPollingTimer(observedDirInfo, dataDirectory, experiment)

    def startPollingTimer(self, observedDirInfo, dataDirectory, experiment):
        """Schedule one pollFileSystem() call after pollingPeriod seconds.

        The Timer is stored under observedDirInfo['pollTimer'] so that
        stopObservingPath() can cancel it. Does nothing once stop() has
        been called.
        """
        if self.isDone:
            return
        t = Timer(self.pollingPeriod, self.pollFileSystem, [dataDirectory, experiment])
        observedDirInfo['pollTimer'] = t
        t.start()

    def startObservingPath(self, dataDirectory, experiment):
        """Begin polling dataDirectory unless it is already observed.

        The initial listing is recorded as the baseline, so files already
        present at this point are not reported.
        """
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if observedDirInfo:
            self.logger.debug('Observer for %s is already active' % dataDirectory)
            return
        self.logger.debug('Starting observer for %s' % dataDirectory)
        fileDict = self.getFiles(dataDirectory)
        observedDirInfo = self.observedDirDict.get(dataDirectory, {})
        observedDirInfo['files'] = fileDict
        observedDirInfo['experiment'] = experiment
        self.observedDirDict[dataDirectory] = observedDirInfo
        self.startPollingTimer(observedDirInfo, dataDirectory, experiment)

    def stopObservingPath(self, dataDirectory, experiment):
        """Cancel the pending poll timer for dataDirectory and forget it.

        The experiment argument is not used here.
        """
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if not observedDirInfo:
            self.logger.debug('Observer for %s is not active' % dataDirectory)
            return
        self.logger.debug('Stopping observer for %s' % dataDirectory)
        t = observedDirInfo.get('pollTimer')
        if t:
            t.cancel()
        del self.observedDirDict[dataDirectory]

    def start(self):
        """Log agent startup; polling starts per path in startObservingPath()."""
        self.logger.debug('Starting ftp observer agent')

    def stop(self):
        """Stop observing every directory and block further timer scheduling."""
        self.logger.debug('Stopping ftp observer agent')
        self.isDone = True
        # Iterate over a snapshot: stopObservingPath() deletes entries from
        # observedDirDict while this loop is running.
        for (dataDirectory,observedDirInfo) in list(self.observedDirDict.items()):
            experiment = observedDirInfo.get('experiment')
            self.stopObservingPath(dataDirectory, experiment)
####################################################################
# Testing
if __name__ == '__main__':
    # Manual smoke test: watch /tmp/test on a hard-coded FTP host for 100
    # seconds, then stop the observer.
    import time
    agent = FtpFileSystemObserverAgent('zagreb', 2811)
    # Single-argument parenthesised form: prints the same text as the old
    # two-item print statement and also parses under Python 3.
    print('ORIGINAL FILES:  %s' % (agent.getFiles('/tmp/test'),))
    agent.startObservingPath('/tmp/test', 'e1')
    try:
        time.sleep(100)
    finally:
        # Always cancel the pending timer, including on Ctrl-C; otherwise
        # the non-daemon Timer thread keeps rescheduling itself and the
        # process never exits.
        agent.stopObservingPath('/tmp/test', 'e1')