#!/usr/bin/env python

import os
import time
import uuid

from dm.common.constants import dmProcessingStatus
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.daqInfo import DaqInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.utility.timeUtility import TimeUtility


class DaqTracker(ObjectTracker):
    """
    Tracks DAQ (data acquisition) sessions.

    At most one DAQ may be active per (experiment name, data directory)
    pair; active DAQs are indexed in self.activeDaqDict, while all DAQs
    (active and stopped) remain reachable by id via the ObjectTracker cache.
    """

    # Cache configuration consumed by ObjectTracker.
    objectClass = DaqInfo
    cacheSize = 100

    def __init__(self, *args, **kwargs):
        # BUG FIX: the original called ObjectTracker.__init__(self, args, kwargs),
        # passing the argument tuple/dict as two positional values instead of
        # unpacking them.
        ObjectTracker.__init__(self, *args, **kwargs)
        # Maps experimentName + normalized data directory -> DaqInfo
        # for DAQs that are currently running.
        self.activeDaqDict = {}

    def _getActiveDaqKey(self, experimentName, dataDirectory):
        """Build the active-DAQ lookup key for the given experiment/directory."""
        # normpath collapses duplicate separators so equivalent directory
        # spellings (e.g. 'a//b' vs 'a/b') map to the same key.
        return experimentName + os.path.normpath(dataDirectory)

    def startDaq(self, experiment, dataDirectory, daqInfo=None):
        """
        Register and start a new DAQ for the given experiment/directory.

        :param experiment: dict-like experiment object (name, storage info)
        :param dataDirectory: directory to be monitored
        :param daqInfo: optional dict of initial DAQ metadata; if provided,
            it is updated in place with id/experiment/storage keys
        :return: new DaqInfo object
        :raises ObjectAlreadyExists: if a DAQ is already active for this
            experiment and data directory
        """
        # BUG FIX: the original used a mutable default argument ({}) which
        # is shared across calls; use a None sentinel instead. An explicitly
        # passed dict is still mutated, preserving caller-visible behavior.
        if daqInfo is None:
            daqInfo = {}
        # Prevent second daq to be started in the same directory.
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        oldDaqInfo = self.activeDaqDict.get(activeDaqKey)
        if oldDaqInfo:
            raise ObjectAlreadyExists('DAQ is already active for experiment %s in data directory %s.' % (experimentName, dataDirectory))

        daqId = str(uuid.uuid4())
        daqInfo['id'] = daqId
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory

        # Create DaqInfo object with keys that we want to save with file
        # metadata, and add other keys later.
        daqInfo2 = DaqInfo(daqInfo)
        daqInfo2['nFiles'] = 0
        daqInfo2['nProcessedFiles'] = 0
        daqInfo2['nProcessingErrors'] = 0
        daqInfo2['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        startTime = time.time()
        daqInfo2['startTime'] = startTime
        daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)

        self.activeDaqDict[activeDaqKey] = daqInfo2
        self.put(daqId, daqInfo2)
        return daqInfo2

    def stopDaq(self, experiment, dataDirectory):
        """
        Stop the active DAQ for the given experiment/directory.

        Stamps end time, refreshes status, and removes the DAQ from the
        active set (it remains reachable by id via the cache).

        :return: the stopped DaqInfo object
        :raises ObjectNotFound: if no DAQ is active for this experiment
            and data directory
        """
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        daqInfo = self.activeDaqDict.get(activeDaqKey)
        if not daqInfo:
            raise ObjectNotFound('DAQ is not active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
        endTime = time.time()
        daqInfo['endTime'] = endTime
        daqInfo['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        daqInfo.updateStatus()
        del self.activeDaqDict[activeDaqKey]
        return daqInfo

    def getDaqInfo(self, id):
        """Return the DaqInfo with the given id, or None if not cached."""
        return self.get(id)

    def updateDaqInfos(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING):
        """Refresh (via updateStatus) all tracked DAQs that have the given status."""
        daqInfoList = self.getAll()
        for daqInfo in daqInfoList:
            if daqInfo.get('status', '') == status:
                daqInfo.updateStatus()

    def getDaqInfos(self, status=None):
        """
        Return tracked DAQs, optionally filtered by status.

        :param status: status to filter on; None or
            DM_PROCESSING_STATUS_ANY returns all DAQs
        """
        daqInfoList = self.getAll()
        if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY:
            return daqInfoList
        filteredDaqInfoList = []
        for daqInfo in daqInfoList:
            if daqInfo.get('status', '') == status:
                filteredDaqInfoList.append(daqInfo)
        return filteredDaqInfoList

    def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory):
        """Return the active DaqInfo for this experiment/directory, or None."""
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        return self.activeDaqDict.get(activeDaqKey)


####################################################################
# Testing

if __name__ == '__main__':
    # NOTE: converted from Python 2 print statements so the demo runs
    # under Python 3; library code above is unchanged in behavior.
    tracker = DaqTracker.getInstance()
    print(tracker)
    experiment = {'name': 'e1', 'owner': 'sv'}
    dataDirectory = 'ftp://wolf:2811/data/e1'
    daqInfo = tracker.startDaq(experiment, dataDirectory)
    daqId = daqInfo['id']
    print('DAQ ID: ', daqId)
    print('DAQ INFO: ', tracker.getDaqInfo(daqId))
    print('DAQS: ', tracker.getDaqInfos())
    print('REMOVED DAQ: ', tracker.stopDaq(experiment, dataDirectory))
    # Same directory spelled with redundant slashes; normpath in the key
    # helper means this would collide with an identical active DAQ.
    dataDirectory = 'ftp:///wolf:2811///data/e1'
    daqId = tracker.startDaq(experiment, dataDirectory)
    print('DAQ ID: ', daqId)