From 51bc9eae6ca6c6affdbd14897ba60c7a411f2b14 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Mon, 7 Mar 2016 03:23:37 +0000 Subject: [PATCH] added support for list-daqs and list-uploads commands --- bin/dm-list-daqs | 17 ++++++++ bin/dm-list-uploads | 17 ++++++++ doc/RELEASE_NOTES.txt | 5 +++ .../dm/common/constants/dmProcessingStatus.py | 23 +++++++++++ src/python/dm/common/utility/objectCache.py | 4 ++ src/python/dm/common/utility/objectTracker.py | 3 ++ .../daq_web_service/api/experimentRestApi.py | 17 ++++++++ .../dm/daq_web_service/cli/listDaqsCli.py | 40 +++++++++++++++++++ .../dm/daq_web_service/cli/listUploadsCli.py | 40 +++++++++++++++++++ .../service/experimentRouteDescriptor.py | 18 +++++++++ .../service/experimentSessionController.py | 21 ++++++++++ .../service/impl/daqTracker.py | 15 ++++++- .../impl/experimentSessionControllerImpl.py | 37 ++++++++++------- .../service/impl/uploadTracker.py | 15 ++++++- 14 files changed, 256 insertions(+), 16 deletions(-) create mode 100755 bin/dm-list-daqs create mode 100755 bin/dm-list-uploads create mode 100755 src/python/dm/common/constants/dmProcessingStatus.py create mode 100755 src/python/dm/daq_web_service/cli/listDaqsCli.py create mode 100755 src/python/dm/daq_web_service/cli/listUploadsCli.py diff --git a/bin/dm-list-daqs b/bin/dm-list-daqs new file mode 100755 index 00000000..f7c592fd --- /dev/null +++ b/bin/dm-list-daqs @@ -0,0 +1,17 @@ +#!/bin/sh + +# Run command + +if [ -z $DM_ROOT_DIR ]; then + cd `dirname $0` && myDir=`pwd` + setupFile=$myDir/../setup.sh + if [ ! -f $setupFile ]; then + echo "Cannot find setup file: $setupFile" + exit 1 + fi + source $setupFile > /dev/null +fi + +$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listDaqsCli.py $@ + + diff --git a/bin/dm-list-uploads b/bin/dm-list-uploads new file mode 100755 index 00000000..e4f62b84 --- /dev/null +++ b/bin/dm-list-uploads @@ -0,0 +1,17 @@ +#!/bin/sh + +# Run command + +if [ -z $DM_ROOT_DIR ]; then + cd `dirname $0` && myDir=`pwd` + setupFile=$myDir/../setup.sh + if [ ! -f $setupFile ]; then + echo "Cannot find setup file: $setupFile" + exit 1 + fi + source $setupFile > /dev/null +fi + +$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listUploadsCli.py $@ + + diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt index c5ed8e21..95b81b44 100644 --- a/doc/RELEASE_NOTES.txt +++ b/doc/RELEASE_NOTES.txt @@ -1,3 +1,8 @@ +Release 0.10 (03/xx/2016) +============================= + +- Added dm-list-daqs and dm-list-uploads commands + Release 0.9 (02/25/2016) ============================= diff --git a/src/python/dm/common/constants/dmProcessingStatus.py b/src/python/dm/common/constants/dmProcessingStatus.py new file mode 100755 index 00000000..439062da --- /dev/null +++ b/src/python/dm/common/constants/dmProcessingStatus.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python + +####################################################################### + +DM_PROCESSING_STATUS_ANY = 'any' +DM_PROCESSING_STATUS_PENDING = 'pending' +DM_PROCESSING_STATUS_RUNNING = 'running' +DM_PROCESSING_STATUS_DONE = 'done' +DM_PROCESSING_STATUS_FAILED = 'failed' +DM_PROCESSING_STATUS_SKIPPED = 'skipped' +DM_PROCESSING_STATUS_ABORTING = 'aborting' +DM_PROCESSING_STATUS_ABORTED = 'aborted' + +DM_ALLOWED_PROCESSING_STATUS_LIST = [ + DM_PROCESSING_STATUS_ANY, + DM_PROCESSING_STATUS_PENDING, + DM_PROCESSING_STATUS_RUNNING, + DM_PROCESSING_STATUS_DONE, + DM_PROCESSING_STATUS_FAILED, + DM_PROCESSING_STATUS_SKIPPED, + DM_PROCESSING_STATUS_ABORTING, + DM_PROCESSING_STATUS_ABORTED +] diff --git a/src/python/dm/common/utility/objectCache.py b/src/python/dm/common/utility/objectCache.py index 81829ae5..7969d303 100755 --- a/src/python/dm/common/utility/objectCache.py +++ b/src/python/dm/common/utility/objectCache.py @@ -87,6 +87,10 @@ class ObjectCache: id, item, updateTime, expirationTime = itemTuple return item + def getAll(self): + # Item tuple: id, item, updateTime, expirationTime = itemTuple + return map(lambda itemTuple:itemTuple[1], self.objectMap.values()) + def getItemTuple(self, id): itemTuple = self.objectMap.get(id) if itemTuple is None: diff --git a/src/python/dm/common/utility/objectTracker.py b/src/python/dm/common/utility/objectTracker.py index 66fd49cc..1e747bc7 100755 --- a/src/python/dm/common/utility/objectTracker.py +++ b/src/python/dm/common/utility/objectTracker.py @@ -37,6 +37,9 @@ class ObjectTracker(Singleton): def get(self, id): return self.objectCache.get(id) + def getAll(self): + return self.objectCache.getAll() + def remove(self, id): return self.objectCache.remove(id) diff --git a/src/python/dm/daq_web_service/api/experimentRestApi.py b/src/python/dm/daq_web_service/api/experimentRestApi.py index 6f5f1630..817c81e1 100755 --- a/src/python/dm/daq_web_service/api/experimentRestApi.py +++ b/src/python/dm/daq_web_service/api/experimentRestApi.py @@ -4,6 +4,7 @@ import os import json import urllib +from dm.common.constants import dmProcessingStatus from dm.common.utility.encoder import Encoder from dm.common.exceptions.dmException import DmException from dm.common.objects.experiment import Experiment @@ -46,6 +47,14 @@ class ExperimentRestApi(DaqRestApi): responseDict = self.sendSessionRequest(url=url, method='GET') return DaqInfo(responseDict) + @DaqRestApi.execute + def listDaqs(self, status=None): + if not status: + status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY + url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status) + responseData = self.sendSessionRequest(url=url, method='GET') + return self.toDmObjectList(responseData, DaqInfo) + @DaqRestApi.execute def upload(self, experimentName, dataDirectory, daqInfo={}): if not experimentName: @@ -65,6 +74,14 @@ class ExperimentRestApi(DaqRestApi): responseDict = self.sendSessionRequest(url=url, method='GET') return UploadInfo(responseDict) + @DaqRestApi.execute + def listUploads(self, status=None): + if not status: + status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY + url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status) + responseData = self.sendSessionRequest(url=url, method='GET') + return self.toDmObjectList(responseData, UploadInfo) + @DaqRestApi.execute def stopUpload(self, id): url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id) diff --git a/src/python/dm/daq_web_service/cli/listDaqsCli.py b/src/python/dm/daq_web_service/cli/listDaqsCli.py new file mode 100755 index 00000000..1e7a7232 --- /dev/null +++ b/src/python/dm/daq_web_service/cli/listDaqsCli.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python + +from daqWebServiceSessionCli import DaqWebServiceSessionCli + +from dm.common.constants import dmProcessingStatus +from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi +from dm.common.exceptions.invalidRequest import InvalidRequest + +class ListDaqsCli(DaqWebServiceSessionCli): + + def __init__(self): + DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) + self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY)) + + def checkArgs(self): + if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST: + raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST) + + def getStatus(self): + return self.options.status + + def runCommand(self): + self.parseArgs(usage=""" + dm-list-daqs [--status=STATUS] + +Description: + Retrieves all known daqs. + """) + self.checkArgs() + api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) + daqs = api.listDaqs(self.getStatus()) + for daq in daqs: + print daq.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) + +####################################################################### +# Run command. +if __name__ == '__main__': + cli = ListDaqsCli() + cli.run() + diff --git a/src/python/dm/daq_web_service/cli/listUploadsCli.py b/src/python/dm/daq_web_service/cli/listUploadsCli.py new file mode 100755 index 00000000..8cc30c50 --- /dev/null +++ b/src/python/dm/daq_web_service/cli/listUploadsCli.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python + +from daqWebServiceSessionCli import DaqWebServiceSessionCli + +from dm.common.constants import dmProcessingStatus +from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi +from dm.common.exceptions.invalidRequest import InvalidRequest + +class ListUploadsCli(DaqWebServiceSessionCli): + + def __init__(self): + DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) + self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY)) + + def checkArgs(self): + if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST: + raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST) + + def getStatus(self): + return self.options.status + + def runCommand(self): + self.parseArgs(usage=""" + dm-list-uploads [--status=STATUS] + +Description: + Retrieves all known uploads. + """) + self.checkArgs() + api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) + uploads = api.listUploads(self.getStatus()) + for upload in uploads: + print upload.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) + +####################################################################### +# Run command. +if __name__ == '__main__': + cli = ListUploadsCli() + cli.run() + diff --git a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py index 51956cf4..81cb65a5 100755 --- a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py +++ b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py @@ -46,6 +46,15 @@ class ExperimentRouteDescriptor: 'method' : ['GET'] }, + # List DAQs + { + 'name' : 'listDaqs', + 'path' : '%s/experimentDaqsByStatus/:(status)' % contextRoot, + 'controller' : experimentSessionController, + 'action' : 'listDaqs', + 'method' : ['GET'] + }, + # Upload experiment data { 'name' : 'upload', @@ -64,6 +73,15 @@ class ExperimentRouteDescriptor: 'method' : ['GET'] }, + # List uploads + { + 'name' : 'listUploads', + 'path' : '%s/experimentUploadsByStatus/:(status)' % contextRoot, + 'controller' : experimentSessionController, + 'action' : 'listUploads', + 'method' : ['GET'] + }, + # Stop upload { 'name' : 'stopUpload', diff --git a/src/python/dm/daq_web_service/service/experimentSessionController.py b/src/python/dm/daq_web_service/service/experimentSessionController.py index b9f02f2d..27fce7d5 100755 --- a/src/python/dm/daq_web_service/service/experimentSessionController.py +++ b/src/python/dm/daq_web_service/service/experimentSessionController.py @@ -5,6 +5,7 @@ import json import os from dm.common.constants import dmProcessingMode +from dm.common.constants import dmProcessingStatus from dm.common.service.dmSessionController import DmSessionController from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.utility.encoder import Encoder @@ -62,6 +63,16 @@ class ExperimentSessionController(DmSessionController): self.logger.debug('Returning: %s' % response) return response + @cherrypy.expose + @DmSessionController.require(DmSessionController.isAdministrator()) + @DmSessionController.execute + def listDaqs(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs): + if not status: + status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY + if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST: + raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)) + return self.listToJson(self.experimentSessionControllerImpl.listDaqs(status)) + @cherrypy.expose @DmSessionController.require(DmSessionController.isAdministrator()) @DmSessionController.execute @@ -97,6 +108,16 @@ class ExperimentSessionController(DmSessionController): self.logger.debug('Returning info for upload id %s' % id) return response + @cherrypy.expose + @DmSessionController.require(DmSessionController.isAdministrator()) + @DmSessionController.execute + def listUploads(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs): + if not status: + status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY + if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST: + raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)) + return self.listToJson(self.experimentSessionControllerImpl.listUploads(status)) + @cherrypy.expose @DmSessionController.require(DmSessionController.isAdministrator()) @DmSessionController.execute diff --git a/src/python/dm/daq_web_service/service/impl/daqTracker.py b/src/python/dm/daq_web_service/service/impl/daqTracker.py index d538ad10..2f8f17d6 100755 --- a/src/python/dm/daq_web_service/service/impl/daqTracker.py +++ b/src/python/dm/daq_web_service/service/impl/daqTracker.py @@ -4,6 +4,7 @@ import os import uuid import time +from dm.common.constants import dmProcessingStatus from dm.common.objects.daqInfo import DaqInfo from dm.common.utility.objectTracker import ObjectTracker from dm.common.utility.timeUtility import TimeUtility @@ -41,8 +42,9 @@ class DaqTracker(ObjectTracker): daqInfo2 = DaqInfo(daqInfo) daqInfo2['nFiles'] = 0 - daqInfo2['nProcessedFiles'] = 0 + daqInfo2['nProcessedFiles'] = 0 daqInfo2['nProcessingErrors'] = 0 + daqInfo2['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING startTime = time.time() daqInfo2['startTime'] = startTime @@ -69,6 +71,16 @@ class DaqTracker(ObjectTracker): def getDaqInfo(self, id): return self.get(id) + def getDaqInfos(self, status=None): + daqInfoList = self.getAll() + if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY: + return daqInfoList + filteredDaqInfoList = [] + for daqInfo in daqInfoList: + if daqInfo.get('status', '') == status: + filteredDaqInfoList.append(daqInfo) + return filteredDaqInfoList + def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory): experimentName = experiment.get('name') dataDir = os.path.normpath(dataDirectory) @@ -87,6 +99,7 @@ if __name__ == '__main__': daqId = daqInfo['id'] print 'DAQ ID: ', daqId print 'DAQ INFO: ', tracker.getDaqInfo(daqId) + print 'DAQS: ', tracker.getDaqInfos() print 'REMOVED DAQ: ', tracker.stopDaq(experiment, dataDirectory) dataDirectory = 'ftp:///wolf:2811///data/e1' daqId = tracker.startDaq(experiment, dataDirectory) diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index edeeaaa5..5899f05c 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -10,6 +10,7 @@ import uuid import copy import threading +from dm.common.constants import dmProcessingStatus from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager from dm.common.exceptions.invalidRequest import InvalidRequest @@ -62,6 +63,10 @@ class ExperimentSessionControllerImpl(DmObjectManager): daqInfo.updateStatus() return daqInfo + def listDaqs(self, status): + daqInfoList = DaqTracker.getInstance().getDaqInfos(status) + return daqInfoList + def uploadFiles(self, experimentName, dataDirectory, daqInfo): experiment = self.dsExperimentApi.getExperimentByName(experimentName) UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory) @@ -122,7 +127,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): UploadTracker.getInstance().startUpload(uploadId, uploadInfo) uploadInfo['nFiles'] = len(filePathsDict) - uploadInfo['status'] = 'running' + uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING self.logger.debug('Will prepare upload of %s files' % len(filePathsDict)) timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict]) timer.start() @@ -162,11 +167,15 @@ class ExperimentSessionControllerImpl(DmObjectManager): uploadInfo.updateStatus() return uploadInfo + def listUploads(self, status): + uploadInfoList = UploadTracker.getInstance().getUploadInfos(status) + return uploadInfoList + def stopUpload(self, id): uploadInfo = UploadTracker.getInstance().get(id) if not uploadInfo: raise ObjectNotFound('Upload id %s not found.' % id) - uploadInfo['status'] = 'aborting' + uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING uploadInfo.updateStatus() return uploadInfo @@ -212,16 +221,16 @@ class ExperimentSessionControllerImpl(DmObjectManager): processor = fileProcessingManager.fileProcessorDict.get(processorKey) processorName = processor.name if processorName in skipPlugins: - processingInfo[processorName] = {'status' : 'skipped'} + processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED} else: self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory)) timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict]) - processingInfo[processorName] = {'status' : 'pending'} + processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING} timer.start() UploadTracker.getInstance().startUpload(uploadId, uploadInfo) uploadInfo['nFiles'] = len(filePathsDict) - uploadInfo['status'] = 'running' + uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING return uploadInfo def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict): @@ -234,8 +243,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): dependsOn = processor.dependsOn while True: # Check status - if uploadInfo['status'] == 'aborting': - processingInfo[processorName]['status'] = 'aborted' + if uploadInfo['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING: + processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED return # Check that processor can proceed @@ -247,9 +256,9 @@ class ExperimentSessionControllerImpl(DmObjectManager): if depProcessorStatus in ['skipped', 'aborted', 'failed']: # We must skip processing self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus)) - processingInfo[processorName]['status'] = 'skipped' + processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED return - elif depProcessorStatus in ['pending', 'running']: + elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]: # Do nothing pass elif depProcessorStatus == 'done': @@ -258,7 +267,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): else: # This should not happen self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus)) - processingInfo[processorName]['status'] = 'skipped' + processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED return # Process directory if we can if canProcess: @@ -267,11 +276,11 @@ class ExperimentSessionControllerImpl(DmObjectManager): 'experiment' : experiment, 'filePathsDict' : filePathsDict } - processingInfo[processorName]['status'] = 'running' + processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING processingStartTime = time.time() processor.processDirectory(directoryInfo) - if processingInfo[processorName]['status'] == 'running': - processingInfo[processorName]['status'] = 'done' + if processingInfo[processorName]['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING: + processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName)) else: self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status'])) @@ -282,7 +291,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): except Exception, ex: self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex))) - processingInfo[processorName]['status'] = 'failed' + processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED processingInfo[processorName]['processingError'] = str(ex) processingEndTime = time.time() diff --git a/src/python/dm/daq_web_service/service/impl/uploadTracker.py b/src/python/dm/daq_web_service/service/impl/uploadTracker.py index 8639f59d..89e57447 100755 --- a/src/python/dm/daq_web_service/service/impl/uploadTracker.py +++ b/src/python/dm/daq_web_service/service/impl/uploadTracker.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import os +from dm.common.constants import dmProcessingStatus from dm.common.objects.uploadInfo import UploadInfo from dm.common.utility.objectTracker import ObjectTracker from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists @@ -24,7 +25,7 @@ class UploadTracker(ObjectTracker): uploadInfo = self.get(uploadId) if uploadInfo is not None: uploadInfo.updateStatus() - if uploadInfo.get('status') == 'running': + if uploadInfo.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING: raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir)) del self.activeUploadDict[activeUploadKey] @@ -36,6 +37,18 @@ class UploadTracker(ObjectTracker): self.activeUploadDict[activeUploadKey] = uploadId self.put(uploadId, uploadInfo) + def getUploadInfo(self, id): + return self.get(id) + + def getUploadInfos(self, status=None): + uploadInfoList = self.getAll() + filteredUploadInfoList = [] + for uploadInfo in uploadInfoList: + uploadInfo.updateStatus() + if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY or uploadInfo.get('status', '') == status: + filteredUploadInfoList.append(uploadInfo) + return filteredUploadInfoList + #################################################################### # Testing -- GitLab