From e0ccb58dbcc6e24e66cd123b1ae1832c412ed12e Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Wed, 2 Dec 2015 17:18:01 +0000 Subject: [PATCH] changes that allow simultaneous data acquisitions for the same experiment --- ...daqProcessingCompleteNotificationPlugin.py | 23 +++-- .../impl/dsProcessFileNotificationPlugin.py | 9 +- .../impl/experimentSessionControllerImpl.py | 87 ++++++++----------- .../service/impl/fileSystemObserver.py | 14 ++- .../service/impl/uploadTracker.py | 1 + 5 files changed, 67 insertions(+), 67 deletions(-) diff --git a/src/python/dm/daq_web_service/service/impl/daqProcessingCompleteNotificationPlugin.py b/src/python/dm/daq_web_service/service/impl/daqProcessingCompleteNotificationPlugin.py index 5e36dc72..3c85e356 100755 --- a/src/python/dm/daq_web_service/service/impl/daqProcessingCompleteNotificationPlugin.py +++ b/src/python/dm/daq_web_service/service/impl/daqProcessingCompleteNotificationPlugin.py @@ -4,6 +4,7 @@ import os from dm.common.utility.loggingManager import LoggingManager from dm.common.processing.plugins.fileProcessor import FileProcessor from uploadTracker import UploadTracker +from daqTracker import DaqTracker class DaqProcessingCompleteNotificationPlugin(FileProcessor): @@ -12,18 +13,24 @@ class DaqProcessingCompleteNotificationPlugin(FileProcessor): self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) def processFile(self, fileInfo): - uploadId = fileInfo.get('uploadId') filePath = fileInfo.get('filePath') + uploadId = fileInfo.get('uploadId') + daqId = fileInfo.get('daqInfo', {}).get('id') + trackedInfo = None if uploadId != None: self.logger.debug('Upload id for file %s: %s' %(filePath, uploadId)) - uploadInfo = UploadTracker.getInstance().get(uploadId) - if uploadInfo != None: - fileDict = uploadInfo.get('fileDict', {}) - uploadFileInfo = fileDict.get(filePath) - if uploadFileInfo: - uploadFileInfo['processed'] = True + trackedInfo = UploadTracker.getInstance().get(uploadId) + if daqId != None: + self.logger.debug('Daq id for file %s: %s' %(filePath, daqId)) + trackedInfo = DaqTracker.getInstance().get(daqId) + if trackedInfo != None: + fileDict = trackedInfo.get('fileDict', {}) + trackedFileInfo = fileDict.get(filePath) + if trackedFileInfo: + trackedFileInfo['processed'] = True else: - self.logger.error('Upload tracker does not have upload id %s' %(uploadId)) + self.logger.error('%s object does not have file path %s' %(trackedInfo, filePath)) + trackedInfo.updateStatus() ####################################################################### diff --git a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py index 3485ab6b..55340ed3 100755 --- a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py +++ b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import os +import copy from dm.common.utility.loggingManager import LoggingManager from dm.common.processing.plugins.fileProcessor import FileProcessor from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory @@ -14,15 +15,17 @@ class DsProcessFileNotificationPlugin(FileProcessor): def processFile(self, fileInfo): experimentFilePath = fileInfo.get('experimentFilePath') - experiment = fileInfo.get('experiment') - experimentName = experiment.get('name') + experimentName = fileInfo.get('experimentName') self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) + daqInfo = copy.deepcopy(fileInfo.get('daqInfo', {})) + if daqInfo.has_key('fileDict'): + del daqInfo['fileDict'] # Prepare dictionary for processing. Only send needed data. fileInfo2 = {} fileInfo2['experimentFilePath'] = experimentFilePath fileInfo2['experimentName'] = experimentName - fileInfo2['daqInfo'] = experiment.get('daqInfo') + fileInfo2['daqInfo'] = daqInfo self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2) ####################################################################### diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index f556bf63..c047191b 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -7,6 +7,7 @@ import os import time import uuid +import copy from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager @@ -16,9 +17,12 @@ from dm.common.objects.observedFile import ObservedFile from dm.common.objects.uploadInfo import UploadInfo from dm.common.processing.fileProcessingManager import FileProcessingManager from dm.common.utility.fileUtility import FileUtility +from dm.common.utility.timeUtility import TimeUtility +from dm.common.utility.dictUtility import DictUtility from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory from experimentTracker import ExperimentTracker from uploadTracker import UploadTracker +from daqTracker import DaqTracker from fileSystemObserver import FileSystemObserver class ExperimentSessionControllerImpl(DmObjectManager): @@ -30,58 +34,56 @@ class ExperimentSessionControllerImpl(DmObjectManager): def startDaq(self, experimentName, dataDirectory, daqInfo): FileSystemObserver.getInstance().createDirectory(dataDirectory) - if daqInfo is None: - daqInfo={} - daqInfo['experimentName'] = experimentName - daqInfo['dataDirectory'] = dataDirectory - experiment = ExperimentTracker.getInstance().get(experimentName) - if experiment is not None: - oldDaqInfo = experiment.get('daqInfo') - if oldDaqInfo.get('daqEndTime') is None: - raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory'))) - experiment = self.dsExperimentApi.getExperimentByName(experimentName) storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) - startTime = time.time() - daqInfo['daqStartTime'] = startTime - experiment['daqInfo'] = daqInfo - self.logger.debug('Starting DAQ %s' % daqInfo) + daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo) FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment) - ExperimentTracker.getInstance().put(experimentName, experiment) - return experiment + return daqInfo - def stopDaq(self, experimentName): - experiment = ExperimentTracker.getInstance().get(experimentName) - if experiment is not None: - daqInfo = experiment.get('daqInfo') - if experiment is None or daqInfo.get('daqEndTime') is not None: - raise InvalidRequest('Experiment %s is not active.' % experimentName) - dataDirectory = daqInfo.get('dataDirectory') - daqInfo['daqEndTime'] = time.time() + def stopDaq(self, experimentName, dataDirectory): + experiment = self.dsExperimentApi.getExperimentByName(experimentName) + daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory) FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment) - return experiment + return daqInfo.scrub() - def upload(self, daqInfo): - experimentName = daqInfo.get('experimentName') + def getDaqInfo(self, id): + daqInfo = DaqTracker.getInstance().getDaqInfo(id) + if not daqInfo: + raise ObjectNotFound('Daq id %s not found.' % id) + daqInfo.updateStatus() + return daqInfo.scrub() + + def upload(self, experimentName, dataDirectory, daqInfo): experiment = self.dsExperimentApi.getExperimentByName(experimentName) experiment['daqInfo'] = daqInfo storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) - dataDirectory = daqInfo.get('dataDirectory') filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory) fileProcessingManager = FileProcessingManager.getInstance() uploadId = str(uuid.uuid4()) self.logger.debug('Starting upload id %s' % uploadId) - uploadInfo = UploadInfo() + uploadInfo = UploadInfo(daqInfo) uploadInfo['id'] = uploadId - uploadInfo['experiment'] = experimentName + uploadInfo['experimentName'] = experimentName + uploadInfo['storageDirectory'] = experiment.get('storageDirectory') + uploadInfo['storageHost'] = experiment.get('storageHost') uploadInfo['dataDirectory'] = dataDirectory + startTime = time.time() + uploadInfo['startTime'] = startTime + uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimeStamp(startTime) + daqInfo['experimentName'] = experimentName + daqInfo['storageDirectory'] = experiment.get('storageDirectory') + daqInfo['storageHost'] = experiment.get('storageHost') + daqInfo['dataDirectory'] = dataDirectory + daqInfo['uploadId'] = uploadId + fileDict = {} for filePath in filePaths: fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) + fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId fileUploadInfo = { 'processed' : False } FileUtility.statFile(filePath, fileUploadInfo) @@ -89,33 +91,14 @@ class ExperimentSessionControllerImpl(DmObjectManager): fileProcessingManager.processFile(fileInfo) uploadInfo['fileDict'] = fileDict #self.logger.debug('Upload info %s' % uploadInfo) - ExperimentTracker.getInstance().put(experimentName, experiment) UploadTracker.getInstance().put(uploadId, uploadInfo) - return uploadInfo + return uploadInfo.scrub() def getUploadInfo(self, id): uploadInfo = UploadTracker.getInstance().get(id) if not uploadInfo: raise ObjectNotFound('Upload id %s not found.' % id) - uploadStatus = uploadInfo.get('status', 'running') - if uploadStatus == 'complete': - return uploadInfo - fileDict = uploadInfo.get('fileDict') - nFiles = len(fileDict) - nProcessedFiles = 0 - for (filePath,uploadFileInfo) in fileDict.items(): - if uploadFileInfo.get('processed'): - nProcessedFiles += 1 - - # need to handle 'failed' uploads - if nProcessedFiles == nFiles: - uploadStatus = 'complete' - uploadInfo['status'] = uploadStatus - uploadInfo['nProcessedFiles'] = '%s/%s' % (nProcessedFiles,nFiles) - percentageComplete = 100.0 - if nFiles > 0: - percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0 - uploadInfo['percentageComplete'] = '%.2f' % percentageComplete - return uploadInfo + uploadInfo.updateStatus() + return uploadInfo.scrub() diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 6613f820..b5b15bcb 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -14,6 +14,7 @@ from dm.common.utility.threadingUtility import ThreadingUtility from dm.common.processing.fileProcessingManager import FileProcessingManager from dmFileSystemEventHandler import DmFileSystemEventHandler +from daqTracker import DaqTracker class FileSystemObserver(threading.Thread,Singleton): @@ -84,8 +85,13 @@ class FileSystemObserver(threading.Thread,Singleton): @ThreadingUtility.synchronize def fileUpdated(self, filePath, dataDirectory, experiment): + daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory) observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)) - observedFile.setLastUpdatedTimestampToNow() + observedFile.setLastUpdateTimeToNow() + if daqInfo: + daqFileDict = daqInfo['fileDict'] + daqFileDict[filePath] = observedFile + observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys() self.observedFileMap[filePath] = observedFile self.logger.debug('Observed file updated: %s', observedFile) @@ -94,7 +100,7 @@ class FileSystemObserver(threading.Thread,Singleton): now = time.time() filePathsForProcessing = [] for (filePath,observedFile) in self.observedFileMap.items(): - timestamp = observedFile.get('lastUpdateTimestamp') + timestamp = observedFile.get('lastUpdateTime') deltaT = now - timestamp if deltaT > self.minFileProcessingDelayInSeconds: self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) @@ -124,9 +130,9 @@ class FileSystemObserver(threading.Thread,Singleton): self.logger.debug('Exit flag set, %s done' % self.getName()) break try: - self.logger.debug('Checking observed files') - filePathsForProcessing = self.checkObservedFilesForProcessing() + if len(filePathsForProcessing): + self.logger.debug('Checking observed files') for filePath in filePathsForProcessing: self.processFile(filePath) except Exception, ex: diff --git a/src/python/dm/daq_web_service/service/impl/uploadTracker.py b/src/python/dm/daq_web_service/service/impl/uploadTracker.py index 3f3adc93..990fae1d 100755 --- a/src/python/dm/daq_web_service/service/impl/uploadTracker.py +++ b/src/python/dm/daq_web_service/service/impl/uploadTracker.py @@ -7,6 +7,7 @@ class UploadTracker(ObjectTracker): # Cache configuration objectClass = UploadInfo + cacheSize = 100 #################################################################### # Testing -- GitLab