Forked from
DM / dm-docs
261 commits behind, 766 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
experimentManager.py 14.70 KiB
#!/usr/bin/env python
import threading
import time
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.singleton import Singleton
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.exceptions.invalidRequest import InvalidRequest
from pathTracker import PathTracker
class ExperimentManager(Singleton):
    """Manage experiment data directories in local storage.

    Responsibilities visible in this class:

    * compute and create per-experiment data directories under the configured
      storage directory (``<storagedirectory>/<experiment type name>/<experiment name>``);
    * when ``managestoragepermissions`` is enabled, create a group named after
      the experiment, keep its membership in sync and apply group ownership
      and permission modes to experiment files and directories through the
      configured platform utility;
    * authorize / de-authorize local users for downloads (group membership
      plus a per-user rsync wrapper script in /tmp);
    * hand experiment files to the ``FileProcessingManager``.

    Obtain the instance with the inherited ``getInstance()``.
    """

    CONFIG_SECTION_NAME = 'ExperimentManager'
    STORAGE_DIRECTORY_KEY = 'storagedirectory'
    STORAGE_ID_KEY = 'storageid'
    MANAGE_STORAGE_PERMISSIONS_KEY = 'managestoragepermissions'
    PLATFORM_UTILITY_KEY = 'platformutility'
    # Mode of generated rsync wrapper scripts (rwxr-xr-x).
    RSYNC_SCRIPT_PERMISSIONS_MODE = 0o755
    # Mode applied to experiment files (rw-r-----).
    FILE_PERMISSIONS_MODE = 0o640
    # FILE_PERMISSIONS_MODE as a string, for platform utility calls and logs.
    FILE_PERMISSIONS_MODE_STRING = '0640'
    # Mode applied to experiment directories (rwxr-x---).
    DIR_PERMISSIONS_MODE = 0o750
    # DIR_PERMISSIONS_MODE as a string, so log messages show the octal mode.
    DIR_PERMISSIONS_MODE_STRING = '0750'
    # Delay before a new directory's permissions are processed in the background.
    DIRECTORY_PROCESSING_DELAY_IN_SECONDS = 1

    # Singleton bookkeeping.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the singleton; constructions after the first are no-ops.

        NOTE(review): unless the Singleton base overrides ``__new__``, a second
        direct ``ExperimentManager()`` returns an object without the attributes
        set here -- use ``getInstance()``.
        """
        with ExperimentManager.__instanceLock:
            if ExperimentManager.__instance:
                return
            ExperimentManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            # Lock presumably used by @ThreadingUtility.synchronize methods.
            self.lock = threading.RLock()
            self.experimentDbApi = ExperimentDbApi()
            # Remembers directories whose group owner was already changed.
            self.pathTracker = PathTracker()
            self.platformUtility = None
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """Read the ExperimentManager configuration section.

        Sets ``storageDirectory``, ``storageId`` and ``manageStoragePermissions``.
        When the 'platformutility' option is present, it also instantiates
        ``platformUtility`` from it. Without that option ``platformUtility``
        stays None and every method that calls it raises AttributeError.
        """
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(ExperimentManager.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.storageDirectory = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_DIRECTORY_KEY)
        self.storageId = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_ID_KEY)
        self.manageStoragePermissions = ValueUtility.toBoolean(cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.MANAGE_STORAGE_PERMISSIONS_KEY))
        platformUtility = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.PLATFORM_UTILITY_KEY)
        if platformUtility:
            (moduleName, className, constructor) = cm.getModuleClassConstructorTuple(platformUtility)
            self.logger.debug('Creating platform utility class %s' % className)
            self.platformUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
        self.logger.debug('Manage storage permissions: %s' % self.manageStoragePermissions)

    def __getExperimentStorageDataDirectory(self, experiment):
        """Return the normalized ``<storage>/<type name>/<experiment name>`` path."""
        experimentTypeName = experiment.get('experimentType').get('name')
        experimentName = experiment.get('name')
        storageDirectory = '%s/%s/%s' % (self.storageDirectory, experimentTypeName, experimentName)
        return os.path.normpath(storageDirectory)

    def __setExperimentStorageInfo(self, experiment, storageDirectory):
        """Store 'storageDirectory', 'storageHost' and 'storageUrl' in the experiment dict."""
        storageHost = ConfigurationManager.getInstance().getHost()
        experiment['storageDirectory'] = storageDirectory
        experiment['storageHost'] = storageHost
        experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory)

    @staticmethod
    def __isPathWithinDirectory(path, directory):
        """Return True if ``path`` is ``directory`` itself or lies underneath it.

        Paths are compared after ``os.path.abspath`` (which collapses '..'
        but does not resolve symlinks).
        """
        absPath = os.path.abspath(path)
        absDirectory = os.path.abspath(directory)
        if absPath == absDirectory:
            return True
        prefix = absDirectory if absDirectory.endswith(os.sep) else absDirectory + os.sep
        return absPath.startswith(prefix)

    def __updateParentDirectoriesGroupOwner(self, path, storageDirectory, experimentName):
        """Set the experiment group on every ancestor of ``path`` below the storage directory.

        Directories already recorded in ``pathTracker`` are skipped. The walk
        stops at the storage directory. It also stops if it leaves that
        directory or reaches a path whose dirname is itself (filesystem root),
        so it always terminates.
        """
        storageRoot = os.path.abspath(storageDirectory)
        dirPath = os.path.dirname(path)
        while os.path.abspath(dirPath) != storageRoot:
            if not self.__isPathWithinDirectory(dirPath, storageRoot):
                self.logger.error('Directory %s is outside of storage directory %s, not changing its group owner' % (dirPath, storageRoot))
                break
            if self.pathTracker.get(dirPath) is None:
                self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
                self.platformUtility.changePathGroupOwner(dirPath, experimentName)
                self.pathTracker.put(dirPath, time.time())
            else:
                self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName))
            parentPath = os.path.dirname(dirPath)
            if parentPath == dirPath:
                # dirname() has reached a fixed point ('/' or ''); nothing above.
                break
            dirPath = parentPath

    @ThreadingUtility.synchronize
    def updateExperimentWithStorageDataDirectory(self, experiment):
        """Fill in storage fields of an experiment whose data directory exists.

        :param experiment: experiment dict with 'name' and 'experimentType'
            (having 'name'); updated in place with 'storageDirectory',
            'storageHost' and 'storageUrl' only if the directory exists.
        :returns: the computed storage directory path, whether or not it exists.
        """
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.__setExperimentStorageInfo(experiment, storageDirectory)
        return storageDirectory

    def addUserToGroup(self, username, experimentName):
        """Add ``username`` to the experiment group if the data directory exists."""
        experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.platformUtility.addUserToGroup(username, experimentName)

    def deleteUserFromGroup(self, username, experimentName):
        """Reset experiment group membership to the users currently stored in the DB.

        ``username`` itself is not used; the group is rewritten from the DB's
        'experimentUsernameList' when the data directory exists.
        """
        experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            experimentUsers = experiment.get('experimentUsernameList', [])
            self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    @staticmethod
    def __getRsyncScriptPath(username, experimentName):
        """Return the path of the rsync wrapper for a user/experiment pair."""
        return '/tmp/rsync.%s.%s' % (username, experimentName)

    def createRsyncScript(self, username, experimentName):
        """Write an executable shell wrapper that runs rsync via ``sg <experimentName>``."""
        fileName = self.__getRsyncScriptPath(username, experimentName)
        self.logger.debug('Creating rsync script %s' % (fileName))
        # NOTE(review): the /tmp path is predictable and experimentName is
        # interpolated into the script unquoted; this is only safe while
        # usernames and experiment names are trusted, shell-safe identifiers.
        with open(fileName, 'w') as f:
            f.write('#!/bin/sh\n')
            f.write('exec sg %s "rsync $*"\n' % experimentName)
        OsUtility.chmodPath(fileName, fileMode=self.RSYNC_SCRIPT_PERMISSIONS_MODE)

    def deleteRsyncScript(self, username, experimentName):
        """Remove the rsync wrapper created by ``createRsyncScript``."""
        fileName = self.__getRsyncScriptPath(username, experimentName)
        self.logger.debug('Removing rsync script %s' % (fileName))
        OsUtility.removeFile(fileName)

    def authorizeDownload(self, username, experiment):
        """Give a local user download access to an experiment.

        Adds the user to the experiment group and creates the rsync wrapper.

        :returns: the experiment dict, updated with storage fields.
        :raises InvalidRequest: if the experiment data directory does not exist.
        """
        experimentName = experiment['name']
        self.logger.debug('Authorizing download for %s from experiment %s' % (username, experimentName))
        storageDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
        if not os.path.exists(storageDirectory):
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        self.platformUtility.addLocalUserToGroup(username, experimentName)
        self.createRsyncScript(username, experimentName)
        return experiment

    def deauthorizeDownload(self, username, experiment):
        """Revoke a local user's download access to an experiment.

        Removes the user from the experiment group and deletes the rsync wrapper.

        :returns: the experiment dict, updated with storage fields.
        :raises InvalidRequest: if the experiment data directory does not exist.
        """
        experimentName = experiment['name']
        self.logger.debug('De-authorizing download for %s from experiment %s' % (username, experimentName))
        storageDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
        if not os.path.exists(storageDirectory):
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        self.platformUtility.deleteLocalUserFromGroup(username, experimentName)
        self.deleteRsyncScript(username, experimentName)
        return experiment

    def createExperimentGroup(self, experiment):
        """Create the experiment group and apply it to the storage directory.

        Expects 'storageDirectory' already set in ``experiment``. Sets the
        directory mode, makes the group its owner, records the directory in
        ``pathTracker`` and sets group members from 'experimentUsernameList'.
        """
        experimentName = experiment.get('name')
        storageDirectory = experiment.get('storageDirectory')
        self.platformUtility.createGroup(experimentName)
        self.logger.debug('Setting permissions for %s to %s' % (storageDirectory, self.DIR_PERMISSIONS_MODE_STRING))
        OsUtility.chmodPath(storageDirectory, dirMode=self.DIR_PERMISSIONS_MODE)
        self.logger.debug('Changing group owner for %s to %s' % (storageDirectory, experimentName))
        self.platformUtility.changePathGroupOwner(storageDirectory, experimentName)
        self.pathTracker.put(storageDirectory, time.time())
        experimentUsers = experiment.get('experimentUsernameList', [])
        self.logger.debug('Found experiment users: %s', experimentUsers)
        self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    def updateExperimentGroupUsers(self, experiment):
        """Set experiment group members from the experiment's 'experimentUsernameList'."""
        experimentName = experiment.get('name')
        experimentUsers = experiment.get('experimentUsernameList', [])
        self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    @ThreadingUtility.synchronize
    def createExperimentDataDirectory(self, experiment):
        """Create the experiment data directory if needed and record its storage fields.

        When storage permissions are managed, also creates/configures the
        experiment group via ``createExperimentGroup``.
        """
        experimentName = experiment.get('name')
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.logger.debug('Data directory %s for experiment %s already exists' % (storageDirectory, experimentName))
        else:
            self.logger.debug('Creating data directory for experiment %s: %s' % (experimentName, storageDirectory))
            OsUtility.createDir(storageDirectory)
        self.__setExperimentStorageInfo(experiment, storageDirectory)
        if self.manageStoragePermissions:
            self.createExperimentGroup(experiment)

    def processExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """Fix permissions of an experiment file and pass it to the file processor.

        :param experimentFilePath: path relative to the experiment storage
            directory; paths resolving outside it are rejected.
        :param experiment: experiment dict; updated with storage fields.
        :param fileInfo: optional dict updated in place with 'filePath' and
            'experiment' and handed to ``FileProcessingManager.processFile``.
            A fresh dict is used when omitted.
        """
        # A fresh dict per call: a shared default would leak state between calls.
        if fileInfo is None:
            fileInfo = {}
        experimentName = experiment.get('name')
        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        fileInfo['experiment'] = experiment
        # os.path.join discards the base for absolute paths and keeps '..',
        # so refuse anything that escapes the experiment directory.
        if not self.__isPathWithinDirectory(filePath, storageDirectory):
            self.logger.error('File path %s is outside of experiment storage directory %s' % (filePath, storageDirectory))
            return
        if not os.path.exists(filePath):
            self.logger.error('File path %s does not exist' % filePath)
            return
        if self.manageStoragePermissions:
            self.logger.debug('Setting permissions for %s to %s' % (filePath, self.FILE_PERMISSIONS_MODE_STRING))
            OsUtility.chmodPath(filePath, fileMode=self.FILE_PERMISSIONS_MODE)
            self.logger.debug('Changing group owner for %s to %s' % (filePath, experimentName))
            self.platformUtility.changePathGroupOwner(filePath, experimentName)
            self.__updateParentDirectoriesGroupOwner(filePath, storageDirectory, experimentName)
        self.logger.debug('Processing file path %s (fileInfo: %s)' % (filePath, fileInfo))
        self.fileProcessingManager.processFile(fileInfo)

    def statExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """Collect stat data and MD5 checksum of an experiment file.

        :param experimentFilePath: path relative to the experiment storage directory.
        :param experiment: experiment dict; updated with storage fields.
        :param fileInfo: optional dict filled in place ('filePath' plus whatever
            ``FileUtility.statFile``/``getMd5Sum`` add). A fresh dict is used
            when omitted.
        :returns: the filled ``fileInfo`` dict.
        :raises InvalidRequest: if the path resolves outside the storage directory.
        :raises ObjectNotFound: if the file does not exist.
        """
        if fileInfo is None:
            fileInfo = {}
        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        if not self.__isPathWithinDirectory(filePath, storageDirectory):
            self.logger.error('File path %s is outside of experiment storage directory %s' % (filePath, storageDirectory))
            raise InvalidRequest('File %s is outside of experiment storage directory %s' % (filePath, storageDirectory))
        if not os.path.exists(filePath):
            self.logger.debug('File path %s does not exist' % filePath)
            raise ObjectNotFound('File %s does not exist' % filePath)
        self.logger.debug('Getting stat for file path %s' % (filePath))
        FileUtility.statFile(filePath, fileInfo)
        FileUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('File info after stat: %s' % str(fileInfo))
        return fileInfo

    def processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo=None):
        """Schedule background permission processing for an experiment directory.

        Does nothing beyond filling ``directoryInfo`` when storage permissions
        are not managed. Otherwise starts a ``threading.Timer`` that runs
        ``__processExperimentDirectory`` after DIRECTORY_PROCESSING_DELAY_IN_SECONDS.

        :param experimentDirectoryPath: path relative to the experiment storage
            directory; paths resolving outside it are rejected.
        :param directoryInfo: optional dict updated with 'directoryPath' and
            'experiment'. A fresh dict is used when omitted; this matters
            because the dict is read later by the timer thread.
        """
        if directoryInfo is None:
            directoryInfo = {}
        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        directoryPath = os.path.join(storageDirectory, experimentDirectoryPath)
        directoryInfo['directoryPath'] = directoryPath
        directoryInfo['experiment'] = experiment
        if not self.manageStoragePermissions:
            # Expected when permissions are not managed; not an error condition.
            self.logger.debug('Skipping permission management for directory path %s' % directoryPath)
            return
        if not self.__isPathWithinDirectory(directoryPath, storageDirectory):
            self.logger.error('Directory path %s is outside of experiment storage directory %s' % (directoryPath, storageDirectory))
            return
        self.logger.debug('Processing directory path %s in background' % (directoryPath))
        timer = threading.Timer(self.DIRECTORY_PROCESSING_DELAY_IN_SECONDS, self.__processExperimentDirectory, args=[experimentDirectoryPath, experiment, directoryInfo])
        timer.start()

    def __processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo=None):
        """Apply experiment group ownership and modes to a directory tree (timer callback).

        Sets the directory mode and recursively changes its group owner. Also
        sets the group on ancestor directories below the storage directory,
        then sets FILE_PERMISSIONS_MODE_STRING on the files in the directory.
        """
        if directoryInfo is None:
            directoryInfo = {}
        experimentName = experiment.get('name')
        storageDirectory = experiment.get('storageDirectory')
        directoryPath = directoryInfo.get('directoryPath')
        if directoryPath is None:
            directoryPath = os.path.join(storageDirectory, experimentDirectoryPath)
        if not os.path.exists(directoryPath):
            self.logger.error('Directory path %s does not exist' % directoryPath)
            return
        self.logger.debug('Processing directory path %s (directoryInfo: %s)' % (directoryPath, directoryInfo))
        self.logger.debug('Modifying permissions for directory %s to %s' % (directoryPath, self.DIR_PERMISSIONS_MODE_STRING))
        OsUtility.chmodPath(directoryPath, dirMode=self.DIR_PERMISSIONS_MODE)
        self.logger.debug('Changing group owner for %s to %s' % (directoryPath, experimentName))
        self.platformUtility.recursivelyChangePathGroupOwner(directoryPath, experimentName)
        self.__updateParentDirectoriesGroupOwner(directoryPath, storageDirectory, experimentName)
        self.logger.debug('Changing permissions for all files in %s for experiment %s' % (directoryPath, experimentName))
        self.platformUtility.chmodPathForFilesInDirectory(directoryPath, self.FILE_PERMISSIONS_MODE_STRING)

    @ThreadingUtility.synchronize
    def start(self):
        """Start the manager (currently only logs)."""
        self.logger.debug('Started experiment manager')

    @ThreadingUtility.synchronize
    def stop(self):
        """Stop the manager (currently only logs)."""
        self.logger.debug('Stopped experiment manager')
####################################################################
# Testing
if __name__ == '__main__':
    # Manual smoke test: obtain the singleton manager and print it.
    manager = ExperimentManager.getInstance()
    print(manager)