Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 1230 additions and 0 deletions
#!/usr/bin/env python
#
# Implementation for user info controller.
#
import datetime
import cherrypy
from dm.common.constants import dmRole
from dm.common.objects.authorizationPrincipal import AuthorizationPrincipal
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.objects.dmSession import DmSession
from dm.common.db.api.userDbApi import UserDbApi
from dm.common.exceptions.objectNotFound import ObjectNotFound
class AuthSessionControllerImpl(DmObjectManager):
""" User info controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)
self.userDbApi = UserDbApi()
def getAuthorizationPrincipal(self, username):
user = self.userDbApi.getUserWithPasswordByUsername(username)
principal = AuthorizationPrincipal(name=username, token=user.get('password'))
principal.setRole(dmRole.DM_USER_ROLE)
principal.setUserInfo(user)
for userSystemRoleName in user.get('userSystemRoleNameList', []):
if userSystemRoleName == dmRole.DM_ADMIN_ROLE:
principal.setRole(dmRole.DM_ADMIN_ROLE)
return principal
def addSession(self, sessionId, sessionInfo):
sessionCache = cherrypy.session.cache
self.logger.debug('Session cache length: %s' % (len(sessionCache)))
sessionCache[sessionId] = (sessionInfo, datetime.datetime.now())
self.logger.debug('Session cache: %s' % (sessionCache))
return DmSession(sessionInfo)
def checkSession(self, sessionId):
sessionCache = cherrypy.session.cache
sessionTuple = sessionCache.get(sessionId)
if not sessionTuple:
raise ObjectNotFound('Session %s not found in cache.' % sessionId)
sessionInfo = sessionTuple[0]
oldTimestamp = sessionTuple[1]
newTimestamp = datetime.datetime.now()
self.logger.debug('Updated timestamp from %s to %s for session id %s' % (oldTimestamp, newTimestamp, sessionId))
sessionCache[sessionId] = (sessionInfo, newTimestamp)
return DmSession(sessionInfo)
#!/usr/bin/env python
import threading
import time
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.singleton import Singleton
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.exceptions.invalidRequest import InvalidRequest
from pathTracker import PathTracker
class ExperimentManager(Singleton):
CONFIG_SECTION_NAME = 'ExperimentManager'
STORAGE_DIRECTORY_KEY = 'storagedirectory'
STORAGE_ID_KEY = 'storageid'
MANAGE_STORAGE_PERMISSIONS_KEY = 'managestoragepermissions'
PLATFORM_UTILITY_KEY = 'platformutility'
RSYNC_SCRIPT_PERMISSIONS_MODE = 0755
FILE_PERMISSIONS_MODE = 0640
DIR_PERMISSIONS_MODE = 0750
# Singleton.
__instanceLock = threading.RLock()
__instance = None
def __init__(self):
ExperimentManager.__instanceLock.acquire()
try:
if ExperimentManager.__instance:
return
ExperimentManager.__instance = self
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.logger.debug('Initializing')
self.lock = threading.RLock()
self.experimentDbApi = ExperimentDbApi()
self.pathTracker = PathTracker()
self.platformUtility = None
self.__configure()
self.fileProcessingManager = FileProcessingManager.getInstance()
self.logger.debug('Initialization complete')
finally:
ExperimentManager.__instanceLock.release()
def __configure(self):
cm = ConfigurationManager.getInstance()
configItems = cm.getConfigItems(ExperimentManager.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
self.storageDirectory = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_DIRECTORY_KEY)
self.storageId = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_ID_KEY)
self.manageStoragePermissions = ValueUtility.toBoolean(cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.MANAGE_STORAGE_PERMISSIONS_KEY))
platformUtility = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.PLATFORM_UTILITY_KEY)
if platformUtility:
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(platformUtility)
self.logger.debug('Creating platform utility class %s' % className)
self.platformUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.logger.debug('Manage storage permissions: %s' % self.manageStoragePermissions)
def __getExperimentStorageDataDirectory(self, experiment):
experimentTypeName = experiment.get('experimentType').get('rootDataPath')
experimentName = experiment.get('name')
storageDirectory = '%s/%s/%s' % (self.storageDirectory, experimentTypeName, experimentName)
storageDirectory = os.path.normpath(storageDirectory)
return storageDirectory
@ThreadingUtility.synchronize
def updateExperimentWithStorageDataDirectory(self, experiment):
storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
if os.path.exists(storageDirectory):
storageHost = ConfigurationManager.getInstance().getHost()
experiment['storageDirectory'] = storageDirectory
experiment['storageHost'] = storageHost
experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory)
return storageDirectory
def addUserToGroup(self, username, experimentName):
experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
if os.path.exists(storageDirectory):
self.platformUtility.addUserToGroup(username, experimentName)
def deleteUserFromGroup(self, username, experimentName):
experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
if os.path.exists(storageDirectory):
experimentUsers = experiment.get('experimentUsernameList', [])
self.platformUtility.setGroupUsers(experimentName, experimentUsers)
def createRsyncScript(self, username, experimentName):
fileName = '/tmp/rsync.%s.%s' % (username, experimentName)
self.logger.debug('Creating rsync script %s' % (fileName))
f = open(fileName, 'w')
f.write('#!/bin/sh\n')
f.write('exec sg %s "rsync $*"\n' % experimentName)
f.close()
OsUtility.chmodPath(fileName, fileMode=self.RSYNC_SCRIPT_PERMISSIONS_MODE)
def deleteRsyncScript(self, username, experimentName):
fileName = '/tmp/rsync.%s.%s' % (username, experimentName)
self.logger.debug('Removing rsync script %s' % (fileName))
OsUtility.removeFile(fileName)
def authorizeDownload(self, username, experimentName):
self.logger.debug('Authorizing download for %s from experiment %s' % (username, experimentName))
experiment = self.experimentDbApi.getExperimentByName(experimentName)
storageDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
if os.path.exists(storageDirectory):
self.platformUtility.addLocalUserToGroup(username, experimentName)
else:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
self.createRsyncScript(username, experimentName)
return experiment
def deauthorizeDownload(self, username, experimentName):
experiment = self.experimentDbApi.getExperimentByName(experimentName)
self.logger.debug('De-authorizing download for %s from experiment %s' % (username, experimentName))
storageDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
if os.path.exists(storageDirectory):
self.platformUtility.deleteLocalUserFromGroup(username, experimentName)
else:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
self.deleteRsyncScript(username, experimentName)
return experiment
def createExperimentGroup(self, experiment):
experimentName = experiment.get('name')
storageDirectory = experiment.get('storageDirectory')
# Create experiment group
self.platformUtility.createGroup(experimentName)
self.logger.debug('Setting permissions for %s to %s' % (storageDirectory, self.DIR_PERMISSIONS_MODE))
OsUtility.chmodPath(storageDirectory, dirMode=self.DIR_PERMISSIONS_MODE)
self.logger.debug('Changing group owner for %s to %s' % (storageDirectory, experimentName))
self.platformUtility.changePathGroupOwner(storageDirectory, experimentName)
ownerUpdateTime = time.time()
self.pathTracker.put(storageDirectory, ownerUpdateTime)
# Add users to group
experimentUsers = experiment.get('experimentUsernameList', [])
self.logger.debug('Found experiment users: %s', experimentUsers)
self.platformUtility.setGroupUsers(experimentName, experimentUsers)
def updateExperimentGroupUsers(self, experiment):
experimentName = experiment.get('name')
experimentUsers = experiment.get('experimentUsernameList', [])
self.platformUtility.setGroupUsers(experimentName, experimentUsers)
@ThreadingUtility.synchronize
def createExperimentDataDirectory(self, experiment):
experimentName = experiment.get('name')
storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
if os.path.exists(storageDirectory):
self.logger.debug('Data directory %s for experiment %s already exists' % (storageDirectory, experimentName))
else:
self.logger.debug('Creating data directory for experiment %s: %s' % (experimentName, storageDirectory))
OsUtility.createDir(storageDirectory)
storageHost = ConfigurationManager.getInstance().getHost()
experiment['storageDirectory'] = storageDirectory
experiment['storageHost'] = storageHost
experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory)
if self.manageStoragePermissions:
self.createExperimentGroup(experiment)
@ThreadingUtility.synchronize
def processExperimentFile(self, experimentFilePath, experiment, fileInfo={}):
experimentName = experiment.get('name')
self.updateExperimentWithStorageDataDirectory(experiment)
storageDirectory = experiment.get('storageDirectory')
filePath = os.path.join(storageDirectory, experimentFilePath)
fileInfo['filePath'] = filePath
fileInfo['experiment'] = experiment
if os.path.exists(filePath):
self.logger.debug('Processing file path %s (fileInfo: %s)' % (filePath, fileInfo))
if self.manageStoragePermissions:
self.logger.debug('Modifying permissions for %s' % filePath)
OsUtility.chmodPath(filePath, fileMode=self.FILE_PERMISSIONS_MODE)
self.logger.debug('Changing group owner for %s to %s' % (filePath, experimentName))
self.platformUtility.changePathGroupOwner(filePath, experimentName)
# Recursively modify subdirectory permissions
dirPath = os.path.dirname(filePath)
while (os.path.abspath(dirPath) != os.path.abspath(storageDirectory)):
if self.pathTracker.get(dirPath) is None:
self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
self.platformUtility.changePathGroupOwner(dirPath, experimentName)
ownerUpdateTime = time.time()
self.pathTracker.put(dirPath, ownerUpdateTime)
else:
self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName))
dirPath = os.path.dirname(dirPath)
self.logger.debug('Processing file %s' % filePath)
self.fileProcessingManager.processFile(fileInfo)
else:
self.logger.debug('File path %s does not exist' % filePath)
def statExperimentFile(self, experimentFilePath, experiment, fileInfo={}):
experimentName = experiment.get('name')
self.updateExperimentWithStorageDataDirectory(experiment)
storageDirectory = experiment.get('storageDirectory')
filePath = os.path.join(storageDirectory, experimentFilePath)
fileInfo['filePath'] = filePath
if os.path.exists(filePath):
self.logger.debug('Getting stat for file path %s' % (filePath))
FileUtility.statFile(filePath, fileInfo)
FileUtility.getMd5Sum(filePath, fileInfo)
self.logger.debug('File info after stat: %s' % str(fileInfo))
else:
self.logger.debug('File path %s does not exist' % filePath)
raise ObjectNotFound('File %s does not exist' % filePath)
@ThreadingUtility.synchronize
def start(self):
self.logger.debug('Started experiment manager')
@ThreadingUtility.synchronize
def stop(self):
self.logger.debug('Stopped experiment manager')
####################################################################
# Testing
if __name__ == '__main__':
em = ExperimentManager.getInstance()
print em
#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import time
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.ds_web_service.service.impl.experimentManager import ExperimentManager
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)
self.experimentDbApi = ExperimentDbApi()
def getExperimentTypes(self):
experimentTypeList = self.experimentDbApi.getExperimentTypes()
return experimentTypeList
def getExperiments(self):
experimentList = self.experimentDbApi.getExperiments()
return experimentList
def getExperimentByName(self, name):
experiment = self.experimentDbApi.getExperimentWithUsers(name)
ExperimentManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
return experiment
def getExperimentById(self, id):
experiment = self.experimentDbApi.getExperimentById(id)
ExperimentManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
return experiment
def addExperiment(self, name, experimentTypeId, description, startDate, endDate):
experiment = self.experimentDbApi.addExperiment(name, experimentTypeId, description, startDate, endDate)
return experiment
def startExperiment(self, name):
experiment = self.experimentDbApi.getExperimentWithUsers(name)
if experiment.get('startDate') is None:
experiment2 = self.experimentDbApi.setExperimentStartDateToNow(name)
experiment['startDate'] = experiment2['startDate']
ExperimentManager.getInstance().createExperimentDataDirectory(experiment)
return experiment
def updateExperiment(self, name):
experiment = self.experimentDbApi.getExperimentWithUsers(name)
if experiment.get('startDate') is not None:
ExperimentManager.getInstance().updateExperimentGroupUsers(experiment)
return experiment
def stopExperiment(self, name):
experiment = self.experimentDbApi.getExperimentByName(name)
if experiment.get('endDate') is None:
experiment = self.experimentDbApi.setExperimentEndDateToNow(name)
ExperimentManager.getInstance().updateExperimentWithStorageDataDirectory(experiment)
return experiment
def authorizeDownload(self, username, experimentName):
return ExperimentManager.getInstance().authorizeDownload(username, experimentName)
def deauthorizeDownload(self, username, experimentName):
return ExperimentManager.getInstance().deauthorizeDownload(username, experimentName)
#!/usr/bin/env python
#
# Implementation for file session controller.
#
import time
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.fileMetadata import FileMetadata
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.ds_web_service.service.impl.experimentManager import ExperimentManager
class FileSessionControllerImpl(DmObjectManager):
""" File session controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)
self.experimentDbApi = ExperimentDbApi()
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
experiment = self.experimentDbApi.getExperimentByName(experimentName)
ExperimentManager.getInstance().processExperimentFile(experimentFilePath, experiment, fileInfo)
return FileMetadata(fileInfo)
def statFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
experiment = self.experimentDbApi.getExperimentByName(experimentName)
ExperimentManager.getInstance().statExperimentFile(experimentFilePath, experiment, fileInfo)
return FileMetadata(fileInfo)
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
class MongoDbFileCatalogPlugin(FileProcessor):
def __init__(self):
FileProcessor.__init__(self)
self.fileMongoDbApi = FileMongoDbApi()
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName))
daqInfo = fileInfo.get('daqInfo')
storageDirectory = daqInfo.get('storageDirectory')
storageHost = daqInfo.get('storageHost')
storageFilePath = os.path.join(storageDirectory, experimentFilePath)
fileProcessingTime = time.time()
fileProcessingTimeStamp = TimeUtility.formatLocalTimeStamp(fileProcessingTime)
# Prepare catalogging entry
fileInfo2 = {}
for key in ['md5Sum', 'fileSize', 'fileCreationTime', 'fileCreationTimeStamp']:
if fileInfo.has_key(key):
fileInfo2[key] = fileInfo.get(key, '')
fileInfo2['fileName'] = os.path.basename(experimentFilePath)
fileInfo2['storageDirectory'] = storageDirectory
fileInfo2['storageHost'] = storageHost
fileInfo2['storageFilePath'] = storageFilePath
fileInfo2['experimentFilePath'] = experimentFilePath
fileInfo2['experimentName'] = experimentName
fileInfo2['fileProcessingTime'] = fileProcessingTime
fileInfo2['fileProcessingTimeStamp'] = fileProcessingTimeStamp
self.logger.debug('Daq info: %s' % (daqInfo))
fileInfo2.update(daqInfo)
if daqInfo.has_key('id'):
fileInfo2['daqId'] = daqInfo.get('id')
del fileInfo2['id']
self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
from dm.common.utility.objectTracker import ObjectTracker
class PathTracker(ObjectTracker):
# Cache configuration
pass
####################################################################
# Testing
if __name__ == '__main__':
et = PathTracker.getInstance()
print et
et2 = PathTracker.getInstance()
print et2
et.put('xyz', 1)
print et.get('xyz')
print et.get('xyz2')
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
from dm.common.utility.dmSubprocess import DmSubprocess
class ScriptProcessingPlugin(FileProcessor):
PROCESSING_SCRIPT_KEY = 'processingScript'
def __init__(self):
FileProcessor.__init__(self)
self.fileMongoDbApi = FileMongoDbApi()
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processFile(self, fileInfo):
experimentName = fileInfo.get('experimentName')
experimentFilePath = fileInfo.get('experimentFilePath')
daqInfo = fileInfo.get('daqInfo', {})
processingScript = daqInfo.get(self.PROCESSING_SCRIPT_KEY)
if not processingScript:
self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
return
self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
storageDirectory = fileInfo.get('storageDirectory')
storageFilePath = os.path.join(storageDirectory, experimentFilePath)
# Process file
try:
p = DmSubprocess('%s %s' % (processingScript, storageFilePath))
p.run()
stdOut = p.getStdOut()
except Exception, ex:
self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
return
# Prepare catalogging entry
fileInfo2 = {}
fileInfo2['fileName'] = os.path.basename(experimentFilePath)
fileInfo2['experimentName'] = experimentName
fileInfo2['processingScriptOutput'] = '%s' % stdOut.strip()
self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
class SddsParameterProcessingPlugin(FileProcessor):
PROCESS_SDDS_PARAMETERS_KEY = 'processSddsParameters'
def __init__(self):
FileProcessor.__init__(self)
self.fileMongoDbApi = FileMongoDbApi()
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
daqInfo = fileInfo.get('daqInfo', {})
processSddsParameters = daqInfo.get(self.PROCESS_SDDS_PARAMETERS_KEY)
if not processSddsParameters:
self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
return
self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName))
storageDirectory = fileInfo.get('storageDirectory')
storageFilePath = os.path.join(storageDirectory, experimentFilePath)
# Load file
try:
import sddsdata
from sdds import SDDS
s = SDDS(0)
self.logger.error('Loading file %s for experiment %s' % (experimentFilePath, experimentName))
s.load(storageFilePath)
except Exception, ex:
self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
return
# Prepare catalogging entry
fileInfo2 = {}
fileInfo2['fileName'] = os.path.basename(experimentFilePath)
fileInfo2['experimentName'] = experimentName
for i in range(0,len(s.parameterName)):
parameterName = s.parameterName[i]
parameterData = s.parameterData[i][0]
fileInfo2[parameterName] = parameterData
self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
self.logger.error('SDDS terminate file %s for experiment %s' % (experimentFilePath, experimentName))
sddsdata.Terminate(0)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.utility.dmSubprocess import DmSubprocess
class SgeJobSubmissionPlugin(FileProcessor):
SGE_JOB_SCRIPT_KEY = 'sgeJobScript'
def __init__(self, sgeRoot):
FileProcessor.__init__(self)
self.sgeRoot = sgeRoot
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processFile(self, fileInfo):
experimentName = fileInfo.get('experimentName')
experimentFilePath = fileInfo.get('experimentFilePath')
daqInfo = fileInfo.get('daqInfo', {})
sgeJobScript = daqInfo.get(self.SGE_JOB_SCRIPT_KEY)
if not sgeJobScript:
self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
return
self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
storageDirectory = fileInfo.get('storageDirectory')
storageFilePath = os.path.join(storageDirectory, experimentFilePath)
# Process file
try:
p = DmSubprocess('. %s/default/common/settings.sh ; qsub -v DM_EXPERIMENT_NAME=%s %s %s' % (self.sgeRoot, experimentName, sgeJobScript, storageFilePath))
p.run()
except Exception, ex:
self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
return
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
#
# Implementation for user info controller.
#
from dm.common.objects.dmObject import DmObject
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.db.api.userDbApi import UserDbApi
from dm.ds_web_service.service.impl.experimentManager import ExperimentManager
class UserInfoSessionControllerImpl(DmObjectManager):
""" User info controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)
self.userDbApi = UserDbApi()
def getUsers(self):
return self.userDbApi.getUsers()
def getUserById(self, id):
return self.userDbApi.getUserById(id)
def getUserByUsername(self, username):
return self.userDbApi.getUserByUsername(username)
def addUserExperimentRole(self, username, experimentName, roleName):
userInfo = self.userDbApi.addUserExperimentRole(username, experimentName, roleName)
ExperimentManager.getInstance().addUserToGroup(username, experimentName)
return userInfo
def deleteUserExperimentRole(self, username, experimentName, roleName):
userInfo = self.userDbApi.deleteUserExperimentRole(username, experimentName, roleName)
ExperimentManager.getInstance().deleteUserFromGroup(username, experimentName)
return userInfo
#!/usr/bin/env python
import cherrypy
from dm.common.service.dmSessionController import DmSessionController
from dm.ds_web_service.service.impl.userInfoSessionControllerImpl import UserInfoSessionControllerImpl
class UserInfoSessionController(DmSessionController):
def __init__(self):
DmSessionController.__init__(self)
self.userInfoSessionControllerImpl = UserInfoSessionControllerImpl()
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def getUsers(self, **kwargs):
return self.listToJson(self.userInfoSessionControllerImpl.getUsers())
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def getUserById(self, id, **kwargs):
if not id:
raise InvalidRequest('Invalid id provided.')
response = self.userInfoSessionControllerImpl.getUserById(id).getFullJsonRep()
self.logger.debug('Returning user info for %s: %s' % (id,response))
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def getUserByUsername(self, username, **kwargs):
if not len(username):
raise InvalidRequest('Invalid username provided.')
response = self.userInfoSessionControllerImpl.getUserByUsername(username).getFullJsonRep()
self.logger.debug('Returning user info for %s: %s' % (username,response))
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def addUserExperimentRole(self, username, experimentName, roleName, **kwargs):
if not username:
raise InvalidRequest('Invalid username provided.')
if not experimentName:
raise InvalidRequest('Invalid experiment name provided.')
if not roleName:
raise InvalidRequest('Invalid role name provided.')
response = self.userInfoSessionControllerImpl.addUserExperimentRole(username, experimentName, roleName).getFullJsonRep()
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def deleteUserExperimentRole(self, username, experimentName, roleName, **kwargs):
if not username:
raise InvalidRequest('Invalid username provided.')
if not experimentName:
raise InvalidRequest('Invalid experiment name provided.')
if not roleName:
raise InvalidRequest('Invalid role name provided.')
response = self.userInfoSessionControllerImpl.deleteUserExperimentRole(username, experimentName, roleName).getFullJsonRep()
return response
#!/usr/bin/env python
#
# User route descriptor.
#
from dm.common.utility.configurationManager import ConfigurationManager
from userInfoSessionController import UserInfoSessionController
class UserRouteDescriptor:
@classmethod
def getRoutes(cls):
contextRoot = ConfigurationManager.getInstance().getContextRoot()
# Static instances shared between different routes
userInfoSessionController = UserInfoSessionController()
# Define routes.
routes = [
# Get user info list
{
'name' : 'getUsers',
'path' : '%s/users' % contextRoot,
'controller' : userInfoSessionController,
'action' : 'getUsers',
'method' : ['GET']
},
# Get user by id
{
'name' : 'getUserById',
'path' : '%s/users/:(id)' % contextRoot,
'controller' : userInfoSessionController,
'action' : 'getUserById',
'method' : ['GET']
},
# Get user by username
{
'name' : 'getUserByUsername',
'path' : '%s/usersByUsername/:(username)' % contextRoot,
'controller' : userInfoSessionController,
'action' : 'getUserByUsername',
'method' : ['GET']
},
# Add user experiment role
{
'name' : 'addUserExperimentRole',
'path' : '%s/usersByExperiment/:(username)/:(experimentName)/:(roleName)' % contextRoot,
'controller' : userInfoSessionController,
'action' : 'addUserExperimentRole',
'method' : ['POST']
},
# Delete user experiment role
{
'name' : 'deleteUserExperimentRole',
'path' : '%s/usersByExperiment/:(username)/:(experimentName)/:(roleName)' % contextRoot,
'controller' : userInfoSessionController,
'action' : 'deleteUserExperimentRole',
'method' : ['DELETE']
},
]
return routes
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="src" path="resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="/home/phoebus/BFROSIK/trunk/tools/AccountSynchronizer/lib/ojdbc7.jar"/>
<classpathentry kind="lib" path="/home/phoebus/BFROSIK/trunk/tools/AccountSynchronizer/lib/postgresql-9.4-1201.jdbc4.jar"/>
<classpathentry kind="lib" path="/home/phoebus/BFROSIK/trunk/tools/AccountSynchronizer/lib/unboundid-ldapsdk-me.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>accountSynchronizer</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
Main-Class: gov.anl.aps.dm.sync.Synchronizer
Class-Path: . ../../lib/postgresql-9.4-1201.jdbc4.jar ../../lib/ojdbc7.jar
### log congiguration
log.file = accountSynchronizer%g.log
log.limit = 6000
log.count = 2
###
### Oracle database settings
###
oracle.database.connection = jdbc:oracle:thin:@ra.aps.anl.gov:1527:aps1
oracle.database.username = glob_conn
#oracle.database.password =
oracle.database.table = DCC.FL$03_BL_APV_VIEW_V2
dm.database.connection = jdbc:postgresql://127.0.0.1:11136/dm
dm.database.username = dm
### dm.database.password =
# The user name prefix that gets append to every badge number.
user.userid.prefix = d
package gov.anl.aps.dm.sync;
public class DmUser {
String username;
String firstName;
String lastName;
String middleName;
String email;
String badge;
String password;
void clear() {
username = null;
firstName = null;
lastName = null;
middleName = null;
email = null;
badge = null;
password = null;
}
}
package gov.anl.aps.dm.sync;
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
public class OracleConnection {
Connection connection = null;
private Logger logger;
public OracleConnection(Logger logger) {
this.logger = logger;
}
void connect(Properties config) throws SQLException {
try {
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
} catch (SQLException e) {
logger.log(Level.SEVERE, "Can't establish Oracle Driver", e);
throw e;
}
// logger.log(Level.INFO, "Oracle JDBC Driver Registered!");
try {
connection = DriverManager.getConnection(
config.getProperty("oracle.database.connection"),
config.getProperty("oracle.database.username"),
config.getProperty("oracle.database.password"));
} catch (SQLException e) {
logger.log(Level.SEVERE, "Can't connect to Oracle data base", e);
throw e;
}
}
ResultSet getUsers(String table) throws SQLException {
try {
Statement statement = connection.createStatement();
return statement.executeQuery("select * from " + table);
} catch (SQLException ex) {
logger.log(Level.SEVERE, "table {0} does not exist", table);
throw ex;
}
}
void close() {
try {
connection.close();
} catch (SQLException ex) {
logger.log(Level.SEVERE, "problem closing Oracle connection ", ex);
}
}
}
package gov.anl.aps.dm.sync;
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
public class PsqlConnection {
Connection connection = null;
private final Map<String, Timestamp> dmUsers = new HashMap<>(); // <username, lastUpdate>
private PreparedStatement insertQuery;
private PreparedStatement updateQuery;
private final String userTable = "user_info";
private Logger logger;
public PsqlConnection(Logger logger) {
this.logger = logger;
}
void connect(Properties config) throws SQLException {
try {
Class.forName("org.postgresql.Driver");
} catch (ClassNotFoundException e) {
logger.log(Level.SEVERE, "Can't establish PosgreSQL Driver", e);
System.exit(0);
}
// logger.log(Level.INFO, "PostgreSQL JDBC Driver Registered!");
try {
connection = DriverManager.getConnection(
config.getProperty("dm.database.connection"),
config.getProperty("dm.database.username"),
config.getProperty("dm.database.password"));
} catch (SQLException e) {
logger.log(Level.SEVERE, "Can't connect to PostgreSQL data base");
throw e;
}
}
void init() throws SQLException {
Statement statement = null;
try {
statement = connection.createStatement();
} catch (SQLException ex) {
logger.log(Level.SEVERE, "can't create statement for postgreSQL connection");
throw ex;
}
ResultSet results = null;
try {
if (statement != null) {
results = statement.executeQuery("SELECT * FROM " + userTable + ";");
}
} catch (SQLException ex) {
logger.log(Level.SEVERE, "can't execute SELECT query from " + userTable + " table");
throw ex;
}
try {
if (results != null) {
while (results.next()) {
String username = results.getString("username");
Timestamp lastUpdate = results.getTimestamp("last_update");
dmUsers.put(username, lastUpdate);
}
results.close();
}
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
logger.log(Level.WARNING, "query results processing error ", ex);
}
// prepare statements
try {
insertQuery = connection.prepareStatement ("INSERT INTO user_info (username, first_name, last_name, middle_name, email, badge, is_local_user, last_update) VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
updateQuery = connection.prepareStatement ("UPDATE user_info SET email = ?, last_name = ?, last_update = ? WHERE username = ?");
} catch (SQLException ex) {
logger.log(Level.SEVERE, "Can't get peaparedStatement from connection ", ex);
throw ex;
}
}
protected Timestamp getUserLastUpdate(String username) {
return dmUsers.get(username);
}
boolean isUser(String username) {
return dmUsers.containsKey(username);
}
void addUser(DmUser dmuser, Timestamp currentTime) {
try {
insertQuery.setString(1, dmuser.username);
insertQuery.setString(2, dmuser.firstName);
insertQuery.setString(3, dmuser.lastName);
insertQuery.setString(4, dmuser.middleName);
insertQuery.setString(5, dmuser.email);
insertQuery.setString(6, dmuser.badge);
insertQuery.setBoolean(7, false);
insertQuery.setTimestamp(8, currentTime);
insertQuery.execute();
} catch (SQLException ex) {
logger.log(Level.SEVERE, "can't execute query to add user ", ex);
}
}
void updateUser(DmUser dmuser, Timestamp currentTime) {
try {
updateQuery.setString(1, dmuser.email);
updateQuery.setString(2, dmuser.lastName);
updateQuery.setTimestamp(3, currentTime);
updateQuery.setString(4, dmuser.username);
updateQuery.executeUpdate();
} catch (SQLException ex) {
logger.log(Level.SEVERE, "can't execute query to update user", ex);
}
}
void close() {
try {
insertQuery.close();
updateQuery.close();
connection.close();
} catch (SQLException ex) {
logger.log(Level.SEVERE, "problem closing PostgreSQL connection ", ex);
}
}
}
package gov.anl.aps.dm.sync;
import java.io.IOException;
import java.io.InputStream;
import java.io.FileInputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Properties;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Synchronizer {
private OracleConnection oConnection;
private PsqlConnection pConnection;
private final Properties config = new Properties();
private final Properties tableRows = new Properties();
private final String oracleTable = "oracleTable.properties";
private FileHandler fileHandler;
private static final Logger logger = Logger.getLogger("AccountSynchronizerLogger");
public static void main(String[] argv) {
Synchronizer sync = new Synchronizer();
sync.loadProperties(argv[0]);
sync.initLogger();
sync.oConnection = new OracleConnection(logger);
sync.pConnection = new PsqlConnection(logger);
sync.connectAndInit();
sync.synchronize();
}
private void loadProperties(String configFile) {
InputStream configInputStream = null;
try {
configInputStream = new FileInputStream(configFile);
} catch (Exception e) {
e.printStackTrace();
}
if (configInputStream != null) {
try {
this.config.load(configInputStream);
} catch (IOException ex) {
initDefaultLogger();
logger.log(Level.SEVERE, "can't load configuration ", ex);
System.exit(-1);
}
} else {
initDefaultLogger();
logger.log(Level.SEVERE, "can't find configuration file");
System.exit(-1);
}
InputStream tableInputStream = getClass().getClassLoader().getResourceAsStream(oracleTable);
if (tableInputStream != null) {
try {
this.tableRows.load(tableInputStream);
} catch (IOException ex) {
logger.log(Level.SEVERE, "can't load Oracle table rows definitions ", ex);
System.exit(-1);
}
}
}
private void initDefaultLogger() {
try {
fileHandler = new FileHandler("accountSynchronizer.log");
logger.addHandler(fileHandler);
logger.log(Level.WARNING, "Using default logger ");
} catch (SecurityException | IOException ex) {
ex.printStackTrace();
System.exit(-1);
}
}
private void initLogger() {
String pattern = config.getProperty("log.file");
int limit = 0;
int count = 0;
try {
String limitProperty = config.getProperty("log.limit");
limit = Integer.decode(limitProperty);
count = Integer.decode(config.getProperty("log.count"));
} catch (NumberFormatException e) {
initDefaultLogger();
logger.log(Level.WARNING, "The logger limit or count are not configured properly. ", e);
System.exit(-1);
}
try {
fileHandler = new FileHandler(pattern, limit, count);
logger.addHandler(fileHandler);
} catch (SecurityException | IOException e) {
e.printStackTrace();
System.exit(-1);
}
}
private void connectAndInit() {
try {
pConnection.connect(config);
}
catch (SQLException e) {
System.exit(-1);
}
try {
oConnection.connect(config);
}
catch (SQLException e) {
pConnection.close();
System.exit(-1);
}
try {
pConnection.init();
}
catch (SQLException e) {
pConnection.close();
oConnection.close();
System.exit(-1);
}
}
private void synchronize() {
java.util.Calendar cal = java.util.Calendar.getInstance();
Timestamp current = new Timestamp(cal.getTimeInMillis());
DmUser dmuser = new DmUser();
try {
ResultSet results = oConnection.getUsers(config.getProperty("oracle.database.table"));
while (results.next()) {
String badge = results.getString("BADGE_NO");
String username = config.getProperty("user.userid.prefix") + badge;
if (pConnection.isUser(username)) {
Timestamp userLastUpdate = pConnection.getUserLastUpdate(username);
if ((results.getTimestamp("LAST_CHANGE_DATE") == null) || (userLastUpdate == null) || (results.getTimestamp("LAST_CHANGE_DATE").compareTo(userLastUpdate) >= 0)) {
// update user
dmuser.email = results.getString("EMAIL");
dmuser.username = username;
dmuser.lastName = results.getString("LAST_NAME");
pConnection.updateUser(dmuser, current);
dmuser.clear();
}
} else {
// add user
dmuser.badge = badge;
dmuser.email = results.getString("EMAIL");
dmuser.firstName = results.getString("FIRST_NAME");
dmuser.lastName = results.getString("LAST_NAME");
dmuser.middleName = results.getString("MIDDLE_NAME");
dmuser.username = username;
pConnection.addUser(dmuser, current);
dmuser.clear();
}
}
} catch (SQLException ex) {
logger.log(Level.SEVERE, "table processing error ", ex);
}
finally {
pConnection.close();
oConnection.close();
}
}
}