#!/usr/bin/env python
import threading
import time
import os
from watchdog.observers.polling import PollingObserver
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dmFileSystemEventHandler import DmFileSystemEventHandler
from daqTracker import DaqTracker
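# FileSystemObserver is a singleton background thread. File system events
# reported by the configured observer agent are collected in observedFileMap,
# and files that have not been updated for at least
# minFileProcessingDelayInSeconds are handed over to the FileProcessingManager.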
class FileSystemObserver(threading.Thread, Singleton):

    CONFIG_SECTION_NAME = 'FileSystemObserver'
    MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
    FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
    FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'

    DAQ_CHUNK_SIZE_IN_FILES = 500

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        FileSystemObserver.__instanceLock.acquire()
        try:
            if FileSystemObserver.__instance:
                return
            FileSystemObserver.__instance = self
            threading.Thread.__init__(self)
            self.setName('FileSystemObserverThread')
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.eventFlag = threading.Event()
            self.exitFlag = False
            self.observedFileMap = {}
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')
        finally:
            FileSystemObserver.__instanceLock.release()
    def __configure(self):
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileSystemObserver.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
        self.logger.debug('Minimum file processing delay: %s seconds' % self.minFileProcessingDelayInSeconds)
        self.fileSystemEventTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
        self.logger.debug('File system event timeout: %s seconds' % self.fileSystemEventTimeoutInSeconds)
        agentClass = cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_OBSERVER_AGENT_KEY)
        (moduleName, className, constructor) = cm.getModuleClassConstructorTuple(agentClass)
        self.logger.debug('Creating file system observer agent instance of class %s' % className)
        self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
        self.fileSystemObserverAgent.setFileSystemObserver(self)
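    # A hypothetical example of the configuration section read above (key names
    # come from the class constants; the agent value is parsed by
    # getModuleClassConstructorTuple into module, class, and constructor):
    #
    #   [FileSystemObserver]
    #   minfileprocessingdelayinseconds=5
    #   filesystemeventtimeoutinseconds=5
    #   filesystemobserveragent=some.module.SomeObserverAgent()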
    @ThreadingUtility.synchronize
    def createDirectory(self, dataDirectory):
        self.fileSystemObserverAgent.createDirectory(dataDirectory)

    @ThreadingUtility.synchronize
    def getFiles(self, dataDirectory):
        self.logger.debug('Agent is retrieving files for %s' % dataDirectory)
        return self.fileSystemObserverAgent.getFiles(dataDirectory)

    @ThreadingUtility.synchronize
    def startObservingPath(self, dataDirectory, experiment):
        self.logger.debug('Agent is starting observer for %s' % dataDirectory)
        self.fileSystemObserverAgent.startObservingPath(dataDirectory, experiment)

    @ThreadingUtility.synchronize
    def stopObservingPath(self, dataDirectory, experiment):
        self.logger.debug('Agent is stopping observer for %s' % dataDirectory)
        self.fileSystemObserverAgent.stopObservingPath(dataDirectory, experiment)
    @ThreadingUtility.synchronize
    def fileUpdated(self, filePath, dataDirectory, experiment):
        daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory)
        experimentName = experiment.get('name')
        # No daq info, ignore
        if not daqInfo:
            self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectory, experimentName, filePath))
            return
        # Do not process hidden files unless requested
        if not ValueUtility.toBoolean(daqInfo.get('processHiddenFiles')):
            fileName = os.path.basename(filePath)
            if fileName.startswith('.'):
                self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                return
        daqId = daqInfo['id']
        observedFile = self.observedFileMap.get(filePath)
        destDirectory = daqInfo.get('destDirectory')
        if not observedFile:
            observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
            observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
            observedFile['statusMonitor'] = daqInfo
            observedFile['owner'] = daqId
            self.observedFileMap[filePath] = observedFile
            self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId))
            daqInfo.fileAdded(filePath)
        else:
            self.logger.debug('Observed file updated: %s (daq id: %s)' % (filePath, daqId))
        observedFile.setLastUpdateTimeToNow()
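    # Files are queued in chunks: a DAQ only gets more files scheduled while
    # fewer than DAQ_CHUNK_SIZE_IN_FILES are already queued or scheduled for it;
    # the rest remain in observedFileMap until a later pass.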
    @ThreadingUtility.synchronize
    def checkObservedFilesForProcessing(self):
        now = time.time()
        filePathsForProcessing = []
        # We use the number of waiting files to determine whether
        # more files should be added for processing, so we need to
        # update all daq infos before going over observed files.
        DaqTracker.getInstance().updateDaqInfos()
        nObservedFiles = len(self.observedFileMap)
        self.logger.debug('There are %s observed files waiting to be processed.' % (nObservedFiles))
        scheduledDict = {}
        skippedDict = {}
        for (filePath, observedFile) in self.observedFileMap.items():
            daqId = observedFile['daqInfo']['id']
            daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
            nCompletedFiles = daqInfo.get('nCompletedFiles', 0)
            nQueuedFiles = self.fileProcessingManager.getNumberOfQueuedFilesByOwner(daqId)
            nScheduledFiles = scheduledDict.get(daqId, 0)
            #self.logger.debug('There are %s scheduled and %s already queued files for DAQ id %s.' % (nScheduledFiles, nQueuedFiles, daqId))
            if nQueuedFiles + nScheduledFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
                # We do not need to add more files for processing for this DAQ.
                #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nQueuedFiles, daqId))
                skippedDict[daqId] = skippedDict.get(daqId, 0) + 1
                continue
            timestamp = observedFile.get('lastUpdateTime')
            deltaT = now - timestamp
            if deltaT > self.minFileProcessingDelayInSeconds:
                self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
                filePathsForProcessing.append(filePath)
                scheduledDict[daqId] = scheduledDict.get(daqId, 0) + 1
            else:
                skippedDict[daqId] = skippedDict.get(daqId, 0) + 1
        for (daqId, nSkipped) in skippedDict.items():
            self.logger.debug('Skipped for processing %s observed files for DAQ id %s.' % (nSkipped, daqId))
        for (daqId, nScheduled) in scheduledDict.items():
            self.logger.debug('Scheduled for processing %s observed files for DAQ id %s.' % (nScheduled, daqId))
        return filePathsForProcessing
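    # Queuing a file removes it from observedFileMap and hands it to the
    # file processing manager.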
    @ThreadingUtility.synchronize
    def processFile(self, filePath):
        self.logger.debug('Processing file %s' % filePath)
        observedFile = self.observedFileMap.get(filePath)
        if observedFile is not None:
            del self.observedFileMap[filePath]
            self.fileProcessingManager.queueFile(observedFile)
        else:
            self.logger.warn('No observed file found for path %s' % filePath)
    @ThreadingUtility.synchronize
    def start(self):
        self.logger.debug('Starting file observer thread')
        threading.Thread.start(self)
        self.logger.debug('Starting file observer agent')
        self.fileSystemObserverAgent.start()
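    # Main loop: wait for an event (or time out after
    # fileSystemEventTimeoutInSeconds), queue any observed files that are
    # ready for processing, and exit once exitFlag is set.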
    def run(self):
        self.logger.debug('Starting thread: %s' % self.getName())
        while True:
            if self.exitFlag:
                self.logger.debug('Exit flag set, %s done' % self.getName())
                break
            try:
                filePathsForProcessing = self.checkObservedFilesForProcessing()
                if len(filePathsForProcessing):
                    self.logger.debug('Will queue %s new files for processing' % (len(filePathsForProcessing)))
                for filePath in filePathsForProcessing:
                    self.processFile(filePath)
            except Exception, ex:
                self.logger.exception(ex)
            self.eventFlag.wait(timeout=self.fileSystemEventTimeoutInSeconds)
    @ThreadingUtility.synchronize
    def stop(self):
        self.logger.debug('Stopping file observer agent')
        self.fileSystemObserverAgent.stop()
        self.logger.debug('Stopping file observer thread')
        self.exitFlag = True
        self.eventFlag.set()
        self.logger.debug('Event is set, joining thread')
        threading.Thread.join(self)
        self.logger.debug('Module stopped')

    @ThreadingUtility.synchronize
    def setEvent(self):
        self.eventFlag.set()

    @ThreadingUtility.synchronize
    def clearEvent(self):
        self.eventFlag.clear()
####################################################################
# Testing
if __name__ == '__main__':
    fp = FileSystemObserver.getInstance()
    fp.start()
    time.sleep(30)
    fp.stop()