Skip to content
Snippets Groups Projects
Forked from DM / dm-docs
261 commits behind, 766 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
experimentManager.py 14.70 KiB
#!/usr/bin/env python

import threading
import time
import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.singleton import Singleton
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.exceptions.invalidRequest import InvalidRequest
from pathTracker import PathTracker

class ExperimentManager(Singleton):
    """
    Singleton that manages experiment storage on the local host.

    It creates per-experiment data directories under the configured storage
    directory, delegates group and ownership changes to an optional
    platform utility object, and passes files on to the file processing
    manager.
    """

    # Configuration section and option names looked up in __configure().
    CONFIG_SECTION_NAME = 'ExperimentManager'
    STORAGE_DIRECTORY_KEY = 'storagedirectory'
    STORAGE_ID_KEY = 'storageid'
    MANAGE_STORAGE_PERMISSIONS_KEY = 'managestoragepermissions'
    PLATFORM_UTILITY_KEY = 'platformutility'

    # Permission modes. The 0o prefix is accepted by Python 2.6+ and by
    # Python 3, whereas the bare leading-zero form is a syntax error on 3.
    RSYNC_SCRIPT_PERMISSIONS_MODE = 0o755
    FILE_PERMISSIONS_MODE = 0o640
    # String form of FILE_PERMISSIONS_MODE, used for logging and for the
    # platform utility call that chmods all files in a directory.
    FILE_PERMISSIONS_MODE_STRING = '0640'
    DIR_PERMISSIONS_MODE = 0o750

    # Delay given to threading.Timer before a directory is processed in
    # the background.
    DIRECTORY_PROCESSING_DELAY_IN_SECONDS = 1

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """
        Initialize the shared instance.

        Only the first construction does any work. If an instance is
        already registered, this returns immediately without setting any
        attributes on the new object.
        """
        with ExperimentManager.__instanceLock:
            if ExperimentManager.__instance:
                return

            # The instance is registered before the remaining attributes
            # are set up.
            ExperimentManager.__instance = self
            loggerName = self.__class__.__name__
            self.logger = LoggingManager.getInstance().getLogger(loggerName)
            self.logger.debug('Initializing')

            self.lock = threading.RLock()
            self.experimentDbApi = ExperimentDbApi()
            self.pathTracker = PathTracker()
            # Stays None unless __configure() finds a platform utility option.
            self.platformUtility = None
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """
        Load settings from the 'ExperimentManager' configuration section.

        Sets self.storageDirectory, self.storageId and
        self.manageStoragePermissions. When the platform utility option is
        set, the described class is instantiated and stored in
        self.platformUtility.
        """
        section = ExperimentManager.CONFIG_SECTION_NAME
        configManager = ConfigurationManager.getInstance()
        self.logger.debug('Got config items: %s' % configManager.getConfigItems(section))

        self.storageDirectory = configManager.getConfigOption(section, ExperimentManager.STORAGE_DIRECTORY_KEY)
        self.storageId = configManager.getConfigOption(section, ExperimentManager.STORAGE_ID_KEY)
        managePermissionsOption = configManager.getConfigOption(section, ExperimentManager.MANAGE_STORAGE_PERMISSIONS_KEY)
        self.manageStoragePermissions = ValueUtility.toBoolean(managePermissionsOption)

        platformUtilitySpec = configManager.getConfigOption(section, ExperimentManager.PLATFORM_UTILITY_KEY)
        if platformUtilitySpec:
            moduleName, className, constructor = configManager.getModuleClassConstructorTuple(platformUtilitySpec)
            self.logger.debug('Creating platform utility class %s' % className)
            self.platformUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)

        self.logger.debug('Manage storage permissions: %s' % self.manageStoragePermissions)

    def __getExperimentStorageDataDirectory(self, experiment):
        """
        Return the normalized data directory path for an experiment.

        The path has the form
        <storage directory>/<experiment type name>/<experiment name>.
        """
        typeName = experiment.get('experimentType').get('name')
        pathParts = (self.storageDirectory, typeName, experiment.get('name'))
        return os.path.normpath('%s/%s/%s' % pathParts)

    @ThreadingUtility.synchronize
    def updateExperimentWithStorageDataDirectory(self, experiment):
        """
        Add storage details to an experiment if its data directory exists.

        When the directory exists, the 'storageDirectory', 'storageHost' and
        'storageUrl' entries of the experiment are set. The computed
        directory path is returned whether or not it exists.
        """
        dataDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if not os.path.exists(dataDirectory):
            return dataDirectory

        host = ConfigurationManager.getInstance().getHost()
        experiment['storageDirectory'] = dataDirectory
        experiment['storageHost'] = host
        experiment['storageUrl'] = '%s://%s%s' % (self.storageId, host, dataDirectory)
        return dataDirectory

    def addUserToGroup(self, username, experimentName):
        """
        Add a user to the experiment's group.

        Nothing is done unless the experiment's data directory exists.
        """
        experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
        dataDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if not os.path.exists(dataDirectory):
            return
        self.platformUtility.addUserToGroup(username, experimentName)

    def deleteUserFromGroup(self, username, experimentName):
        """
        Reset the experiment group's members to the experiment's user list.

        The list comes from the experiment record fetched from the database
        API; the username argument itself is not used. Nothing is done
        unless the experiment's data directory exists.
        """
        experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
        dataDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if not os.path.exists(dataDirectory):
            return
        usernames = experiment.get('experimentUsernameList', [])
        self.platformUtility.setGroupUsers(experimentName, usernames)

    def createRsyncScript(self, username, experimentName):
        """
        Write an executable rsync wrapper script for a user and experiment.

        The script is written to /tmp/rsync.<username>.<experimentName>,
        execs 'sg <experimentName> "rsync $*"', and is then chmod-ed to
        RSYNC_SCRIPT_PERMISSIONS_MODE.
        """
        fileName = '/tmp/rsync.%s.%s' % (username, experimentName)
        self.logger.debug('Creating rsync script %s' % (fileName))
        # NOTE(review): the /tmp path is predictable and experimentName is
        # written into the shell script unquoted -- confirm both values are
        # validated before they reach this method.
        # The context manager closes the file even if a write fails.
        with open(fileName, 'w') as f:
            f.write('#!/bin/sh\n')
            f.write('exec sg %s "rsync $*"\n' % experimentName)
        OsUtility.chmodPath(fileName, fileMode=self.RSYNC_SCRIPT_PERMISSIONS_MODE)
    def deleteRsyncScript(self, username, experimentName):
        """Remove the rsync wrapper script /tmp/rsync.<username>.<experimentName>."""
        scriptPath = '/tmp/rsync.%s.%s' % (username, experimentName)
        self.logger.debug('Removing rsync script %s' % scriptPath)
        OsUtility.removeFile(scriptPath)

    def authorizeDownload(self, username, experiment):
        """
        Allow a user to download data from an experiment.

        Adds the local user to the experiment's group and creates the
        user's rsync wrapper script. Returns the experiment, updated with
        storage details.

        Raises InvalidRequest if the experiment's data directory does not
        exist.
        """
        name = experiment['name']
        self.logger.debug('Authorizing download for %s from experiment %s' % (username, name))
        dataDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
        if not os.path.exists(dataDirectory):
            raise InvalidRequest('Experiment %s has not been started.' % name)

        self.platformUtility.addLocalUserToGroup(username, name)
        self.createRsyncScript(username, name)
        return experiment

    def deauthorizeDownload(self, username, experiment):
        """
        Revoke a user's permission to download data from an experiment.

        Removes the local user from the experiment's group and deletes the
        user's rsync wrapper script. Returns the experiment, updated with
        storage details.

        Raises InvalidRequest if the experiment's data directory does not
        exist.
        """
        name = experiment['name']
        self.logger.debug('De-authorizing download for %s from experiment %s' % (username, name))
        dataDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
        if not os.path.exists(dataDirectory):
            raise InvalidRequest('Experiment %s has not been started.' % name)

        self.platformUtility.deleteLocalUserFromGroup(username, name)
        self.deleteRsyncScript(username, name)
        return experiment

    def createExperimentGroup(self, experiment):
        """
        Create the experiment's group and apply it to the data directory.

        The group is named after the experiment. The experiment's
        'storageDirectory' is chmod-ed to DIR_PERMISSIONS_MODE, handed to
        the group, and recorded in the path tracker with the current time.
        Finally the group's members are set to the experiment's user list.
        """
        groupName = experiment.get('name')
        dataDirectory = experiment.get('storageDirectory')

        # Group creation and directory ownership
        self.platformUtility.createGroup(groupName)
        self.logger.debug('Setting permissions for %s to %s' % (dataDirectory, self.DIR_PERMISSIONS_MODE))
        OsUtility.chmodPath(dataDirectory, dirMode=self.DIR_PERMISSIONS_MODE)
        self.logger.debug('Changing group owner for %s to %s' % (dataDirectory, groupName))
        self.platformUtility.changePathGroupOwner(dataDirectory, groupName)
        self.pathTracker.put(dataDirectory, time.time())

        # Group membership
        usernames = experiment.get('experimentUsernameList', [])
        self.logger.debug('Found experiment users: %s', usernames)
        self.platformUtility.setGroupUsers(groupName, usernames)
      
    def updateExperimentGroupUsers(self, experiment):
        """Set the experiment group's members to the experiment's user list."""
        groupName = experiment.get('name')
        usernames = experiment.get('experimentUsernameList', [])
        self.platformUtility.setGroupUsers(groupName, usernames)

    @ThreadingUtility.synchronize
    def createExperimentDataDirectory(self, experiment):
        """
        Ensure the experiment's data directory exists and record it.

        The directory is created if missing. The experiment's
        'storageDirectory', 'storageHost' and 'storageUrl' entries are then
        set, and the experiment group is created when storage permissions
        are managed.
        """
        name = experiment.get('name')
        dataDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if not os.path.exists(dataDirectory):
            self.logger.debug('Creating data directory for experiment %s: %s' % (name, dataDirectory))
            OsUtility.createDir(dataDirectory)
        else:
            self.logger.debug('Data directory %s for experiment %s already exists' % (dataDirectory, name))

        host = ConfigurationManager.getInstance().getHost()
        experiment['storageDirectory'] = dataDirectory
        experiment['storageHost'] = host
        experiment['storageUrl'] = '%s://%s%s' % (self.storageId, host, dataDirectory)

        if self.manageStoragePermissions:
            self.createExperimentGroup(experiment)
      
    def processExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """
        Fix up ownership of an experiment file and pass it on for processing.

        experimentFilePath is joined onto the experiment's storage
        directory. fileInfo, if given, is updated in place with 'filePath'
        and 'experiment' entries and is then passed to the file processing
        manager; if omitted, a new dictionary is used.

        If the file does not exist, an error is logged and nothing else is
        done. When storage permissions are managed, the file is chmod-ed and
        handed to the experiment's group, as is every directory between the
        file and the storage directory that the path tracker has not seen.
        """
        # A fresh dictionary per call: a shared {} default would carry the
        # previous call's entries into the next one.
        if fileInfo is None:
            fileInfo = {}

        experimentName = experiment.get('name')
        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        fileInfo['experiment'] = experiment
        if not os.path.exists(filePath):
            self.logger.error('File path %s does not exist' % filePath)
            return

        if self.manageStoragePermissions:
            self.logger.debug('Setting permissions for %s to %s' % (filePath,self.FILE_PERMISSIONS_MODE_STRING))
            OsUtility.chmodPath(filePath, fileMode=self.FILE_PERMISSIONS_MODE)
            self.logger.debug('Changing group owner for %s to %s' % (filePath, experimentName))
            self.platformUtility.changePathGroupOwner(filePath, experimentName)

            # Walk up from the file's directory to the storage directory,
            # changing the group owner of each directory not yet tracked.
            storageRoot = os.path.abspath(storageDirectory)
            dirPath = os.path.dirname(filePath)
            while os.path.abspath(dirPath) != storageRoot:
                parentPath = os.path.dirname(dirPath)
                if parentPath == dirPath:
                    # dirname() no longer changes the path, so the top was
                    # reached without meeting the storage directory (the
                    # file lies outside it). Stop rather than loop forever.
                    self.logger.error('File path %s is not under storage directory %s' % (filePath, storageDirectory))
                    break
                if self.pathTracker.get(dirPath) is None:
                    self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
                    self.platformUtility.changePathGroupOwner(dirPath, experimentName)
                    self.pathTracker.put(dirPath, time.time())
                else:
                    self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName))
                dirPath = parentPath

        self.logger.debug('Processing file path %s (fileInfo: %s)' % (filePath, fileInfo))
        self.fileProcessingManager.processFile(fileInfo)

    def statExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """
        Collect stat and MD5 information for an experiment file.

        experimentFilePath is joined onto the experiment's storage
        directory. fileInfo, if given, is updated in place with 'filePath'
        and with whatever FileUtility.statFile() and FileUtility.getMd5Sum()
        add to it; if omitted, a new dictionary is used.

        Raises ObjectNotFound if the file does not exist.
        """
        # A fresh dictionary per call: a shared {} default would carry the
        # previous call's entries into the next one.
        if fileInfo is None:
            fileInfo = {}

        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        if os.path.exists(filePath):
            self.logger.debug('Getting stat for file path %s' % (filePath))
            FileUtility.statFile(filePath, fileInfo)
            FileUtility.getMd5Sum(filePath, fileInfo)
            self.logger.debug('File info after stat: %s' % str(fileInfo))
        else:
            self.logger.debug('File path %s does not exist' % filePath)
            raise ObjectNotFound('File %s does not exist' % filePath)

    def processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo=None):
        """
        Schedule background permission processing for an experiment directory.

        experimentDirectoryPath is joined onto the experiment's storage
        directory. directoryInfo, if given, is updated in place with
        'directoryPath' and 'experiment' entries; if omitted, a new
        dictionary is used.

        When storage permissions are not managed, a message is logged and
        nothing is scheduled. Otherwise the real work runs in a
        threading.Timer after DIRECTORY_PROCESSING_DELAY_IN_SECONDS.
        """
        # A fresh dictionary per call: a shared {} default would carry the
        # previous call's entries into the next one.
        if directoryInfo is None:
            directoryInfo = {}

        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        directoryPath = os.path.join(storageDirectory, experimentDirectoryPath)
        directoryInfo['directoryPath'] = directoryPath
        directoryInfo['experiment'] = experiment

        if not self.manageStoragePermissions:
            self.logger.error('Skipping permission management for directory path %s' % directoryPath)
            return

        self.logger.debug('Processing directory path %s in background' % (directoryPath))
        timer = threading.Timer(self.DIRECTORY_PROCESSING_DELAY_IN_SECONDS, self.__processExperimentDirectory, args=[experimentDirectoryPath, experiment, directoryInfo])
        timer.start()

    def __processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo={}):
        """
        Apply experiment group ownership and permissions to a directory tree.

        The directory is read from directoryInfo['directoryPath'];
        experimentDirectoryPath is accepted but not used here, and
        directoryInfo is only read. If the directory does not exist, an
        error is logged and nothing else is done.

        Otherwise the directory is chmod-ed to DIR_PERMISSIONS_MODE and its
        group owner is changed recursively. Every directory between it and
        the experiment's storage directory that the path tracker has not
        seen is handed to the group as well. Finally the files in the
        directory are chmod-ed to FILE_PERMISSIONS_MODE_STRING.
        """
        experimentName = experiment.get('name')
        storageDirectory = experiment.get('storageDirectory')
        directoryPath = directoryInfo.get('directoryPath')

        if not os.path.exists(directoryPath):
            self.logger.error('Directory path %s does not exist' % directoryPath)
            return

        # Modify ownership
        self.logger.debug('Processing directory path %s (directoryInfo: %s)' % (directoryPath, directoryInfo))
        self.logger.debug('Modifying permissions for directory %s to %s' % (directoryPath, self.DIR_PERMISSIONS_MODE))
        OsUtility.chmodPath(directoryPath, dirMode=self.DIR_PERMISSIONS_MODE)
        self.logger.debug('Changing group owner for %s to %s' % (directoryPath, experimentName))
        self.platformUtility.recursivelyChangePathGroupOwner(directoryPath, experimentName)

        # Walk up from the directory's parent to the storage directory,
        # changing the group owner of each directory not yet tracked.
        storageRoot = os.path.abspath(storageDirectory)
        dirPath = os.path.dirname(directoryPath)
        while os.path.abspath(dirPath) != storageRoot:
            parentPath = os.path.dirname(dirPath)
            if parentPath == dirPath:
                # dirname() no longer changes the path, so the top was
                # reached without meeting the storage directory (the
                # directory lies outside it). Stop rather than loop forever.
                self.logger.error('Directory path %s is not under storage directory %s' % (directoryPath, storageDirectory))
                break
            if self.pathTracker.get(dirPath) is None:
                self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
                self.platformUtility.changePathGroupOwner(dirPath, experimentName)
                self.pathTracker.put(dirPath, time.time())
            else:
                self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName))
            dirPath = parentPath

        # Update file permissions
        self.logger.debug('Changing permissions for all files in %s for experiment %s' % (directoryPath, experimentName))
        self.platformUtility.chmodPathForFilesInDirectory(directoryPath, self.FILE_PERMISSIONS_MODE_STRING)

    @ThreadingUtility.synchronize
    def start(self):
        """Log that the experiment manager has started; no other work is done here."""
        self.logger.debug('Started experiment manager')

    @ThreadingUtility.synchronize
    def stop(self):
        """Log that the experiment manager has stopped; no other work is done here."""
        self.logger.debug('Stopped experiment manager')

####################################################################
# Testing

if __name__ == '__main__':
    # Smoke test: obtain the singleton instance and print it.
    em = ExperimentManager.getInstance()
    # The parenthesized form prints the same text on Python 2 and is a
    # valid call on Python 3, where the bare print statement is not.
    print(em)