From 84a350138bd0d1a2a20ee337f28934f567dff7a5 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Wed, 22 Feb 2017 17:24:23 +0000 Subject: [PATCH] mongo support for proc web service --- .../common/mongodb/api/workflowMongoDbApi.py | 104 ++++++++++++++++++ .../mongodb/impl/processingJobCollection.py | 42 +++++++ .../common/mongodb/impl/workflowCollection.py | 42 +++++++ 3 files changed, 188 insertions(+) create mode 100755 src/python/dm/common/mongodb/api/workflowMongoDbApi.py create mode 100755 src/python/dm/common/mongodb/impl/processingJobCollection.py create mode 100755 src/python/dm/common/mongodb/impl/workflowCollection.py diff --git a/src/python/dm/common/mongodb/api/workflowMongoDbApi.py b/src/python/dm/common/mongodb/api/workflowMongoDbApi.py new file mode 100755 index 00000000..ebe4393c --- /dev/null +++ b/src/python/dm/common/mongodb/api/workflowMongoDbApi.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python + +import copy +import types +import re + +from dm.common.exceptions.dmException import DmException +from dm.common.mongodb.api.dmMongoDbApi import DmMongoDbApi +from dm.common.mongodb.impl.workflowCollection import WorkflowCollection +from dm.common.mongodb.api.dmMongoDbApi import DmMongoDbApi +from dm.common.objects.workflow import Workflow +from dm.common.exceptions.invalidRequest import InvalidRequest + +class WorkflowMongoDbApi(DmMongoDbApi): + + SYSTEM_KEY_LIST = ['_id', 'name', 'owner'] + + def __init__(self): + DmMongoDbApi.__init__(self) + self.workflowCollectionDict = {} + + def getWorkflowCollection(self, owner): + workflowCollection = self.workflowCollectionDict.get(owner) + if not workflowCollection: + workflowCollection = WorkflowCollection(self.dbClient, owner) + self.workflowCollectionDict[owner] = workflowCollection + return workflowCollection + + @classmethod + def getAndCheckOwner(cls, dictObject): + owner = dictObject.get('owner') + cls.checkOwner(owner) + return owner + + @classmethod + def checkOwner(cls, owner): + if not owner: + raise InvalidRequest('Owner name has not been provided.') + + @DmMongoDbApi.executeDbCall + def addWorkflow(self, workflow, **kwargs): + owner = self.getAndCheckOwner(owner) + workflow2 = self.getMongoDict(workflow) + dbWorkflow = self.getWorkflowCollection(owner).addByUniqueKeys(workflow2) + return self.toDmObject(dbWorkflow, Workflow) + + @DmMongoDbApi.executeDbCall + def getWorkflows(self, owner, queryDict={}, returnFieldDict=WorkflowCollection.ALL_FIELDS_DICT, **kwargs): + self.checkOwner(owner) + queryDict2 = self.getMongoDict(queryDict) + ignoreCase = kwargs.get(self.REGEX_IGNORE_CASE_KEY, True) + queryDict2 = self.convertStringsToRegex(queryDict2, ignoreCase) + return self.listToDmObjects(self.getWorkflowCollection(owner).findByQueryDict(queryDict2, returnFieldDict), Workflow) + + @DmMongoDbApi.executeDbCall + def getWorkflowById(self, owner, id, **kwargs): + self.checkOwner(owner) + dbWorkflow = self.getWorkflowCollection(owner).findById(id) + return self.toDmObject(dbWorkflow, Workflow) + + @DmMongoDbApi.executeDbCall + def getWorkflowByName(self, owner, name, **kwargs): + self.checkOwner(owner) + queryDict = { 'name' : name, 'owner' : owner } + dbWorkflow = self.getWorkflowCollection(owner).findByUniqueKeys(queryDict) + return self.toDmObject(dbWorkflow, Workflow) + + @DmMongoDbApi.executeDbCall + def updateWorkflowById(self, workflow, **kwargs): + owner = self.getAndCheckOwner(workflow) + workflow2 = self.getMongoDict(workflow) + dbWorkflow = self.getWorkflowCollection(owner).updateById(workflow2) + return self.toDmObject(dbWorkflow, Workflow) + + @DmMongoDbApi.executeDbCall + def updateWorkflow(self, workflow, **kwargs): + owner = self.getAndCheckOwner(workflow) + workflow2 = self.getMongoDict(workflow) + dbWorkflow = self.getWorkflowCollection(owner).updateByUniqueKeys(workflow2) + return self.toDmObject(dbWorkflow, Workflow) + + @DmMongoDbApi.executeDbCall + def updateOrAddWorkflow(self, workflow, **kwargs): + owner = self.getAndCheckOwner(workflow) + workflow2 = self.getMongoDict(workflow) + dbWorkflow = self.getWorkflowCollection(owner).updateOrAddByUniqueKeys(workflow2) + return self.toDmObject(dbWorkflow, Workflow) + +####################################################################### +# Testing. +if __name__ == '__main__': + from dm.common.mongodb.impl.mongoDbManager import MongoDbManager + mgr = MongoDbManager.getInstance() + mgr.dbName = 'dm' + api = WorkflowMongoDbApi() + + workflows = api.getWorkflows('sveseli') + for workflow in workflows: + print 'WORKFLOW: %s\n' % workflow.getDictRep() + + workflow = api.getWorkflowByName('sveseli', 'workflow-01') + print 'WORKFLOW: %s\n' % workflow.getDictRep() + + diff --git a/src/python/dm/common/mongodb/impl/processingJobCollection.py b/src/python/dm/common/mongodb/impl/processingJobCollection.py new file mode 100755 index 00000000..1970247a --- /dev/null +++ b/src/python/dm/common/mongodb/impl/processingJobCollection.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +from dm.common.utility.loggingManager import LoggingManager +from dm.common.exceptions.invalidArgument import InvalidArgument +from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists +from dm.common.exceptions.objectNotFound import ObjectNotFound +from dm.common.exceptions.dbError import DbError +from dmMongoCollection import DmMongoCollection + +class ProcessingJobCollection(DmMongoCollection): + """Class responsible for updating processing job collection in mongo db.""" + + UNIQUE_KEYS_LIST = [ 'workflowId', 'timeStamp' ] + NAME_KEY = 'name' + ITEM_TYPE = 'processingJob' + + def __init__(self, dbClient, prefix=None): + collectionName = 'processing-jobs' + if prefix: + collectionName = '%s-processing-jobs' % prefix + DmMongoCollection.__init__(self, collectionName, dbClient) + +####################################################################### +# Testing +if __name__ == '__main__': + from dmMongoClient import DmMongoClient + mongo = DmMongoClient('dm') + processingJobCollection = ProcessingJobCollection(mongo) + workflowInfo = {'name' : 'workflow-01', 'owner' : 'sveseli', 'stages' : + [ + { 'id' : '1', 'executable' : '/bin/date', 'args' : [ ] }, + { 'id' : '2', 'executable' : '/bin/ls', 'args' : [ '-l', 'INPUT_FILE' ], 'parallelizeExecution' : False}, + { 'id' : '3', 'executable' : '/bin/cp', 'args' : [ 'INPUT_FILE', 'OUTPUT_FILE=INPUT_FILE.processed' ], 'parallelizeExecution' : False} + ] + } + print workflowCollection.addByUniqueKeys(workflowInfo) + print workflowCollection.findByQueryDict({'name' : 'workflow-01', 'owner' : 'sveseli'}, {'steps' : 1}) + workflowInfo['description'] = 'My first workflow' + print workflowCollection.updateByUniqueKeys(workflowInfo) + + + diff --git a/src/python/dm/common/mongodb/impl/workflowCollection.py b/src/python/dm/common/mongodb/impl/workflowCollection.py new file mode 100755 index 00000000..68460aa6 --- /dev/null +++ b/src/python/dm/common/mongodb/impl/workflowCollection.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +from dm.common.utility.loggingManager import LoggingManager +from dm.common.exceptions.invalidArgument import InvalidArgument +from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists +from dm.common.exceptions.objectNotFound import ObjectNotFound +from dm.common.exceptions.dbError import DbError +from dmMongoCollection import DmMongoCollection + +class WorkflowCollection(DmMongoCollection): + """Class responsible for updating workflow collection in mongo db.""" + + UNIQUE_KEYS_LIST = [ 'name', 'owner' ] + NAME_KEY = 'name' + ITEM_TYPE = 'workflow' + + def __init__(self, dbClient, prefix=None): + collectionName = 'workflows' + if prefix: + collectionName = '%s-workflows' % prefix + DmMongoCollection.__init__(self, collectionName, dbClient) + +####################################################################### +# Testing +if __name__ == '__main__': + from dmMongoClient import DmMongoClient + mongo = DmMongoClient('dm') + workflowCollection = WorkflowCollection(mongo) + workflowInfo = {'name' : 'workflow-01', 'owner' : 'sveseli', 'stages' : + [ + { 'id' : '1', 'executable' : '/bin/date', 'args' : [ ] }, + { 'id' : '2', 'executable' : '/bin/ls', 'args' : [ '-l', 'INPUT_FILE' ], 'parallelizeExecution' : False}, + { 'id' : '3', 'executable' : '/bin/cp', 'args' : [ 'INPUT_FILE', 'OUTPUT_FILE=INPUT_FILE.processed' ], 'parallelizeExecution' : False} + ] + } + print workflowCollection.addByUniqueKeys(workflowInfo) + print workflowCollection.findByQueryDict({'name' : 'workflow-01', 'owner' : 'sveseli'}, {'steps' : 1}) + workflowInfo['description'] = 'My first workflow' + print workflowCollection.updateByUniqueKeys(workflowInfo) + + + -- GitLab