Skip to content
Snippets Groups Projects
Commit 1af7defb authored by sveseli's avatar sveseli
Browse files

changes related to new file processing interface that includes experiment and daqPath

parent 721a8e77
No related branches found
No related tags found
No related merge requests found
......@@ -8,10 +8,12 @@ from dm.common.utility.loggingManager import LoggingManager
class DmFileSystemEventHandler(FileSystemEventHandler):
def __init__(self, fileSystemObserver):
def __init__(self, fileSystemObserver, daqPath, experiment):
FileSystemEventHandler.__init__(self)
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.fileSystemObserver = fileSystemObserver
self.daqPath = daqPath
self.experiment = experiment
def dispatch(self, event):
FileSystemEventHandler.dispatch(self, event)
......@@ -27,10 +29,10 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
def on_modified(self, event):
FileSystemEventHandler.on_modified(self, event)
path = event.src_path
filePath = event.src_path
self.logger.debug('File system modified event: %s' % (event.__dict__))
if not event.is_directory:
self.fileSystemObserver.observedFileUpdated(path)
self.fileSystemObserver.observedFileUpdated(filePath, self.daqPath, self.experiment)
def on_moved(self, event):
FileSystemEventHandler.on_moved(self, event)
......
......@@ -10,6 +10,7 @@ from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
......@@ -17,20 +18,25 @@ class ExperimentSessionControllerImpl(DmObjectManager):
def __init__(self):
DmObjectManager.__init__(self)
self.experimentDict = {}
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
def startDaq(self, name, dataDirectory):
FileSystemObserver.getInstance().startObservingPath(dataDirectory)
experiment = self.dsExperimentApi.getExperimentByName(name)
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % name)
startTime = time.time()
experiment = Experiment({'name' : name, 'dataDirectory' : dataDirectory, 'daqStartTime' : startTime})
self.experimentDict[name] = experiment
experiment['daqDataDirectory'] = dataDirectory
experiment['daqStartTime'] = startTime
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
ExperimentTracker.getInstance().put(name, experiment)
return experiment
def stopDaq(self, name):
experiment = self.experimentDict.get(name)
experiment = ExperimentTracker.getInstance().get(name)
if experiment is None or experiment.get('daqEndTime') is not None:
raise InvalidRequest('Experiment %s is not active.' % name)
dataDirectory = experiment.get('dataDirectory')
FileSystemObserver.getInstance().stopObservingPath(dataDirectory)
experiment['daqEndTime'] = time.time()
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
return experiment
......@@ -30,20 +30,22 @@ class FileProcessingThread(threading.Thread):
file = self.fileProcessingQueue.pop()
if file is None:
break
path = file.getPath()
filePath = file.getFilePath()
daqPath = file.getDaqPath()
experiment = file.getExperiment()
try:
for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey)
processorName = processor.__class__.__name__
self.logger.debug('%s is about to process file %s ' % (processorName, file))
try:
processor.processFile(path)
processor.processFile(filePath, daqPath, experiment)
self.logger.debug('%s processed file %s ' % (processorName, file))
except Exception, ex:
self.logger.exception(ex)
self.logger.debug('%s processing failed for file %s ' % (processorName, file))
file[processorName] = {'error' : ex}
self.unprocessedFileDict[path] = file
self.unprocessedFileDict[filePath] = file
except Exception, ex:
self.logger.exception(ex)
......
......@@ -40,7 +40,6 @@ class FileSystemObserver(threading.Thread,Singleton):
self.observedFileMap = {}
self.observer = PollingObserver()
self.eventHandler = DmFileSystemEventHandler(self)
self.observedWatchDict = {}
self.__configure()
self.fileProcessingManager = FileProcessingManager.getInstance()
......@@ -55,33 +54,35 @@ class FileSystemObserver(threading.Thread,Singleton):
self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
def startObservingPath(self, path):
def startObservingPath(self, daqPath, experiment):
self.lock.acquire()
try:
self.logger.debug('Starting observer for %s' % path)
observedWatch = self.observer.schedule(self.eventHandler, path, recursive=True)
self.observedWatchDict[path] = observedWatch
self.logger.debug('Starting observer for %s' % daqPath)
eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
self.observedWatchDict[daqPath] = observedWatch
finally:
self.lock.release()
def stopObservingPath(self, path):
def stopObservingPath(self, daqPath, experiment):
self.lock.acquire()
try:
observedWatch = self.observedWatchDict.get(path)
observedWatch = self.observedWatchDict.get(daqPath)
if observedWatch:
self.logger.debug('Stopping observer for %s' % path)
self.logger.debug('Stopping observer for %s' % daqPath)
self.observer.unschedule(observedWatch)
del self.observedWatchDict[daqPath]
else:
self.logger.debug('Observer for %s is not active' % path)
self.logger.debug('Observer for %s is not active' % daqPath)
finally:
self.lock.release()
def observedFileUpdated(self, path):
def observedFileUpdated(self, filePath, daqPath, experiment):
self.lock.acquire()
try:
observedFile = self.observedFileMap.get(path, ObservedFile(path=path))
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
observedFile.setLastUpdatedTimestampToNow()
self.observedFileMap[path] = observedFile
self.observedFileMap[filePath] = observedFile
self.logger.debug('Observed file updated: %s', observedFile)
finally:
self.lock.release()
......@@ -90,24 +91,24 @@ class FileSystemObserver(threading.Thread,Singleton):
self.lock.acquire()
try:
now = time.time()
pathsForProcessing = []
for (path,observedFile) in self.observedFileMap.items():
filePathsForProcessing = []
for (filePath,observedFile) in self.observedFileMap.items():
timestamp = observedFile.get('lastUpdateTimestamp')
deltaT = now - timestamp
if deltaT > self.minFileProcessingDelayInSeconds:
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (path, deltaT))
pathsForProcessing.append(path)
return pathsForProcessing
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
filePathsForProcessing.append(filePath)
return filePathsForProcessing
finally:
self.lock.release()
def processObservedFile(self, path):
def processObservedFile(self, filePath):
self.lock.acquire()
try:
self.logger.debug('Processing file %s' % path)
observedFile = self.observedFileMap.get(path)
self.logger.debug('Processing file %s' % filePath)
observedFile = self.observedFileMap.get(filePath)
if observedFile is not None:
del self.observedFileMap[path]
del self.observedFileMap[filePath]
self.fileProcessingManager.processObservedFile(observedFile)
finally:
self.lock.release()
......@@ -132,9 +133,9 @@ class FileSystemObserver(threading.Thread,Singleton):
try:
self.logger.debug('Checking observed files')
pathsForProcessing = self.checkObservedFilesForProcessing()
for path in pathsForProcessing:
self.processObservedFile(path)
filePathsForProcessing = self.checkObservedFilesForProcessing()
for filePath in filePathsForProcessing:
self.processObservedFile(filePath)
except Exception, ex:
self.logger.exception(ex)
self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds)
......
......@@ -29,10 +29,12 @@ class ExperimentSessionControllerImpl(DmObjectManager):
def getExperimentByName(self, name):
experiment = self.experimentDbApi.getExperimentByName(name)
StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
return experiment
def getExperimentById(self, id):
experiment = self.experimentDbApi.getExperimentById(id)
StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
return experiment
def addExperiment(self, name, experimentTypeId, description):
......@@ -40,10 +42,18 @@ class ExperimentSessionControllerImpl(DmObjectManager):
return experiment
def startExperiment(self, name):
experiment = self.experimentDbApi.setExperimentStartDateToNow(name)
StorageManager.getInstance().createExperimentDataDirectory(experiment)
experiment = self.experimentDbApi.getExperimentByName(name)
if experiment.get('startDate') is None:
experiment = self.experimentDbApi.setExperimentStartDateToNow(name)
StorageManager.getInstance().createExperimentDataDirectory(experiment)
else:
StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
return experiment
def stopExperiment(self, name):
experiment = self.experimentDbApi.setExperimentEndDateToNow(name)
experiment = self.experimentDbApi.getExperimentByName(name)
if experiment.get('endDate') is None:
experiment = self.experimentDbApi.setExperimentEndDateToNow(name)
StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
return experiment
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment