Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 807 additions and 0 deletions
__version__ = "1.1 (2017.03.01)"
#!/usr/bin/env python
from PyQt4.QtGui import *
from PyQt4.QtCore import *
from dmApiFactory import DmApiFactory
# Define the DAQs tab content:
class DaqsTab(QObject):
def __init__(self, stationName, parent, id=-1):
super(DaqsTab, self).__init__(parent)
self.stationName = stationName
self.experimentDaqApi = DmApiFactory.getInstance().getExperimentDaqApi()
self.InsertColumn(0, 'Id')
self.InsertColumn(2, 'Data Directory')
self.InsertColumn(3, '# Files')
self.SetColumnWidth(0, 150)
self.SetColumnWidth(1, 350)
self.SetColumnWidth(2, 250)
self.updateList()
def updateList(self):
self.daqList = self.experimentDaqApi.listDaqs()
i = 0
for daq in self.daqList:
self.InsertStringItem(i, daq.get('id'))
self.SetStringItem(i, 1, daq.get('dataDirectory', ''))
self.SetStringItem(i, 2, str(daq.get('nFiles', '0')))
if (i % 2) == 0:
self.SetItemBackgroundColour(i, '#e6f1f5')
i += 1
#!/usr/bin/env python
from dm.common.utility.singleton import Singleton
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
class DmApiFactory(Singleton):
def __init__(self):
self.__configure()
def __configure(self):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
configManager = ConfigurationManager.getInstance()
(self.username,self.password) = configManager.parseLoginFile()
self.protocol = configManager.getWebServiceProtocol()
self.dsHost = configManager.getDsWebServiceHost()
self.dsPort = configManager.getDsWebServicePort()
self.daqHost = configManager.getDaqWebServiceHost()
self.daqPort = configManager.getDaqWebServicePort()
def getUserDsApi(self):
from dm.ds_web_service.api.userDsApi import UserDsApi
api = UserDsApi(self.username, self.password, self.dsHost, self.dsPort, self.protocol)
return api
def getExperimentDsApi(self):
from dm.ds_web_service.api.experimentDsApi import ExperimentDsApi
api = ExperimentDsApi(self.username, self.password, self.dsHost, self.dsPort, self.protocol)
return api
def getExperimentDaqApi(self):
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
api = ExperimentDaqApi(self.username, self.password, self.daqHost, self.daqPort, self.protocol)
return api
####################################################################
# Testing
if __name__ == '__main__':
pass
#!/usr/bin/env python
import sys
from PyQt4.QtCore import *
from PyQt4.QtGui import QMainWindow, QStackedLayout, QApplication, QWidget
from dm.common.constants import dmStatus
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.configurationError import ConfigurationError
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from experimentsTab import ExperimentsTab
from daqsTab import DaqsTab
from uploadsTab import UploadsTab
from initialTab import InitialTab
class DmStationUi(QMainWindow):
def __init__(self):
self.__configure()
self.logger.debug('Starting UI for DM Station: %s' % self.stationName)
super(DmStationUi, self).__init__()
self.setWindowTitle('DM Station: %s' % self.stationName)
self.setGeometry(0, 0, 800, 400)
# Create a stacked layout to connect the various pages
self.stackedLayout = QStackedLayout()
# Create the tab windows
initialTab = InitialTab(self.stationName, self.stackedLayout)
experimentsTab = ExperimentsTab(self.stationName, self.stackedLayout)
#daqsTab = DaqsTab(self.stationName, self.stackedLayout)
#uploadsTab = UploadsTab(self.stationName, self.stackedLayout)
# Add the windows to the stack.
self.stackedLayout.addWidget(initialTab)
self.stackedLayout.addWidget(experimentsTab)
#self.stackedLayout.addWidget(daqsTab)
#self.stackedLayout.addWidget(uploadsTab)
# Set a central widget to hold everything
self.centralWidget = QWidget()
self.centralWidget.setLayout(self.stackedLayout)
self.setCentralWidget(self.centralWidget)
def __configure(self):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
configManager = ConfigurationManager.getInstance()
self.stationName = configManager.getInstance().getStationName()
if not self.stationName:
raise ConfigurationError('DM_STATION_NAME environment variable is not defined.')
if __name__ == "__main__":
try:
app = QApplication(sys.argv)
DmStationUi().Show()
app.MainLoop()
except DmException, ex:
print >>sys.stderr, 'ERROR: %s' % ex
raise SystemExit(ex.getErrorCode())
except Exception, ex:
print >>sys.stderr, '%s' % ex
raise SystemExit(dmStatus.DM_ERROR)
#!/usr/bin/env python
from PyQt4.QtGui import *
from PyQt4.QtCore import QObject
from dmApiFactory import DmApiFactory
# Define the experiments tab content:
class ExperimentsTab(QObject):
def __init__(self, stationName, parent, id=-1):
super(ExperimentsTab, self).__init__(parent)
self.stationName = stationName
self.experimentDsApi = DmApiFactory.getInstance().getExperimentDsApi()
self.updateList()
def updateList(self):
self.experimentList = self.experimentDsApi.getExperimentsByStation(self.stationName)
i = 0
for experiment in self.experimentList:
self.InsertStringItem(i, experiment.get('name'))
self.SetStringItem(i, 1, experiment.get('description', ''))
self.SetStringItem(i, 2, str(experiment.get('startDate', '')))
if (i % 2) == 0:
self.SetItemBackgroundColour(i, '#e6f1f5')
i += 1
#!/usr/bin/env python
from PyQt4.QtGui import QGridLayout, QSpacerItem, QSizePolicy, QPushButton, QWidget, QFont, QLabel
from PyQt4.QtCore import QObject, Qt
# Define the experiments tab content:
class InitialTab(QObject):
def __init__(self, stationName, parent, id=-1):
super(InitialTab, self).__init__(parent)
self.stationName = stationName
self.parent = parent
def initialTabLayout(self):
grid = QGridLayout()
labelFont = QFont("Arial", 18, QFont.Bold)
lbl = QLabel(self.stationName + " DM Tool", self)
lbl.setAlignment(Qt.AlignCenter)
lbl.setFont(labelFont)
grid.addWidget(lbl, 1, 1, 1, 3)
grid.addItem(QSpacerItem(20, 130, QSizePolicy.Expanding), 2, 1)
grid.addItem(QSpacerItem(40, 20, QSizePolicy.Expanding), 3, 1)
manageBtn = QPushButton("Manage Experiments", self)
manageBtn.clicked.connect(self.setTab(1))
grid.addWidget(manageBtn, 3, 2)
grid.addItem(QSpacerItem(40, 20, QSizePolicy.Expanding), 3, 3)
viewBtn = QPushButton("View Experiments", self)
viewBtn.clicked.connect(self.setTab(2))
grid.addWidget(viewBtn, 3, 4)
grid.addItem(QSpacerItem(40, 20, QSizePolicy.Expanding), 3, 5)
grid.addItem(QSpacerItem(20, 130, QSizePolicy.Expanding), 4, 1)
self.initialTabWidget = QWidget()
self.initialTabWidget.setLayout(grid)
def setTab(self, tab):
self.parent.setCurrentIndex(tab)
#!/usr/bin/env python
from PyQt4.QtGui import *
from PyQt4.QtCore import *
from dmApiFactory import DmApiFactory
# Define the experiments tab content:
class UploadsTab(QObject):
def __init__(self, stationName, parent, id=-1):
super(UploadsTab, self).__init__(parent)
self.stationName = stationName
self.experimentDaqApi = DmApiFactory.getInstance().getExperimentDaqApi()
self.InsertColumn(0, 'Id')
self.InsertColumn(2, 'Data Directory')
self.InsertColumn(3, '# Files')
self.SetColumnWidth(0, 150)
self.SetColumnWidth(1, 350)
self.SetColumnWidth(2, 250)
self.updateList()
def updateList(self):
self.uploadList = self.experimentDaqApi.listUploads()
i = 0
for upload in self.uploadList:
self.InsertStringItem(i, upload.get('id'))
self.SetStringItem(i, 1, upload.get('dataDirectory', ''))
self.SetStringItem(i, 2, str(upload.get('nFiles', '0')))
if (i % 2) == 0:
self.SetItemBackgroundColour(i, '#e6f1f5')
i += 1
#!/usr/bin/env python
import os
from dm.common.exceptions.configurationError import ConfigurationError
from dm.common.api.dmApi import DmApi
from dm.aps_bss.impl.bssClient import BssClient
class ApsBssApi(DmApi):
''' Data Management API for APS Beamline Scheduling System. '''
def __init__(self, beamlineName=None, username=None, password=None, loginFile=None):
'''
Constructor.
:param beamlineName: beamline name (if not provided, environment variable DM_BEAMLINE_NAME must be set)
:type beamlineName: str
:param username: BSS username (if not provided, loginFile must be specified, or environment variable DM_BSS_LOGIN_FILE must be set)
:type username: str
:param password: BSS password (if not provided, loginFile must be specified, environment variable DM_BSS_LOGIN_FILE must be set)
:type password: str
:param loginFile: BSS login file (not used if username/password are provided; it can be replaced by environment variable DM_BSS_LOGIN_FILE)
:type loginFile: str
:raises ConfigurationError: in case beamline name or username/password are not provided, and corresponding environment variables are not set
Note that environment variable DM_BSS_LOGIN_FILE should contain
path to a file containing BSS username and password as a single line in
the '<username>|<password>' form.
>>> api = ApsBssApi(beamlineName='1-ID-B,C,E', username='dm', password='XYZ')
'''
DmApi.__init__(self)
if not username or not password:
if not loginFile:
loginFile = os.environ.get('DM_BSS_LOGIN_FILE')
if loginFile:
try:
# Assume form <username>|<password>
tokenList = open(loginFile).readline().split('|')
if len(tokenList) == 2:
username = tokenList[0].strip()
password = tokenList[1].strip()
except Exception, ex:
raise ConfigurationError('Could not determine username/password from BSS login file: %s' % str(ex))
if not username or not password:
raise ConfigurationError('Username and/or password were not provided, and DM_BSS_LOGIN_FILE is not set.')
if not beamlineName:
beamlineName = os.environ.get('DM_BEAMLINE_NAME')
if not beamlineName:
raise ConfigurationError('Beamline name was not provided, and DM_BEAMLINE_NAME is not set.')
self.beamlineName = beamlineName
self.bssClient = BssClient(username, password)
@DmApi.execute2
def listRuns(self):
'''
List all available runs.
:returns: list of RunInfo objects
:raises DmException: for any error
>>> runs = api.listRuns()
>>> for run in runs:
>>> print run['name']
'''
return self.bssClient.listRuns()
@DmApi.execute2
def getCurrentRun(self):
'''
Find current run.
:returns: RunInfo object
:raises DmException: for any error
>>> run = api.getCurrentRun()
'''
return self.bssClient.getCurrentRun()
@DmApi.execute2
def listBeamlineProposals(self, runName=None):
'''
List beamline proposals for a given run.
:param runName: run name (if not provided, current run name will be used)
:type runName: str
:returns: list of ProposalInfo objects
:raises DmException: for any error
>>> proposals = api.listBeamlineProposals()
>>> for proposal in proposals:
>>> print proposal['title']
'''
if not runName:
runName = self.getCurrentRun()['name']
return self.bssClient.listBeamlineProposals(self.beamlineName, runName)
@DmApi.execute2
def getBeamlineProposal(self, proposalId, runName=None):
'''
Get beamline proposal with a given id.
:param proposalId: proposal id
:type proposalId: str
:param runName: run name (if not provided, current run name will be used)
:type runName: str
:returns: ProposalInfo object
:raises ObjectNotFound: if proposal with the given id does not exist
:raises DmException: for any other error
>>> proposal = api.getBeamlineProposal(42096)
>>> for experimenter in proposal['experimenters']:
>>> print experimenter['badge'], experimenter['lastName']
'''
if not runName:
runName = self.getCurrentRun()['name']
return self.bssClient.getBeamlineProposal(proposalId, self.beamlineName, runName)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ApsBssApi('1-ID-B,C,E')
print 'RUNS', api.listRuns()
print 'CURRRENT RUN: ', api.getCurrentRun()
print 'PROPOSALS: ', api.listBeamlineProposals()
print
print 'FIND PROPOSAL: ', api.getBeamlineProposal(42096)
print 'FIND PROPOSAL: ', api.getBeamlineProposal(52096)
#!/usr/bin/env python
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.cli.dmCli import DmCli
from dm.aps_bss.api.apsBssApi import ApsBssApi
class GetProposalCli(DmCli):
def __init__(self):
DmCli.__init__(self)
self.addOption('', '--id', dest='proposalId', help='Proposal id.')
self.addOption('', '--run', dest='runName', help='Run name. If not provided, current run will be used.')
self.addOption('', '--bss-login-file', dest='bssLoginFile', help='BSS login file. Login file may also be specified via environment variable DM_BSS_LOGIN_FILE.')
def checkArgs(self):
if not self.options.proposalId:
raise InvalidRequest('Missing proposal id.')
def runCommand(self):
self.parseArgs(usage="""
dm-get-proposal --id=PROPOSALID
[--run=RUNNAME]
[--bss-login-file=BSSLOGINFILE]
Description:
Retrieves beamline proposal for the given id.
""")
self.checkArgs()
proposalId = int(self.options.proposalId)
api = ApsBssApi(loginFile=self.options.bssLoginFile)
proposal = api.getBeamlineProposal(proposalId=proposalId, runName=self.options.runName)
print proposal.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = GetProposalCli()
cli.run()
#!/usr/bin/env python
from dm.common.cli.dmCli import DmCli
from dm.aps_bss.api.apsBssApi import ApsBssApi
class ListProposalsCli(DmCli):
def __init__(self):
DmCli.__init__(self)
self.addOption('', '--run', dest='runName', help='Run name. If not provided, current run will be used.')
self.addOption('', '--bss-login-file', dest='bssLoginFile', help='BSS login file. It may be provided via environment variable DM_BSS_LOGIN_FILE.')
def checkArgs(self):
pass
def runCommand(self):
self.parseArgs(usage="""
dm-list-proposals
[--run=RUNNAME]
[--bss-login-file=BSSLOGINFILE]
Description:
Retrieves list of beamline proposals for the given run.
""")
self.checkArgs()
api = ApsBssApi(loginFile=self.options.bssLoginFile)
proposals = api.listBeamlineProposals(runName=self.options.runName)
for proposal in proposals:
print proposal.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListProposalsCli()
cli.run()
#!/usr/bin/env python
from dm.common.cli.dmCli import DmCli
from dm.aps_bss.api.apsBssApi import ApsBssApi
class ListRunsCli(DmCli):
def __init__(self):
DmCli.__init__(self)
self.addOption('', '--bss-login-file', dest='bssLoginFile', help='BSS login file. It may be provided via environment variable DM_BSS_LOGIN_FILE.')
def checkArgs(self):
pass
def runCommand(self):
self.parseArgs(usage="""
dm-list-runs [--bss-login-file=BSSLOGINFILE]
Description:
Retrieves list of available runs.
""")
self.checkArgs()
api = ApsBssApi(loginFile=self.options.bssLoginFile)
runs = api.listRuns()
for run in runs:
print run.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListRunsCli()
cli.run()
#!/usr/bin/env python
import datetime
import string
from suds.wsse import Security
from suds.wsse import UsernameToken
from suds.client import Client
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.runInfo import RunInfo
from dm.common.objects.beamlineInfo import BeamlineInfo
from dm.common.objects.proposalInfo import ProposalInfo
from dm.common.objects.apsUserInfo import ApsUserInfo
from dm.common.exceptions.dmException import DmException
from dm.common.utility.loggingManager import LoggingManager
class BssClient:
WSDL_URL = 'https://schedule.aps.anl.gov/beamschedds/springws'
CACHE_DURATION_IN_SECONDS = 10
def __init__(self, username, password):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.__configure(username, password)
def __configure(self, username, password):
beamlineScheduleServiceUrl = self.WSDL_URL + '/beamlineScheduleService/beamlineScheduleWebService.wsdl'
runScheduleServiceUrl = self.WSDL_URL + '/runScheduleService/runScheduleWebService.wsdl'
try:
self.runScheduleServiceClient = Client(runScheduleServiceUrl)
self.runScheduleServiceClient.options.cache.setduration(seconds=self.CACHE_DURATION_IN_SECONDS)
self.setSoapHeader(self.runScheduleServiceClient, username, password)
self.beamlineScheduleServiceClient = Client(beamlineScheduleServiceUrl)
self.beamlineScheduleServiceClient.options.cache.setduration(seconds=self.CACHE_DURATION_IN_SECONDS)
self.setSoapHeader(self.beamlineScheduleServiceClient, username, password)
except Exception, ex:
self.logger.error('Cannot open BSS connection: %s' % str(ex))
raise DmException(exception=ex)
@classmethod
def setSoapHeader(cls, client, username, password):
security = Security()
token = UsernameToken(username, password)
token.setcreated()
security.tokens.append(token)
client.set_options(wsse=security)
def listRuns(self):
''' Return list of all runs. '''
result = self.runScheduleServiceClient.service.findAllRuns()
runArray = result.run
runs = []
for run in runArray:
runs.append(RunInfo({'name' : run.runName, 'startTime' : run.startTime, 'endTime' : run.endTime}))
return runs
def listRunsBetweenDates(self, startDate, endDate):
''' Find list of runs between given startDate and endDate. '''
result = self.runScheduleServiceClient.service.findAllRuns()
runArray = result.run
runs = []
for run in runArray:
if run.startTime >= startDate and run.endTime <= endDate:
runs.append(RunInfo({'name' : run.runName, 'startTime' : run.startTime, 'endTime' : run.endTime}))
return runs
def getRunForDates(self, startDate, endDate):
''' Find run that spans given startDate and endDate. '''
result = self.runScheduleServiceClient.service.findAllRuns()
runArray = result.run
for run in runArray:
if startDate >= run.startTime and endDate <= run.endTime:
return RunInfo({'name' : run.runName, 'startTime' : run.startTime, 'endTime' : run.endTime})
raise ObjectNotFound('No run found.')
def getCurrentRun(self):
''' Find current run. '''
now = datetime.datetime.now()
return self.getRunForDates(now, now)
def listBeamlines(self):
''' Find list of all beamlines. '''
result = self.beamlineScheduleServiceClient.service.findAllBeamlines()
beamlineArray = result.beamline
beamlines = []
for beamline in beamlineArray:
beamlines.append(BeamlineInfo({'name' : beamline.beamlineName, 'id' : beamline.id}))
return beamlines
def createProposalInfo(self, proposal):
experimenterArray = proposal.experimenters.experimenter
experimenters = []
for experimenter in experimenterArray:
user = ApsUserInfo({
'id' : experimenter.id,
'email' : experimenter.email,
})
if hasattr(experimenter, 'badge'):
user['badge'] = experimenter.badge
else:
# Ignore this user, has no badge
continue
if hasattr(experimenter, 'piFlag'):
user['piFlag'] = experimenter.piFlag
if hasattr(experimenter, 'instId'):
user['instId'] = experimenter.instId
if hasattr(experimenter, 'firstName'):
user['firstName'] = ''.join(filter(lambda x:x in string.printable, experimenter.firstName))
if hasattr(experimenter, 'lastName'):
user['lastName'] = ''.join(filter(lambda x:x in string.printable, experimenter.lastName))
if hasattr(experimenter, 'institution'):
# Remove non-printable characters
user['institution'] = ''.join(filter(lambda x:x in string.printable, experimenter.institution))
experimenters.append(user)
proposalInfo = ProposalInfo({
'title' : proposal.proposalTitle,
'id' : proposal.id,
'experimenters' : experimenters
})
return proposalInfo
def listBeamlineProposals(self, beamlineName, runName):
''' Find beamline schedule for a given beamlineName and runName. '''
schedule = self.beamlineScheduleServiceClient.service.findBeamlineSchedule(beamlineName, runName)
activitiesArray = schedule.activities.activity
proposals = []
for activity in activitiesArray:
if hasattr(activity, 'beamtimeRequest'):
proposal = activity.beamtimeRequest.proposal
proposals.append(self.createProposalInfo(proposal))
return proposals
def getBeamlineProposal(self, proposalId, beamlineName, runName):
''' Find proposal with a given id, beamlineName and runName. '''
schedule = self.beamlineScheduleServiceClient.service.findBeamlineSchedule(beamlineName, runName)
activitiesArray = schedule.activities.activity
proposals = []
for activity in activitiesArray:
if hasattr(activity, 'beamtimeRequest'):
proposal = activity.beamtimeRequest.proposal
if proposal.id == proposalId:
return self.createProposalInfo(proposal)
raise ObjectNotFound('Proposal with id %s does not exist (beamline: %s; run: %s).' % (proposalId, beamlineName, runName))
if __name__ == '__main__':
bss = BssClient('DMADMIN', 'A2Fxew@11:76am')
now = datetime.datetime.now()
days = datetime.timedelta(days=180)
before = now - days
print before, now
runNames = bss.getRunsBetweenDates(before, now)
print 'RUNS: ', runNames
print 'CURRENT RUN: ', bss.getCurrentRun()
print bss.listBeamlines()
print bss.listBeamlineProposals('1-ID-B,C,E', '2017-1')
print
print bss.getBeamlineProposal(51190, '1-ID-B,C,E', '2017-1')
print
print bss.getBeamlineProposal(48258, '1-ID-B,C,E', '2017-1')
#!/usr/bin/env python
from dm.aps_user_db.api.apsUserDbApiBase import ApsUserDbApiBase
from dm.aps_user_db.impl.apsUserInfoHandler import ApsUserInfoHandler
class ApsUserDbApi(ApsUserDbApiBase):
""" APS User DB API class. """
def __init__(self):
ApsUserDbApiBase.__init__(self)
self.userInfoHandler = ApsUserInfoHandler()
@ApsUserDbApiBase.executeQuery
def getApsUsers(self, **kwargs):
session = kwargs['session']
dbUsers = self.userInfoHandler.getApsUsers(session)
return self.toDmObjectList(dbUsers)
@ApsUserDbApiBase.executeQuery
def getApsUserByBadgeNumber(self, badgeNumber, **kwargs):
session = kwargs['session']
dbUserInfo = self.userInfoHandler.getApsUserByBadgeNumber(session, badgeNumber)
return dbUserInfo.getDmObject()
#######################################################################
# Testing.
if __name__ == '__main__':
api = ApsUserDbApi()
#users = api.getApsUsers()
#for u in users:
# print u
print api.getApsUserByBadgeNumber(225159)
#!/usr/bin/env python
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.internalError import InternalError
from dm.common.utility.loggingManager import LoggingManager
from dm.aps_user_db.impl.apsUserDbManager import ApsUserDbManager
class ApsUserDbApiBase:
""" APS User DB API base class. """
def __init__(self):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.dbManager = ApsUserDbManager.getInstance()
# Decorator for all DB queries
@classmethod
def executeQuery(cls, func):
def query(*args, **kwargs):
try:
dbManager = ApsUserDbManager.getInstance()
session = dbManager.openSession()
kwargs['session'] = session
try:
return func(*args, **kwargs)
except DmException, ex:
raise
except Exception, ex:
cls.getLogger().exception('%s' % ex)
raise DmException(exception=ex)
finally:
dbManager.closeSession(session)
return query
# Decorator for all DB transactions
@classmethod
def executeTransaction(cls, func):
def transaction(*args, **kwargs):
try:
dbManager = ApsUserDbManager.getInstance()
session = dbManager.openSession()
kwargs['session'] = session
try:
result = func(*args, **kwargs)
session.commit()
return result
except DmException, ex:
session.rollback()
raise
except Exception, ex:
session.rollback()
cls.getLogger().exception('%s' % ex)
raise DmException(exception=ex)
finally:
dbManager.closeSession(session)
return transaction
@classmethod
def getLogger(cls):
logger = LoggingManager.getInstance().getLogger(cls.__name__)
return logger
@classmethod
def executeConnectionQuery(cls, query):
connection = None
try:
connection = ApsUserDbManager.getInstance().acquireConnection()
try:
return connection.execute(query)
except DmException, ex:
raise
except Exception, ex:
cls.getLogger().exception('%s' % ex)
raise
finally:
ApsUserDbManager.getInstance().releaseConnection(connection)
def loadRelation(self, dbObject, relationName):
if not relationName in dir(dbObject):
raise InternalError('Relation %s not valid for class %s'
% (relationName, dbObject.__class__.__name__))
o = None
exec 'o = dbObject.%s' % (relationName)
return o
def loadRelations(self, dbObject, optionDict):
for k in optionDict.keys():
# The optionDict contains key-value pairs of relation name
# and a boolean to indicate whether to load that relation
if not optionDict[k]:
continue
try:
self.loadRelation(dbObject, k)
except InternalError, ex:
self.logger.error(ex)
def toDmObjectList(self, dbEntityList):
dmObjectList = []
for dbEntity in dbEntityList:
dmObjectList.append(dbEntity.getDmObject())
return dmObjectList
#######################################################################
# Testing.
if __name__ == '__main__':
api = ApsUserDbApiBase()