Skip to content
Snippets Groups Projects
Commit 84a35013 authored by sveseli's avatar sveseli
Browse files

mongo support for proc web service

parent 141ec0e7
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
import copy
import types
import re
from dm.common.exceptions.dmException import DmException
from dm.common.mongodb.api.dmMongoDbApi import DmMongoDbApi
from dm.common.mongodb.impl.workflowCollection import WorkflowCollection
from dm.common.mongodb.api.dmMongoDbApi import DmMongoDbApi
from dm.common.objects.workflow import Workflow
from dm.common.exceptions.invalidRequest import InvalidRequest
class WorkflowMongoDbApi(DmMongoDbApi):
SYSTEM_KEY_LIST = ['_id', 'name', 'owner']
def __init__(self):
DmMongoDbApi.__init__(self)
self.workflowCollectionDict = {}
def getWorkflowCollection(self, owner):
workflowCollection = self.workflowCollectionDict.get(owner)
if not workflowCollection:
workflowCollection = WorkflowCollection(self.dbClient, owner)
self.workflowCollectionDict[owner] = workflowCollection
return workflowCollection
@classmethod
def getAndCheckOwner(cls, dictObject):
owner = dictObject.get('owner')
cls.checkOwner(owner)
return owner
@classmethod
def checkOwner(cls, owner):
if not owner:
raise InvalidRequest('Owner name has not been provided.')
@DmMongoDbApi.executeDbCall
def addWorkflow(self, workflow, **kwargs):
owner = self.getAndCheckOwner(owner)
workflow2 = self.getMongoDict(workflow)
dbWorkflow = self.getWorkflowCollection(owner).addByUniqueKeys(workflow2)
return self.toDmObject(dbWorkflow, Workflow)
@DmMongoDbApi.executeDbCall
def getWorkflows(self, owner, queryDict={}, returnFieldDict=WorkflowCollection.ALL_FIELDS_DICT, **kwargs):
self.checkOwner(owner)
queryDict2 = self.getMongoDict(queryDict)
ignoreCase = kwargs.get(self.REGEX_IGNORE_CASE_KEY, True)
queryDict2 = self.convertStringsToRegex(queryDict2, ignoreCase)
return self.listToDmObjects(self.getWorkflowCollection(owner).findByQueryDict(queryDict2, returnFieldDict), Workflow)
@DmMongoDbApi.executeDbCall
def getWorkflowById(self, owner, id, **kwargs):
self.checkOwner(owner)
dbWorkflow = self.getWorkflowCollection(owner).findById(id)
return self.toDmObject(dbWorkflow, Workflow)
@DmMongoDbApi.executeDbCall
def getWorkflowByName(self, owner, name, **kwargs):
self.checkOwner(owner)
queryDict = { 'name' : name, 'owner' : owner }
dbWorkflow = self.getWorkflowCollection(owner).findByUniqueKeys(queryDict)
return self.toDmObject(dbWorkflow, Workflow)
@DmMongoDbApi.executeDbCall
def updateWorkflowById(self, workflow, **kwargs):
owner = self.getAndCheckOwner(workflow)
workflow2 = self.getMongoDict(workflow)
dbWorkflow = self.getWorkflowCollection(owner).updateById(workflow2)
return self.toDmObject(dbWorkflow, Workflow)
@DmMongoDbApi.executeDbCall
def updateWorkflow(self, workflow, **kwargs):
owner = self.getAndCheckOwner(workflow)
workflow2 = self.getMongoDict(workflow)
dbWorkflow = self.getWorkflowCollection(owner).updateByUniqueKeys(workflow2)
return self.toDmObject(dbWorkflow, Workflow)
@DmMongoDbApi.executeDbCall
def updateOrAddWorkflow(self, workflow, **kwargs):
owner = self.getAndCheckOwner(workflow)
workflow2 = self.getMongoDict(workflow)
dbWorkflow = self.getWorkflowCollection(owner).updateOrAddByUniqueKeys(workflow2)
return self.toDmObject(dbWorkflow, Workflow)
#######################################################################
# Testing.
if __name__ == '__main__':
from dm.common.mongodb.impl.mongoDbManager import MongoDbManager
mgr = MongoDbManager.getInstance()
mgr.dbName = 'dm'
api = WorkflowMongoDbApi()
workflows = api.getWorkflows('sveseli')
for workflow in workflows:
print 'WORKFLOW: %s\n' % workflow.getDictRep()
workflow = api.getWorkflowByName('sveseli', 'workflow-01')
print 'WORKFLOW: %s\n' % workflow.getDictRep()
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.exceptions.dbError import DbError
from dmMongoCollection import DmMongoCollection
class ProcessingJobCollection(DmMongoCollection):
"""Class responsible for updating processing job collection in mongo db."""
UNIQUE_KEYS_LIST = [ 'workflowId', 'timeStamp' ]
NAME_KEY = 'name'
ITEM_TYPE = 'processingJob'
def __init__(self, dbClient, prefix=None):
collectionName = 'processing-jobs'
if prefix:
collectionName = '%s-processing-jobs' % prefix
DmMongoCollection.__init__(self, collectionName, dbClient)
#######################################################################
# Testing
if __name__ == '__main__':
from dmMongoClient import DmMongoClient
mongo = DmMongoClient('dm')
processingJobCollection = ProcessingJobCollection(mongo)
workflowInfo = {'name' : 'workflow-01', 'owner' : 'sveseli', 'stages' :
[
{ 'id' : '1', 'executable' : '/bin/date', 'args' : [ ] },
{ 'id' : '2', 'executable' : '/bin/ls', 'args' : [ '-l', 'INPUT_FILE' ], 'parallelizeExecution' : False},
{ 'id' : '3', 'executable' : '/bin/cp', 'args' : [ 'INPUT_FILE', 'OUTPUT_FILE=INPUT_FILE.processed' ], 'parallelizeExecution' : False}
]
}
print workflowCollection.addByUniqueKeys(workflowInfo)
print workflowCollection.findByQueryDict({'name' : 'workflow-01', 'owner' : 'sveseli'}, {'steps' : 1})
workflowInfo['description'] = 'My first workflow'
print workflowCollection.updateByUniqueKeys(workflowInfo)
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.exceptions.dbError import DbError
from dmMongoCollection import DmMongoCollection
class WorkflowCollection(DmMongoCollection):
"""Class responsible for updating workflow collection in mongo db."""
UNIQUE_KEYS_LIST = [ 'name', 'owner' ]
NAME_KEY = 'name'
ITEM_TYPE = 'workflow'
def __init__(self, dbClient, prefix=None):
collectionName = 'workflows'
if prefix:
collectionName = '%s-workflows' % prefix
DmMongoCollection.__init__(self, collectionName, dbClient)
#######################################################################
# Testing
if __name__ == '__main__':
from dmMongoClient import DmMongoClient
mongo = DmMongoClient('dm')
workflowCollection = WorkflowCollection(mongo)
workflowInfo = {'name' : 'workflow-01', 'owner' : 'sveseli', 'stages' :
[
{ 'id' : '1', 'executable' : '/bin/date', 'args' : [ ] },
{ 'id' : '2', 'executable' : '/bin/ls', 'args' : [ '-l', 'INPUT_FILE' ], 'parallelizeExecution' : False},
{ 'id' : '3', 'executable' : '/bin/cp', 'args' : [ 'INPUT_FILE', 'OUTPUT_FILE=INPUT_FILE.processed' ], 'parallelizeExecution' : False}
]
}
print workflowCollection.addByUniqueKeys(workflowInfo)
print workflowCollection.findByQueryDict({'name' : 'workflow-01', 'owner' : 'sveseli'}, {'steps' : 1})
workflowInfo['description'] = 'My first workflow'
print workflowCollection.updateByUniqueKeys(workflowInfo)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment