diff --git a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py index 74a7d4c4fd75d2e8425c439a3942201768f1dbf9..b9b5788ee30ba641c1d28aa1a8d3f391448d36d5 100755 --- a/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py +++ b/src/python/dm/daq_web_service/service/impl/dmFileSystemEventHandler.py @@ -8,10 +8,12 @@ from dm.common.utility.loggingManager import LoggingManager class DmFileSystemEventHandler(FileSystemEventHandler): - def __init__(self, fileSystemObserver): + def __init__(self, fileSystemObserver, daqPath, experiment): FileSystemEventHandler.__init__(self) self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) self.fileSystemObserver = fileSystemObserver + self.daqPath = daqPath + self.experiment = experiment def dispatch(self, event): FileSystemEventHandler.dispatch(self, event) @@ -27,10 +29,10 @@ class DmFileSystemEventHandler(FileSystemEventHandler): def on_modified(self, event): FileSystemEventHandler.on_modified(self, event) - path = event.src_path + filePath = event.src_path self.logger.debug('File system modified event: %s' % (event.__dict__)) if not event.is_directory: - self.fileSystemObserver.observedFileUpdated(path) + self.fileSystemObserver.observedFileUpdated(filePath, self.daqPath, self.experiment) def on_moved(self, event): FileSystemEventHandler.on_moved(self, event) diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index 1c9da104f3fa21508cebaa40d0a4e9d679f23747..e5d73a5af4092efd81023976cc379c371e1a713c 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -10,6 +10,7 @@ from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager from dm.common.exceptions.invalidRequest import InvalidRequest from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory +from experimentTracker import ExperimentTracker from fileSystemObserver import FileSystemObserver class ExperimentSessionControllerImpl(DmObjectManager): @@ -17,20 +18,25 @@ class ExperimentSessionControllerImpl(DmObjectManager): def __init__(self): DmObjectManager.__init__(self) - self.experimentDict = {} + self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() def startDaq(self, name, dataDirectory): - FileSystemObserver.getInstance().startObservingPath(dataDirectory) + experiment = self.dsExperimentApi.getExperimentByName(name) + storageDirectory = experiment.get('storageDirectory') + if storageDirectory is None: + raise InvalidRequest('Experiment %s has not been started.' % name) startTime = time.time() - experiment = Experiment({'name' : name, 'dataDirectory' : dataDirectory, 'daqStartTime' : startTime}) - self.experimentDict[name] = experiment + experiment['daqDataDirectory'] = dataDirectory + experiment['daqStartTime'] = startTime + FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment) + ExperimentTracker.getInstance().put(name, experiment) return experiment def stopDaq(self, name): - experiment = self.experimentDict.get(name) + experiment = ExperimentTracker.getInstance().get(name) if experiment is None or experiment.get('daqEndTime') is not None: raise InvalidRequest('Experiment %s is not active.' % name) dataDirectory = experiment.get('dataDirectory') - FileSystemObserver.getInstance().stopObservingPath(dataDirectory) experiment['daqEndTime'] = time.time() + FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment) return experiment diff --git a/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py b/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py index 443f2b4cc902cae17067e07c527c10d577ef4283..fda96165fadeb984d2bf159077ca8cc7ea386bfd 100755 --- a/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py +++ b/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py @@ -30,20 +30,22 @@ class FileProcessingThread(threading.Thread): file = self.fileProcessingQueue.pop() if file is None: break - path = file.getPath() + filePath = file.getFilePath() + daqPath = file.getDaqPath() + experiment = file.getExperiment() try: for processorKey in self.fileProcessorKeyList: processor = self.fileProcessorDict.get(processorKey) processorName = processor.__class__.__name__ self.logger.debug('%s is about to process file %s ' % (processorName, file)) try: - processor.processFile(path) + processor.processFile(filePath, daqPath, experiment) self.logger.debug('%s processed file %s ' % (processorName, file)) except Exception, ex: self.logger.exception(ex) self.logger.debug('%s processing failed for file %s ' % (processorName, file)) file[processorName] = {'error' : ex} - self.unprocessedFileDict[path] = file + self.unprocessedFileDict[filePath] = file except Exception, ex: self.logger.exception(ex) diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index bbe30f768c0efa6e8d8498febdebf7afe20f41dd..e6f8ed2e31001f18492ef2a055d02cd9ff78a6be 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -40,7 +40,6 @@ class FileSystemObserver(threading.Thread,Singleton): self.observedFileMap = {} self.observer = PollingObserver() - self.eventHandler = DmFileSystemEventHandler(self) self.observedWatchDict = {} self.__configure() self.fileProcessingManager = FileProcessingManager.getInstance() @@ -55,33 +54,35 @@ class FileSystemObserver(threading.Thread,Singleton): self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY)) self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY)) - def startObservingPath(self, path): + def startObservingPath(self, daqPath, experiment): self.lock.acquire() try: - self.logger.debug('Starting observer for %s' % path) - observedWatch = self.observer.schedule(self.eventHandler, path, recursive=True) - self.observedWatchDict[path] = observedWatch + self.logger.debug('Starting observer for %s' % daqPath) + eventHandler = DmFileSystemEventHandler(self, daqPath, experiment) + observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True) + self.observedWatchDict[daqPath] = observedWatch finally: self.lock.release() - def stopObservingPath(self, path): + def stopObservingPath(self, daqPath, experiment): self.lock.acquire() try: - observedWatch = self.observedWatchDict.get(path) + observedWatch = self.observedWatchDict.get(daqPath) if observedWatch: - self.logger.debug('Stopping observer for %s' % path) + self.logger.debug('Stopping observer for %s' % daqPath) self.observer.unschedule(observedWatch) + del self.observedWatchDict[daqPath] else: - self.logger.debug('Observer for %s is not active' % path) + self.logger.debug('Observer for %s is not active' % daqPath) finally: self.lock.release() - def observedFileUpdated(self, path): + def observedFileUpdated(self, filePath, daqPath, experiment): self.lock.acquire() try: - observedFile = self.observedFileMap.get(path, ObservedFile(path=path)) + observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment)) observedFile.setLastUpdatedTimestampToNow() - self.observedFileMap[path] = observedFile + self.observedFileMap[filePath] = observedFile self.logger.debug('Observed file updated: %s', observedFile) finally: self.lock.release() @@ -90,24 +91,24 @@ class FileSystemObserver(threading.Thread,Singleton): self.lock.acquire() try: now = time.time() - pathsForProcessing = [] - for (path,observedFile) in self.observedFileMap.items(): + filePathsForProcessing = [] + for (filePath,observedFile) in self.observedFileMap.items(): timestamp = observedFile.get('lastUpdateTimestamp') deltaT = now - timestamp if deltaT > self.minFileProcessingDelayInSeconds: - self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (path, deltaT)) - pathsForProcessing.append(path) - return pathsForProcessing + self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) + filePathsForProcessing.append(filePath) + return filePathsForProcessing finally: self.lock.release() - def processObservedFile(self, path): + def processObservedFile(self, filePath): self.lock.acquire() try: - self.logger.debug('Processing file %s' % path) - observedFile = self.observedFileMap.get(path) + self.logger.debug('Processing file %s' % filePath) + observedFile = self.observedFileMap.get(filePath) if observedFile is not None: - del self.observedFileMap[path] + del self.observedFileMap[filePath] self.fileProcessingManager.processObservedFile(observedFile) finally: self.lock.release() @@ -132,9 +133,9 @@ class FileSystemObserver(threading.Thread,Singleton): try: self.logger.debug('Checking observed files') - pathsForProcessing = self.checkObservedFilesForProcessing() - for path in pathsForProcessing: - self.processObservedFile(path) + filePathsForProcessing = self.checkObservedFilesForProcessing() + for filePath in filePathsForProcessing: + self.processObservedFile(filePath) except Exception, ex: self.logger.exception(ex) self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds) diff --git a/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py index f41021dcb26a35027e857986fb43d5a3988f3359..f19c114de85fc75b442be16867076b70e148a125 100755 --- a/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/ds_web_service/service/impl/experimentSessionControllerImpl.py @@ -29,10 +29,12 @@ class ExperimentSessionControllerImpl(DmObjectManager): def getExperimentByName(self, name): experiment = self.experimentDbApi.getExperimentByName(name) + StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment) return experiment def getExperimentById(self, id): experiment = self.experimentDbApi.getExperimentById(id) + StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment) return experiment def addExperiment(self, name, experimentTypeId, description): @@ -40,10 +42,18 @@ class ExperimentSessionControllerImpl(DmObjectManager): return experiment def startExperiment(self, name): - experiment = self.experimentDbApi.setExperimentStartDateToNow(name) - StorageManager.getInstance().createExperimentDataDirectory(experiment) + experiment = self.experimentDbApi.getExperimentByName(name) + if experiment.get('startDate') is None: + experiment = self.experimentDbApi.setExperimentStartDateToNow(name) + StorageManager.getInstance().createExperimentDataDirectory(experiment) + else: + StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment) return experiment def stopExperiment(self, name): - experiment = self.experimentDbApi.setExperimentEndDateToNow(name) + experiment = self.experimentDbApi.getExperimentByName(name) + if experiment.get('endDate') is None: + experiment = self.experimentDbApi.setExperimentEndDateToNow(name) + StorageManager.getInstance().updateExperimentWithStorageDataDirectory(experiment) return experiment +