Skip to content
Snippets Groups Projects
Commit 51bc9eae authored by sveseli's avatar sveseli
Browse files

added support for list-daqs and list-uploads commands

parent 6dc8bb40
No related branches found
No related tags found
No related merge requests found
Showing
with 256 additions and 16 deletions
#!/bin/sh
# Run command
if [ -z $DM_ROOT_DIR ]; then
cd `dirname $0` && myDir=`pwd`
setupFile=$myDir/../setup.sh
if [ ! -f $setupFile ]; then
echo "Cannot find setup file: $setupFile"
exit 1
fi
source $setupFile > /dev/null
fi
$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listDaqsCli.py $@
#!/bin/sh
# Run command
if [ -z $DM_ROOT_DIR ]; then
cd `dirname $0` && myDir=`pwd`
setupFile=$myDir/../setup.sh
if [ ! -f $setupFile ]; then
echo "Cannot find setup file: $setupFile"
exit 1
fi
source $setupFile > /dev/null
fi
$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listUploadsCli.py $@
Release 0.10 (03/xx/2016)
=============================
- Added dm-list-daqs and dm-list-uploads commands
Release 0.9 (02/25/2016)
=============================
......
#!/usr/bin/env python
#######################################################################
DM_PROCESSING_STATUS_ANY = 'any'
DM_PROCESSING_STATUS_PENDING = 'pending'
DM_PROCESSING_STATUS_RUNNING = 'running'
DM_PROCESSING_STATUS_DONE = 'done'
DM_PROCESSING_STATUS_FAILED = 'failed'
DM_PROCESSING_STATUS_SKIPPED = 'skipped'
DM_PROCESSING_STATUS_ABORTING = 'aborting'
DM_PROCESSING_STATUS_ABORTED = 'aborted'
DM_ALLOWED_PROCESSING_STATUS_LIST = [
DM_PROCESSING_STATUS_ANY,
DM_PROCESSING_STATUS_PENDING,
DM_PROCESSING_STATUS_RUNNING,
DM_PROCESSING_STATUS_DONE,
DM_PROCESSING_STATUS_FAILED,
DM_PROCESSING_STATUS_SKIPPED,
DM_PROCESSING_STATUS_ABORTING,
DM_PROCESSING_STATUS_ABORTED
]
......@@ -87,6 +87,10 @@ class ObjectCache:
id, item, updateTime, expirationTime = itemTuple
return item
def getAll(self):
# Item tuple: id, item, updateTime, expirationTime = itemTuple
return map(lambda itemTuple:itemTuple[1], self.objectMap.values())
def getItemTuple(self, id):
itemTuple = self.objectMap.get(id)
if itemTuple is None:
......
......@@ -37,6 +37,9 @@ class ObjectTracker(Singleton):
def get(self, id):
return self.objectCache.get(id)
def getAll(self):
return self.objectCache.getAll()
def remove(self, id):
return self.objectCache.remove(id)
......
......@@ -4,6 +4,7 @@ import os
import json
import urllib
from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
......@@ -46,6 +47,14 @@ class ExperimentRestApi(DaqRestApi):
responseDict = self.sendSessionRequest(url=url, method='GET')
return DaqInfo(responseDict)
@DaqRestApi.execute
def listDaqs(self, status=None):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, DaqInfo)
@DaqRestApi.execute
def upload(self, experimentName, dataDirectory, daqInfo={}):
if not experimentName:
......@@ -65,6 +74,14 @@ class ExperimentRestApi(DaqRestApi):
responseDict = self.sendSessionRequest(url=url, method='GET')
return UploadInfo(responseDict)
@DaqRestApi.execute
def listUploads(self, status=None):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UploadInfo)
@DaqRestApi.execute
def stopUpload(self, id):
url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
......
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListDaqsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-daqs [--status=STATUS]
Description:
Retrieves all known daqs.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqs = api.listDaqs(self.getStatus())
for daq in daqs:
print daq.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListDaqsCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListUploadsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-uploads [--status=STATUS]
Description:
Retrieves all known uploads.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploads = api.listUploads(self.getStatus())
for upload in uploads:
print upload.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListUploadsCli()
cli.run()
......@@ -46,6 +46,15 @@ class ExperimentRouteDescriptor:
'method' : ['GET']
},
# List DAQs
{
'name' : 'listDaqs',
'path' : '%s/experimentDaqsByStatus/:(status)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'listDaqs',
'method' : ['GET']
},
# Upload experiment data
{
'name' : 'upload',
......@@ -64,6 +73,15 @@ class ExperimentRouteDescriptor:
'method' : ['GET']
},
# List uploads
{
'name' : 'listUploads',
'path' : '%s/experimentUploadsByStatus/:(status)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'listUploads',
'method' : ['GET']
},
# Stop upload
{
'name' : 'stopUpload',
......
......@@ -5,6 +5,7 @@ import json
import os
from dm.common.constants import dmProcessingMode
from dm.common.constants import dmProcessingStatus
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.encoder import Encoder
......@@ -62,6 +63,16 @@ class ExperimentSessionController(DmSessionController):
self.logger.debug('Returning: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def listDaqs(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST))
return self.listToJson(self.experimentSessionControllerImpl.listDaqs(status))
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
......@@ -97,6 +108,16 @@ class ExperimentSessionController(DmSessionController):
self.logger.debug('Returning info for upload id %s' % id)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def listUploads(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST))
return self.listToJson(self.experimentSessionControllerImpl.listUploads(status))
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
......
......@@ -4,6 +4,7 @@ import os
import uuid
import time
from dm.common.constants import dmProcessingStatus
from dm.common.objects.daqInfo import DaqInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.utility.timeUtility import TimeUtility
......@@ -41,8 +42,9 @@ class DaqTracker(ObjectTracker):
daqInfo2 = DaqInfo(daqInfo)
daqInfo2['nFiles'] = 0
daqInfo2['nProcessedFiles'] = 0
daqInfo2['nProcessedFiles'] = 0
daqInfo2['nProcessingErrors'] = 0
daqInfo2['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
startTime = time.time()
daqInfo2['startTime'] = startTime
......@@ -69,6 +71,16 @@ class DaqTracker(ObjectTracker):
def getDaqInfo(self, id):
return self.get(id)
def getDaqInfos(self, status=None):
daqInfoList = self.getAll()
if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY:
return daqInfoList
filteredDaqInfoList = []
for daqInfo in daqInfoList:
if daqInfo.get('status', '') == status:
filteredDaqInfoList.append(daqInfo)
return filteredDaqInfoList
def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory):
experimentName = experiment.get('name')
dataDir = os.path.normpath(dataDirectory)
......@@ -87,6 +99,7 @@ if __name__ == '__main__':
daqId = daqInfo['id']
print 'DAQ ID: ', daqId
print 'DAQ INFO: ', tracker.getDaqInfo(daqId)
print 'DAQS: ', tracker.getDaqInfos()
print 'REMOVED DAQ: ', tracker.stopDaq(experiment, dataDirectory)
dataDirectory = 'ftp:///wolf:2811///data/e1'
daqId = tracker.startDaq(experiment, dataDirectory)
......
......@@ -10,6 +10,7 @@ import uuid
import copy
import threading
from dm.common.constants import dmProcessingStatus
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
......@@ -62,6 +63,10 @@ class ExperimentSessionControllerImpl(DmObjectManager):
daqInfo.updateStatus()
return daqInfo
def listDaqs(self, status):
daqInfoList = DaqTracker.getInstance().getDaqInfos(status)
return daqInfoList
def uploadFiles(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
......@@ -122,7 +127,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = 'running'
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict])
timer.start()
......@@ -162,11 +167,15 @@ class ExperimentSessionControllerImpl(DmObjectManager):
uploadInfo.updateStatus()
return uploadInfo
def listUploads(self, status):
uploadInfoList = UploadTracker.getInstance().getUploadInfos(status)
return uploadInfoList
def stopUpload(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadInfo['status'] = 'aborting'
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING
uploadInfo.updateStatus()
return uploadInfo
......@@ -212,16 +221,16 @@ class ExperimentSessionControllerImpl(DmObjectManager):
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
if processorName in skipPlugins:
processingInfo[processorName] = {'status' : 'skipped'}
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
else:
self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
processingInfo[processorName] = {'status' : 'pending'}
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
timer.start()
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = 'running'
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
return uploadInfo
def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
......@@ -234,8 +243,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
dependsOn = processor.dependsOn
while True:
# Check status
if uploadInfo['status'] == 'aborting':
processingInfo[processorName]['status'] = 'aborted'
if uploadInfo['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
return
# Check that processor can proceed
......@@ -247,9 +256,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
if depProcessorStatus in ['skipped', 'aborted', 'failed']:
# We must skip processing
self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = 'skipped'
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
return
elif depProcessorStatus in ['pending', 'running']:
elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]:
# Do nothing
pass
elif depProcessorStatus == 'done':
......@@ -258,7 +267,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
else:
# This should not happen
self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = 'skipped'
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
return
# Process directory if we can
if canProcess:
......@@ -267,11 +276,11 @@ class ExperimentSessionControllerImpl(DmObjectManager):
'experiment' : experiment,
'filePathsDict' : filePathsDict
}
processingInfo[processorName]['status'] = 'running'
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
processingStartTime = time.time()
processor.processDirectory(directoryInfo)
if processingInfo[processorName]['status'] == 'running':
processingInfo[processorName]['status'] = 'done'
if processingInfo[processorName]['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
else:
self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
......@@ -282,7 +291,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
except Exception, ex:
self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
processingInfo[processorName]['status'] = 'failed'
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
processingInfo[processorName]['processingError'] = str(ex)
processingEndTime = time.time()
......
#!/usr/bin/env python
import os
from dm.common.constants import dmProcessingStatus
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
......@@ -24,7 +25,7 @@ class UploadTracker(ObjectTracker):
uploadInfo = self.get(uploadId)
if uploadInfo is not None:
uploadInfo.updateStatus()
if uploadInfo.get('status') == 'running':
if uploadInfo.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir))
del self.activeUploadDict[activeUploadKey]
......@@ -36,6 +37,18 @@ class UploadTracker(ObjectTracker):
self.activeUploadDict[activeUploadKey] = uploadId
self.put(uploadId, uploadInfo)
def getUploadInfo(self, id):
return self.get(id)
def getUploadInfos(self, status=None):
uploadInfoList = self.getAll()
filteredUploadInfoList = []
for uploadInfo in uploadInfoList:
uploadInfo.updateStatus()
if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY or uploadInfo.get('status', '') == status:
filteredUploadInfoList.append(uploadInfo)
return filteredUploadInfoList
####################################################################
# Testing
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment