Newer
Older
#!/usr/bin/env python
import os
import uuid
import time
from dm.common.constants import dmProcessingStatus
from dm.common.objects.daqInfo import DaqInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.utility.timeUtility import TimeUtility
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
from dm.common.exceptions.objectNotFound import ObjectNotFound
class DaqTracker(ObjectTracker):
    """Track DAQ records by id and by experiment/data directory.

    Each DAQ started through this tracker is stored with the inherited
    ``put()`` under a generated id.  While it is running it is also indexed
    in ``activeDaqDict`` under a key built from the experiment name and the
    normalized data directory, which is what prevents a second DAQ from
    being started for the same experiment in the same directory.
    """

    # Cache configuration
    objectClass = DaqInfo
    cacheSize = 100

    def __init__(self, *args, **kwargs):
        """Initialize the base tracker and the index of active DAQs."""
        # Unpack the arguments so the base class receives them exactly as
        # they were given, rather than as one tuple and one dict.
        ObjectTracker.__init__(self, *args, **kwargs)
        self.activeDaqDict = {}

    @staticmethod
    def _getActiveDaqKey(experiment, dataDirectory):
        """Return ``(experimentName, activeDaqKey)`` for the given pair.

        The key is the experiment name concatenated with the normalized
        data directory, so equivalent spellings of a path share one key.
        """
        experimentName = experiment.get('name')
        activeDaqKey = experimentName + os.path.normpath(dataDirectory)
        return experimentName, activeDaqKey

    def startDaq(self, experiment, dataDirectory, daqInfo=None):
        """Register a new running DAQ and return its DaqInfo object.

        :param experiment: mapping read with ``get()`` for the keys
            'name', 'storageDirectory' and 'storageHost'
        :param dataDirectory: directory the DAQ is started in
        :param daqInfo: optional dict of initial values; when supplied it
            is updated in place with the id, experiment name and storage
            keys
        :returns: the DaqInfo object that was stored in the tracker
        :raises ObjectAlreadyExists: if a DAQ is already active for this
            experiment and data directory
        """
        # A fresh dict per call: a shared default would carry keys from
        # one DAQ over to the next.
        if daqInfo is None:
            daqInfo = {}

        # Prevent second daq to be started in the same directory
        experimentName, activeDaqKey = self._getActiveDaqKey(
            experiment, dataDirectory)
        if self.activeDaqDict.get(activeDaqKey):
            raise ObjectAlreadyExists(
                'DAQ is already active for experiment %s in data directory %s.'
                % (experimentName, dataDirectory))

        daqId = str(uuid.uuid4())
        daqInfo['id'] = daqId
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')

        # Create DaqInfo object with keys that we want to save with file
        # metadata, and add other keys later
        # NOTE(review): assumes DaqInfo can be constructed from a dict --
        # confirm against dm.common.objects.daqInfo.
        daqInfo2 = DaqInfo(daqInfo)
        daqInfo2['nFiles'] = 0
        daqInfo2['nProcessedFiles'] = 0
        daqInfo2['nProcessingErrors'] = 0
        daqInfo2['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING

        # Mirrors the endTime/endTimestamp pair written by stopDaq().
        startTime = time.time()
        daqInfo2['startTime'] = startTime
        daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)

        self.activeDaqDict[activeDaqKey] = daqInfo2
        self.put(daqId, daqInfo2)
        return daqInfo2

    def stopDaq(self, experiment, dataDirectory):
        """Mark the active DAQ for this experiment/directory as stopped.

        Records the end time, calls ``updateStatus()`` on the DAQ object
        and removes it from the index of active DAQs.

        :returns: the DaqInfo object of the stopped DAQ
        :raises ObjectNotFound: if no DAQ is active for this experiment
            and data directory
        """
        experimentName, activeDaqKey = self._getActiveDaqKey(
            experiment, dataDirectory)
        daqInfo = self.activeDaqDict.get(activeDaqKey)
        if not daqInfo:
            raise ObjectNotFound(
                'DAQ is not active for experiment %s in data directory %s.'
                % (experimentName, dataDirectory))

        endTime = time.time()
        daqInfo['endTime'] = endTime
        daqInfo['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        daqInfo.updateStatus()
        del self.activeDaqDict[activeDaqKey]
        return daqInfo

    def getDaqInfo(self, id):
        """Return whatever the inherited ``get()`` yields for this DAQ id."""
        return self.get(id)

    def updateDaqInfos(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING):
        """Call ``updateStatus()`` on every tracked DAQ with the given status."""
        for daqInfo in self.getAll():
            if daqInfo.get('status', '') == status:
                daqInfo.updateStatus()

    def getDaqInfos(self, status=None):
        """Return tracked DAQs, optionally restricted to one status.

        With ``status`` of None or DM_PROCESSING_STATUS_ANY the result of
        ``getAll()`` is returned unfiltered.
        """
        daqInfoList = self.getAll()
        if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY:
            return daqInfoList
        return [daqInfo for daqInfo in daqInfoList
                if daqInfo.get('status', '') == status]

    def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory):
        """Return the active DAQ for this experiment/directory, or None."""
        _, activeDaqKey = self._getActiveDaqKey(experiment, dataDirectory)
        return self.activeDaqDict.get(activeDaqKey)
####################################################################
# Testing
if __name__ == '__main__':
    # Manual smoke test: start one DAQ and print what the tracker holds.
    # Each print takes one parenthesized argument, so the module parses on
    # both Python 2 and Python 3 and prints the same text on either.
    tracker = DaqTracker.getInstance()
    print(tracker)
    experiment = {'name' : 'e1', 'owner' : 'sv'}
    dataDirectory = 'ftp://wolf:2811/data/e1'
    daqInfo = tracker.startDaq(experiment, dataDirectory)
    daqId = daqInfo['id']
    # The double space after each colon reproduces the separator that the
    # comma form of the Python 2 print statement inserted.
    print('DAQ ID:  %s' % (daqId,))
    print('DAQ INFO:  %s' % (tracker.getDaqInfo(daqId),))
    print('DAQS:  %s' % (tracker.getDaqInfos(),))