Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
import threading
import time
from watchdog.observers.polling import PollingObserver
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.singleton import Singleton
from dmFileSystemEventHandler import DmFileSystemEventHandler
from fileProcessingManager import FileProcessingManager
class FileSystemObserver(threading.Thread, Singleton):
    """Thread that watches DAQ directories and forwards settled files for processing.

    A watchdog ``PollingObserver`` reports file events through
    ``DmFileSystemEventHandler`` instances created in ``startObservingPath``.
    Updated files are recorded in ``observedFileMap``; the thread's ``run``
    loop periodically picks out the files whose last update is older than the
    configured minimum delay and hands them to the ``FileProcessingManager``.

    All access to the shared maps is serialized with ``self.lock``.
    """

    CONFIG_SECTION_NAME = 'FileSystemObserver'
    MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
    FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the singleton instance; later constructions return early."""
        with FileSystemObserver.__instanceLock:
            if FileSystemObserver.__instance:
                return
            FileSystemObserver.__instance = self
            threading.Thread.__init__(self)
            self.setName('FileSystemObserverThread')
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.eventFlag = threading.Event()
            self.exitFlag = False
            # filePath -> ObservedFile awaiting processing.
            self.observedFileMap = {}
            self.observer = PollingObserver()
            # daqPath -> watch handle returned by observer.schedule().
            self.observedWatchDict = {}
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """Read the processing delay and loop timeout (both as ints) from configuration."""
        cm = ConfigurationManager.getInstance()
        sectionName = FileSystemObserver.CONFIG_SECTION_NAME
        configItems = cm.getConfigItems(sectionName)
        self.logger.debug('Got config items: %s', configItems)
        self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(
            sectionName, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
        self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(
            sectionName, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))

    def startObservingPath(self, daqPath, experiment):
        """Schedule a recursive watch on ``daqPath`` and remember its handle."""
        with self.lock:
            self.logger.debug('Starting observer for %s', daqPath)
            eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
            observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
            self.observedWatchDict[daqPath] = observedWatch

    def stopObservingPath(self, daqPath, experiment):
        """Unschedule the watch on ``daqPath`` if one is active.

        ``experiment`` is accepted for interface symmetry with
        ``startObservingPath`` but is not used here.
        """
        with self.lock:
            observedWatch = self.observedWatchDict.get(daqPath)
            if observedWatch:
                self.logger.debug('Stopping observer for %s', daqPath)
                self.observer.unschedule(observedWatch)
                del self.observedWatchDict[daqPath]
            else:
                self.logger.debug('Observer for %s is not active', daqPath)

    def observedFileUpdated(self, filePath, daqPath, experiment):
        """Record that ``filePath`` changed, creating its entry on first sight."""
        with self.lock:
            observedFile = self.observedFileMap.get(
                filePath,
                ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
            observedFile.setLastUpdatedTimestampToNow()
            self.observedFileMap[filePath] = observedFile
            self.logger.debug('Observed file updated: %s', observedFile)

    def checkObservedFilesForProcessing(self):
        """Return the paths whose last update is older than the minimum delay."""
        with self.lock:
            now = time.time()
            filePathsForProcessing = []
            for (filePath, observedFile) in self.observedFileMap.items():
                # NOTE(review): this reads key 'lastUpdateTimestamp' while the
                # setter used above is setLastUpdatedTimestampToNow() -- confirm
                # that ObservedFile stores the value under this exact key.
                timestamp = observedFile.get('lastUpdateTimestamp')
                deltaT = now - timestamp
                if deltaT > self.minFileProcessingDelayInSeconds:
                    self.logger.debug(
                        'File %s was last modified %s seconds ago, will process it.',
                        filePath, deltaT)
                    filePathsForProcessing.append(filePath)
            return filePathsForProcessing

    def processObservedFile(self, filePath):
        """Remove ``filePath`` from the pending map and pass it to the processing manager."""
        with self.lock:
            self.logger.debug('Processing file %s', filePath)
            observedFile = self.observedFileMap.pop(filePath, None)
            if observedFile is not None:
                self.fileProcessingManager.processObservedFile(observedFile)

    def start(self):
        """Start this thread first, then the watchdog observer."""
        with self.lock:
            self.logger.debug('Starting file observer thread')
            threading.Thread.start(self)
            self.logger.debug('Starting watchdog observer')
            self.observer.start()

    def run(self):
        """Poll for settled files until ``exitFlag`` is set."""
        self.logger.debug('Starting thread: %s', self.getName())
        while True:
            if self.exitFlag:
                self.logger.debug('Exit flag set, %s done', self.getName())
                break
            try:
                self.logger.debug('Checking observed files')
                filePathsForProcessing = self.checkObservedFilesForProcessing()
                for filePath in filePathsForProcessing:
                    self.processObservedFile(filePath)
            except Exception as ex:
                # Keep the loop alive; a single bad file must not kill the thread.
                self.logger.exception(ex)
            # NOTE(review): the flag is not cleared here, so once setEvent() is
            # called this wait returns immediately until clearEvent() is called.
            self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds)

    def stop(self):
        """Stop the watchdog observer and this thread, waiting for both to finish.

        The lock is deliberately released before each join: the watchdog
        callbacks (``observedFileUpdated``) and the ``run`` loop both need
        ``self.lock``, so joining them while holding it could block forever.
        """
        with self.lock:
            self.logger.debug('Stopping watchdog observer')
            self.observer.stop()
        self.observer.join()

        with self.lock:
            self.logger.debug('Stopping file observer thread')
            self.exitFlag = True
            # Wake the run loop so it notices exitFlag without waiting out the timeout.
            self.eventFlag.set()
        self.logger.debug('Event is set, joining thread')
        threading.Thread.join(self)
        self.logger.debug('Module stopped')

    def setEvent(self):
        """Set the event flag, waking the run loop from its timed wait."""
        with self.lock:
            self.eventFlag.set()

    def clearEvent(self):
        """Clear the event flag so the run loop waits for its full timeout again."""
        with self.lock:
            self.eventFlag.clear()
####################################################################
# Testing

# Manual smoke test: run the observer singleton for 30 seconds, then shut it down.
if __name__ == '__main__':
    fileSystemObserver = FileSystemObserver.getInstance()
    fileSystemObserver.start()
    time.sleep(30)
    fileSystemObserver.stop()