Newer
Older
#!/usr/bin/env python
from threading import Timer
from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.utility.osUtility import OsUtility
class PollingFileSystemObserverAgent(FileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
DEFAULT_RETRY_PERIOD_IN_SECONDS = 60
def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
FileSystemObserverAgent.__init__(self)
self.pollingPeriod = pollingPeriod
self.retryDelay = 0
self.observedDirDict = {}
self.isDone = False
def getFiles(self, dataDirectory):
self.logger.debug('Retrieving files for directory: %s' % (dataDirectory))
return OsUtility.findFilesAsDict(dataDirectory, {})
def updateFile(self, filePath, dataDirectory, experiment):
if self.fileSystemObserver:
self.logger.debug('Processing file path: %s' % filePath)
self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment)
def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment):
for filePath in fileDict.keys():
if not oldFileDict.has_key(filePath):
# new file, must be updated
self.logger.debug('New file path detected: %s' % filePath)
self.updateFile(filePath, dataDirectory, experiment)
else:
# old file, check timestamp
oldFileInfo = oldFileDict.get(filePath)
oldModifyTime = oldFileInfo.get('fileModificationTime', '')
fileInfo = fileDict.get(filePath)
modifyTime = fileInfo.get('fileModificationTime')
if modifyTime != oldModifyTime:
# file has been modified, need to process it
self.logger.debug('Modified file path detected: %s' % filePath)
self.updateFile(filePath, dataDirectory, experiment)
def pollFileSystem(self, dataDirectory, experiment):
try:
observedDirInfo = self.observedDirDict.get(dataDirectory)
if not observedDirInfo:
self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
return
oldFileDict = observedDirInfo.get('files')
fileDict = self.getFiles(dataDirectory)
observedDirInfo['files'] = fileDict
self.processFiles(fileDict, oldFileDict, dataDirectory, experiment)
self.retryDelay = 0
except Exception, ex:
self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex))
self.retryDelay = self.DEFAULT_RETRY_PERIOD_IN_SECONDS
self.logger.debug('Next polling for directory %s will be delayed by: %s seconds' % (dataDirectory, self.retryDelay))
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
def startPollingTimer(self, observedDirInfo, dataDirectory, experiment):
if self.isDone:
return
delay = self.pollingPeriod + self.retryDelay
t = Timer(delay, self.pollFileSystem, [dataDirectory, experiment])
observedDirInfo['pollTimer'] = t
t.start()
def startObservingPath(self, dataDirectory, experiment):
observedDirInfo = self.observedDirDict.get(dataDirectory)
if observedDirInfo:
self.logger.debug('Observer for %s is already active' % dataDirectory)
return
self.logger.debug('Starting observer for %s' % dataDirectory)
fileDict = self.getFiles(dataDirectory)
observedDirInfo = self.observedDirDict.get(dataDirectory, {})
observedDirInfo['files'] = fileDict
observedDirInfo['experiment'] = experiment
self.observedDirDict[dataDirectory] = observedDirInfo
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
def stopObservingPath(self, dataDirectory, experiment):
observedDirInfo = self.observedDirDict.get(dataDirectory)
if not observedDirInfo:
self.logger.debug('Observer for %s is not active' % dataDirectory)
return
self.logger.debug('Stopping observer for %s' % dataDirectory)
t = observedDirInfo.get('pollTimer')
if t:
t.cancel()
del self.observedDirDict[dataDirectory]
def start(self):
self.logger.debug('Starting ftp observer agent')
def stop(self):
self.logger.debug('Stopping ftp observer agent')
self.isDone = True
for (dataDirectory,observedDirInfo) in self.observedDirDict.items():
experiment = observedDirInfo.get('experiment')
self.stopObservingPath(dataDirectory, experiment)
####################################################################
# Testing
if __name__ == '__main__':
import time
agent = PollingFileSystemObserverAgent()
print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test')
agent.startObservingPath('/tmp/test', 'e1')
time.sleep(100)
agent.stopObservingPath('/tmp/test', 'e1')