#!/usr/bin/env python
# daqTracker.py

import os
import uuid
import time

from dm.common.constants import dmProcessingStatus
# Object, utility and exception classes used by DaqTracker.
from dm.common.objects.daqInfo import DaqInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.utility.timeUtility import TimeUtility
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
from dm.common.exceptions.objectNotFound import ObjectNotFound

class DaqTracker(ObjectTracker):
    """
    Tracker for data acquisitions (DAQs).

    Every started DAQ is stored under its generated id through the
    inherited put()/get()/getAll() calls. DAQs that have been started but
    not yet stopped are additionally kept in activeDaqDict, keyed by
    experiment name + normalized data directory, which is what prevents a
    second DAQ from being started for the same experiment and directory.
    """

    # Cache configuration
    objectClass = DaqInfo
    cacheSize = 100

    def __init__(self, *args, **kwargs):
        # Forward the arguments unpacked. They used to be handed over as
        # two positional values (the packed tuple and the packed dict).
        # NOTE(review): ObjectTracker.__init__ is not visible from this
        # file; confirm it does not rely on the old packed form.
        ObjectTracker.__init__(self, *args, **kwargs)
        # Maps active DAQ key -> DaqInfo for DAQs that are still running.
        self.activeDaqDict = {}

    @staticmethod
    def _getActiveDaqKey(experimentName, dataDirectory):
        # Build the activeDaqDict key; the directory is normalized so that
        # equivalent spellings of one path map to the same key.
        return experimentName + os.path.normpath(dataDirectory)

    def startDaq(self, experiment, dataDirectory, daqInfo=None):
        """
        Register a new active DAQ and return its DaqInfo object.

        :param experiment: mapping read via get() for 'name',
            'storageDirectory', 'storageHost' and 'storageUrl'
        :param dataDirectory: data directory of the DAQ
        :param daqInfo: optional dict of additional DAQ keys; when given,
            it is updated in place with the generated id and the
            experiment/directory keys
        :return: DaqInfo object stored under the new DAQ id
        :raises ObjectAlreadyExists: if a DAQ is already active for the
            same experiment and (normalized) data directory
        """
        # A fresh dict per call: a shared default dict would be mutated
        # below and leak keys from one DAQ into the next.
        if daqInfo is None:
            daqInfo = {}

        # Prevent second daq to be started in the same directory
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        if self.activeDaqDict.get(activeDaqKey):
            raise ObjectAlreadyExists('DAQ is already active for experiment %s in data directory %s.' % (experimentName, dataDirectory))

        daqId = str(uuid.uuid4())
        daqInfo['id'] = daqId
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory

        # Create DaqInfo object with keys that we want to save with file
        # metadata, and add other keys later
        daqInfo2 = DaqInfo(daqInfo)

        daqInfo2['nProcessedFiles'] = 0
        daqInfo2['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        startTime = time.time()
        daqInfo2['startTime'] = startTime
        daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)

        self.activeDaqDict[activeDaqKey] = daqInfo2
        self.put(daqId, daqInfo2)
        return daqInfo2

    def stopDaq(self, experiment, dataDirectory):
        """
        Mark the active DAQ for the given experiment/directory as ended.

        The end time is recorded, updateStatus() is called on the DaqInfo
        object, and the DAQ is removed from activeDaqDict only (nothing is
        removed through the inherited tracker calls here).

        :param experiment: mapping read via get() for 'name'
        :param dataDirectory: data directory the DAQ was started with
        :return: DaqInfo object of the stopped DAQ
        :raises ObjectNotFound: if no DAQ is active for the experiment
            and (normalized) data directory
        """
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        daqInfo = self.activeDaqDict.get(activeDaqKey)
        if not daqInfo:
            raise ObjectNotFound('DAQ is not active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
        endTime = time.time()
        daqInfo['endTime'] = endTime
        daqInfo['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        daqInfo.updateStatus()
        del self.activeDaqDict[activeDaqKey]
        return daqInfo

    def getDaqInfo(self, id):
        """Return whatever the inherited get() yields for the given DAQ id."""
        return self.get(id)

    def updateDaqInfos(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING):
        """
        Call updateStatus() on every tracked DAQ whose 'status' equals
        the given status (running DAQs by default).
        """
        for daqInfo in self.getAll():
            if daqInfo.get('status', '') == status:
                daqInfo.updateStatus()

    def getDaqInfos(self, status=None):
        """
        Return tracked DAQs, optionally filtered by status.

        :param status: when None or DM_PROCESSING_STATUS_ANY, the result
            of getAll() is returned unfiltered; otherwise only DAQs whose
            'status' equals this value are returned
        :return: list of DaqInfo objects
        """
        daqInfoList = self.getAll()
        if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY:
            return daqInfoList
        return [daqInfo for daqInfo in daqInfoList if daqInfo.get('status', '') == status]

    def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory):
        """
        Return the active DAQ for the given experiment and data directory,
        or None if no DAQ is currently active for that pair.
        """
        activeDaqKey = self._getActiveDaqKey(experiment.get('name'), dataDirectory)
        return self.activeDaqDict.get(activeDaqKey)

####################################################################
# Testing

if __name__ == '__main__':
    # Manual smoke test: start a DAQ, look it up, stop it, then start a
    # second one using a differently spelled path for the same directory.
    # print() is called with a single pre-formatted string so the script
    # behaves the same under Python 2 and Python 3.
    tracker = DaqTracker.getInstance()
    print(tracker)
    experiment = {'name' : 'e1', 'owner' : 'sv'}
    dataDirectory = 'ftp://wolf:2811/data/e1'
    daqInfo = tracker.startDaq(experiment, dataDirectory)
    daqId = daqInfo['id']
    print('DAQ ID: %s' % (daqId,))
    print('DAQ INFO: %s' % (tracker.getDaqInfo(daqId),))
    print('DAQS: %s' % (tracker.getDaqInfos(),))
    print('REMOVED DAQ: %s' % (tracker.stopDaq(experiment, dataDirectory),))
    # Same directory with redundant slashes; it normalizes to the same
    # active-DAQ key, which is free again because the first DAQ was stopped.
    dataDirectory = 'ftp:///wolf:2811///data/e1'
    # startDaq() returns the DaqInfo object, not the id: extract the id
    # before printing it as such.
    daqInfo = tracker.startDaq(experiment, dataDirectory)
    daqId = daqInfo['id']
    print('DAQ ID: %s' % (daqId,))