Commit e0ccb58d authored by sveseli

changes that allow simultaneous data acquisitions for the same experiment

parent b1b99190
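The core of this change: an experiment no longer carries a single daqInfo entry, so only one acquisition could be tracked at a time. Instead, every acquisition is registered with a new DaqTracker that assigns it a unique DAQ id and indexes active acquisitions by experiment and data directory, which is what allows several DAQs to run simultaneously for the same experiment. The daqTracker module itself is not part of this commit; the sketch below only models the call pattern the changed code relies on (startDaq, stopDaq, getDaqInfo, getDaqInfoByExperimentAndDataDirectory, get) and uses plain dictionaries instead of the DaqInfo objects (with updateStatus() and scrub()) that the real tracker presumably stores.

    # Minimal sketch of the DaqTracker interface assumed by this commit.
    # Not the actual implementation: only calls visible in the diff are modeled,
    # and locking, caching and DaqInfo object wrapping are omitted.
    import time
    import uuid

    class DaqTracker(object):

        _instance = None

        @classmethod
        def getInstance(cls):
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

        def __init__(self):
            # DAQs are stored by generated id and indexed by
            # (experimentName, dataDirectory), so one experiment can have
            # several active acquisitions in different directories.
            self.daqInfoById = {}
            self.activeDaqIdByKey = {}

        def startDaq(self, experiment, dataDirectory, daqInfo):
            daqId = str(uuid.uuid4())
            daqInfo['id'] = daqId
            daqInfo['experimentName'] = experiment.get('name')
            daqInfo['dataDirectory'] = dataDirectory
            daqInfo['startTime'] = time.time()
            daqInfo['fileDict'] = {}
            self.daqInfoById[daqId] = daqInfo
            self.activeDaqIdByKey[(experiment.get('name'), dataDirectory)] = daqId
            return daqInfo

        def stopDaq(self, experiment, dataDirectory):
            daqId = self.activeDaqIdByKey.pop((experiment.get('name'), dataDirectory), None)
            if daqId is None:
                raise Exception('No active DAQ for this experiment and directory.')
            daqInfo = self.daqInfoById[daqId]
            daqInfo['endTime'] = time.time()
            return daqInfo

        def getDaqInfo(self, daqId):
            return self.daqInfoById.get(daqId)

        # the completion plugin looks DAQs up via get()
        get = getDaqInfo

        def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory):
            daqId = self.activeDaqIdByKey.get((experiment.get('name'), dataDirectory))
            return self.daqInfoById.get(daqId)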
@@ -4,6 +4,7 @@ import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.processing.plugins.fileProcessor import FileProcessor
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
class DaqProcessingCompleteNotificationPlugin(FileProcessor):
@@ -12,18 +13,24 @@ class DaqProcessingCompleteNotificationPlugin(FileProcessor):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processFile(self, fileInfo):
uploadId = fileInfo.get('uploadId')
filePath = fileInfo.get('filePath')
uploadId = fileInfo.get('uploadId')
daqId = fileInfo.get('daqInfo', {}).get('id')
trackedInfo = None
if uploadId != None:
self.logger.debug('Upload id for file %s: %s' %(filePath, uploadId))
uploadInfo = UploadTracker.getInstance().get(uploadId)
if uploadInfo != None:
fileDict = uploadInfo.get('fileDict', {})
uploadFileInfo = fileDict.get(filePath)
if uploadFileInfo:
uploadFileInfo['processed'] = True
trackedInfo = UploadTracker.getInstance().get(uploadId)
if daqId != None:
self.logger.debug('Daq id for file %s: %s' %(filePath, daqId))
trackedInfo = DaqTracker.getInstance().get(daqId)
if trackedInfo != None:
fileDict = trackedInfo.get('fileDict', {})
trackedFileInfo = fileDict.get(filePath)
if trackedFileInfo:
trackedFileInfo['processed'] = True
else:
self.logger.error('Upload tracker does not have upload id %s' %(uploadId))
self.logger.error('%s object does not have file path %s' %(trackedInfo, filePath))
trackedInfo.updateStatus()
#######################################################################
......
#!/usr/bin/env python
import os
import copy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
@@ -14,15 +15,17 @@ class DsProcessFileNotificationPlugin(FileProcessor):
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experiment = fileInfo.get('experiment')
experimentName = experiment.get('name')
experimentName = fileInfo.get('experimentName')
self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
daqInfo = copy.deepcopy(fileInfo.get('daqInfo', {}))
if daqInfo.has_key('fileDict'):
del daqInfo['fileDict']
# Prepare dictionary for processing. Only send needed data.
fileInfo2 = {}
fileInfo2['experimentFilePath'] = experimentFilePath
fileInfo2['experimentName'] = experimentName
fileInfo2['daqInfo'] = experiment.get('daqInfo')
fileInfo2['daqInfo'] = daqInfo
self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2)
#######################################################################
......
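With this change the plugin takes the experiment name straight from fileInfo instead of reaching into an attached experiment object, and it deep-copies daqInfo and drops its fileDict entry so that per-file bookkeeping is not shipped to the DS web service with every file. The payload handed to dsFileApi.processFile() ends up looking roughly like the example below; every concrete value here is made up.

    # Illustrative fileInfo2 payload after copy.deepcopy(daqInfo) and removal of
    # 'fileDict'; all names, paths and ids are hypothetical.
    fileInfo2 = {
        'experimentFilePath': 'scan_001/image_0001.tif',
        'experimentName': 'demo-experiment',
        'daqInfo': {
            'id': 'e9b3...-generated-daq-id',
            'experimentName': 'demo-experiment',
            'dataDirectory': '/data/demo-experiment/scan_001',
            # 'fileDict' was deleted above, so per-file state stays on the DAQ side
        },
    }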
@@ -7,6 +7,7 @@
import os
import time
import uuid
import copy
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
@@ -16,9 +17,12 @@ from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
@@ -30,58 +34,56 @@ class ExperimentSessionControllerImpl(DmObjectManager):
def startDaq(self, experimentName, dataDirectory, daqInfo):
FileSystemObserver.getInstance().createDirectory(dataDirectory)
if daqInfo is None:
daqInfo={}
daqInfo['experimentName'] = experimentName
daqInfo['dataDirectory'] = dataDirectory
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
oldDaqInfo = experiment.get('daqInfo')
if oldDaqInfo.get('daqEndTime') is None:
raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory')))
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
startTime = time.time()
daqInfo['daqStartTime'] = startTime
experiment['daqInfo'] = daqInfo
self.logger.debug('Starting DAQ %s' % daqInfo)
daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
ExperimentTracker.getInstance().put(experimentName, experiment)
return experiment
return daqInfo
def stopDaq(self, experimentName):
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
daqInfo = experiment.get('daqInfo')
if experiment is None or daqInfo.get('daqEndTime') is not None:
raise InvalidRequest('Experiment %s is not active.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
daqInfo['daqEndTime'] = time.time()
def stopDaq(self, experimentName, dataDirectory):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
return experiment
return daqInfo.scrub()
def upload(self, daqInfo):
experimentName = daqInfo.get('experimentName')
def getDaqInfo(self, id):
daqInfo = DaqTracker.getInstance().getDaqInfo(id)
if not daqInfo:
raise ObjectNotFound('Daq id %s not found.' % id)
daqInfo.updateStatus()
return daqInfo.scrub()
def upload(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = UploadInfo()
uploadInfo = UploadInfo(daqInfo)
uploadInfo['id'] = uploadId
uploadInfo['experiment'] = experimentName
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['dataDirectory'] = dataDirectory
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimeStamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
fileDict = {}
for filePath in filePaths:
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False }
FileUtility.statFile(filePath, fileUploadInfo)
@@ -89,33 +91,14 @@ class ExperimentSessionControllerImpl(DmObjectManager):
fileProcessingManager.processFile(fileInfo)
uploadInfo['fileDict'] = fileDict
#self.logger.debug('Upload info %s' % uploadInfo)
ExperimentTracker.getInstance().put(experimentName, experiment)
UploadTracker.getInstance().put(uploadId, uploadInfo)
return uploadInfo
return uploadInfo.scrub()
def getUploadInfo(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadStatus = uploadInfo.get('status', 'running')
if uploadStatus == 'complete':
return uploadInfo
fileDict = uploadInfo.get('fileDict')
nFiles = len(fileDict)
nProcessedFiles = 0
for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'):
nProcessedFiles += 1
# need to handle 'failed' uploads
if nProcessedFiles == nFiles:
uploadStatus = 'complete'
uploadInfo['status'] = uploadStatus
uploadInfo['nProcessedFiles'] = '%s/%s' % (nProcessedFiles,nFiles)
percentageComplete = 100.0
if nFiles > 0:
percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0
uploadInfo['percentageComplete'] = '%.2f' % percentageComplete
return uploadInfo
uploadInfo.updateStatus()
return uploadInfo.scrub()
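getUploadInfo() no longer computes progress inline; that bookkeeping now lives in updateStatus() on the tracked object, and scrub() presumably strips internal fields such as fileDict before the object is returned to the caller. Judging from the inline logic removed above, updateStatus() amounts to something like the sketch below; the actual UploadInfo/DaqInfo implementation is not included in this diff.

    # Sketch of the status calculation that moved out of getUploadInfo(),
    # reconstructed from the removed inline code rather than the real class.
    def updateStatus(self):
        fileDict = self.get('fileDict', {})
        nFiles = len(fileDict)
        nProcessedFiles = 0
        for (filePath, trackedFileInfo) in fileDict.items():
            if trackedFileInfo.get('processed'):
                nProcessedFiles += 1
        # 'failed' files still need to be handled, as the original comment noted
        if nFiles > 0 and nProcessedFiles == nFiles:
            self['status'] = 'complete'
        self['nProcessedFiles'] = '%s/%s' % (nProcessedFiles, nFiles)
        percentageComplete = 100.0
        if nFiles > 0:
            percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0
        self['percentageComplete'] = '%.2f' % percentageComplete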
@@ -14,6 +14,7 @@ from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dmFileSystemEventHandler import DmFileSystemEventHandler
from daqTracker import DaqTracker
class FileSystemObserver(threading.Thread,Singleton):
@@ -84,8 +85,13 @@ class FileSystemObserver(threading.Thread,Singleton):
@ThreadingUtility.synchronize
def fileUpdated(self, filePath, dataDirectory, experiment):
daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory)
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
observedFile.setLastUpdatedTimestampToNow()
observedFile.setLastUpdateTimeToNow()
if daqInfo:
daqFileDict = daqInfo['fileDict']
daqFileDict[filePath] = observedFile
observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
self.observedFileMap[filePath] = observedFile
self.logger.debug('Observed file updated: %s', observedFile)
@@ -94,7 +100,7 @@ class FileSystemObserver(threading.Thread,Singleton):
now = time.time()
filePathsForProcessing = []
for (filePath,observedFile) in self.observedFileMap.items():
timestamp = observedFile.get('lastUpdateTimestamp')
timestamp = observedFile.get('lastUpdateTime')
deltaT = now - timestamp
if deltaT > self.minFileProcessingDelayInSeconds:
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
@@ -124,9 +130,9 @@ class FileSystemObserver(threading.Thread,Singleton):
self.logger.debug('Exit flag set, %s done' % self.getName())
break
try:
self.logger.debug('Checking observed files')
filePathsForProcessing = self.checkObservedFilesForProcessing()
if len(filePathsForProcessing):
self.logger.debug('Checking observed files')
for filePath in filePathsForProcessing:
self.processFile(filePath)
except Exception, ex:
......
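Taken together, the observer now attaches every file it sees to the owning DAQ's fileDict and stamps the observed file with that daqInfo, so the completion plugin can mark it processed and the DAQ's status stays current. The resulting call pattern on the session controller looks roughly like this; the controller normally runs behind the DAQ web service, so the direct calls and all names below are purely illustrative.

    # Hypothetical use of the reworked API: two DAQs run concurrently for the
    # same experiment because they watch different data directories.
    controller = ExperimentSessionControllerImpl()
    daqA = controller.startDaq('demo-experiment', '/data/demo-experiment/scanA', {})
    daqB = controller.startDaq('demo-experiment', '/data/demo-experiment/scanB', {})
    # ... files arrive and are picked up by FileSystemObserver ...
    print(controller.getDaqInfo(daqA['id']))    # progress filled in by updateStatus()
    controller.stopDaq('demo-experiment', '/data/demo-experiment/scanA')
    controller.stopDaq('demo-experiment', '/data/demo-experiment/scanB')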
@@ -7,6 +7,7 @@ class UploadTracker(ObjectTracker):
# Cache configuration
objectClass = UploadInfo
cacheSize = 100
####################################################################
# Testing
......
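The UploadTracker hunk above shows the shape these trackers take: an ObjectTracker subclass that declares its object class and cache size. The new daqTracker module imported throughout this commit is not included in the diff, but it presumably combines the start/stop bookkeeping sketched near the top of this commit with a cache configuration along these lines (the module paths and the DaqInfo class are assumptions, not confirmed by the diff).

    # Assumed daqTracker module skeleton, by analogy with UploadTracker above.
    from dm.common.objects.daqInfo import DaqInfo                 # assumed path
    from dm.common.utility.objectTracker import ObjectTracker     # assumed path

    class DaqTracker(ObjectTracker):

        # Cache configuration
        objectClass = DaqInfo
        cacheSize = 100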