#!/usr/bin/env python

"""Experiment storage manager.

Defines ExperimentManager, a singleton that creates experiment data
directories under a configured storage root, delegates group/permission
changes to a configurable platform utility object, and hands files to the
file processing manager.
"""

import threading
import time
import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.singleton import Singleton
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.exceptions.invalidRequest import InvalidRequest
from pathTracker import PathTracker


class ExperimentManager(Singleton):
    """Manage experiment storage directories, groups and file permissions.

    Group and ownership operations are delegated to self.platformUtility,
    which is only created when the 'platformutility' configuration option is
    set; otherwise it stays None.
    NOTE(review): several methods call self.platformUtility without checking
    for None - confirm that deployments always configure it.
    """

    CONFIG_SECTION_NAME = 'ExperimentManager'
    STORAGE_DIRECTORY_KEY = 'storagedirectory'
    STORAGE_ID_KEY = 'storageid'
    MANAGE_STORAGE_PERMISSIONS_KEY = 'managestoragepermissions'
    PLATFORM_UTILITY_KEY = 'platformutility'

    # Octal literals use the 0o prefix, which is accepted by Python 2.6+
    # and Python 3 (the bare 0755 form is a syntax error on Python 3).
    RSYNC_SCRIPT_PERMISSIONS_MODE = 0o755
    FILE_PERMISSIONS_MODE = 0o640
    FILE_PERMISSIONS_MODE_STRING = '0640'
    DIR_PERMISSIONS_MODE = 0o750
    DIRECTORY_PROCESSING_DELAY_IN_SECONDS = 1

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the singleton instance; a no-op if one already exists."""
        with ExperimentManager.__instanceLock:
            if ExperimentManager.__instance:
                return
            ExperimentManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.experimentDbApi = ExperimentDbApi()
            self.pathTracker = PathTracker()
            self.platformUtility = None
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """Read storage settings and optionally build the platform utility."""
        cm = ConfigurationManager.getInstance()
        sectionName = ExperimentManager.CONFIG_SECTION_NAME
        configItems = cm.getConfigItems(sectionName)
        self.logger.debug('Got config items: %s' % configItems)
        self.storageDirectory = cm.getConfigOption(sectionName, ExperimentManager.STORAGE_DIRECTORY_KEY)
        self.storageId = cm.getConfigOption(sectionName, ExperimentManager.STORAGE_ID_KEY)
        self.manageStoragePermissions = ValueUtility.toBoolean(
            cm.getConfigOption(sectionName, ExperimentManager.MANAGE_STORAGE_PERMISSIONS_KEY))
        platformUtility = cm.getConfigOption(sectionName, ExperimentManager.PLATFORM_UTILITY_KEY)
        if platformUtility:
            (moduleName, className, constructor) = cm.getModuleClassConstructorTuple(platformUtility)
            self.logger.debug('Creating platform utility class %s' % className)
            self.platformUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
        self.logger.debug('Manage storage permissions: %s' % self.manageStoragePermissions)

    def __getExperimentStorageDataDirectory(self, experiment):
        """Return the normalized path <storage>/<experiment type name>/<experiment name>."""
        experimentTypeName = experiment.get('experimentType').get('name')
        experimentName = experiment.get('name')
        storageDirectory = '%s/%s/%s' % (self.storageDirectory, experimentTypeName, experimentName)
        return os.path.normpath(storageDirectory)

    def __setExperimentStorageInfo(self, experiment, storageDirectory):
        """Store storageDirectory, storageHost and storageUrl in the experiment dict."""
        storageHost = ConfigurationManager.getInstance().getHost()
        experiment['storageDirectory'] = storageDirectory
        experiment['storageHost'] = storageHost
        experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory)

    def __resolveStorageDirectory(self, experiment):
        """Refresh storage info and return the storage directory to use.

        Prefers the 'storageDirectory' entry of the experiment; falls back to
        the computed path when that entry is missing (it is only set by
        updateExperimentWithStorageDataDirectory when the directory exists),
        so callers never pass None to os.path.join().
        """
        computedDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
        return experiment.get('storageDirectory') or computedDirectory

    def __updateSubdirectoryGroupOwners(self, dirPath, storageDirectory, experimentName):
        """Change group owner on dirPath and its parents, up to storageDirectory.

        Walks from dirPath towards storageDirectory (exclusive). Directories
        already recorded in self.pathTracker are skipped. Paths that are not
        located inside storageDirectory are rejected up front: walking their
        parents could never reach storageDirectory and would otherwise touch
        unrelated directories and never terminate at the filesystem root.
        """
        storageRoot = os.path.abspath(storageDirectory)
        storagePrefix = storageRoot.rstrip(os.sep) + os.sep
        absDirPath = os.path.abspath(dirPath)
        if absDirPath != storageRoot and not absDirPath.startswith(storagePrefix):
            self.logger.error('Path %s is not inside storage directory %s, skipping group owner update' % (dirPath, storageDirectory))
            return

        while os.path.abspath(dirPath) != storageRoot:
            parentPath = os.path.dirname(dirPath)
            if parentPath == dirPath:
                # dirname() no longer makes progress (root or empty relative
                # path), so the storage directory cannot be reached.
                self.logger.error('Could not reach storage directory %s from %s, stopping group owner update' % (storageDirectory, dirPath))
                break
            if self.pathTracker.get(dirPath) is None:
                self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
                self.platformUtility.changePathGroupOwner(dirPath, experimentName)
                ownerUpdateTime = time.time()
                self.pathTracker.put(dirPath, ownerUpdateTime)
            else:
                self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName))
            dirPath = parentPath

    @ThreadingUtility.synchronize
    def updateExperimentWithStorageDataDirectory(self, experiment):
        """Add storage info to the experiment if its data directory exists.

        Returns the computed storage directory path whether or not it exists.
        """
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.__setExperimentStorageInfo(experiment, storageDirectory)
        return storageDirectory

    def addUserToGroup(self, username, experimentName):
        """Add user to the experiment group if the data directory exists."""
        experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.platformUtility.addUserToGroup(username, experimentName)

    def deleteUserFromGroup(self, username, experimentName):
        """Reset group membership from the database user list.

        The username argument is not used directly: the group is set to the
        experiment's current 'experimentUsernameList'.
        """
        experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            experimentUsers = experiment.get('experimentUsernameList', [])
            self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    def createRsyncScript(self, username, experimentName):
        """Write an executable rsync wrapper script for the user/experiment pair.

        NOTE(review): the script lives at a predictable path under /tmp and
        embeds experimentName unquoted in a shell command - confirm both
        values are validated upstream.
        """
        fileName = '/tmp/rsync.%s.%s' % (username, experimentName)
        self.logger.debug('Creating rsync script %s' % (fileName))
        # Context manager guarantees the handle is closed even if a write fails.
        with open(fileName, 'w') as f:
            f.write('#!/bin/sh\n')
            f.write('exec sg %s "rsync $*"\n' % experimentName)
        OsUtility.chmodPath(fileName, fileMode=self.RSYNC_SCRIPT_PERMISSIONS_MODE)

    def deleteRsyncScript(self, username, experimentName):
        """Remove the rsync wrapper script for the user/experiment pair."""
        fileName = '/tmp/rsync.%s.%s' % (username, experimentName)
        self.logger.debug('Removing rsync script %s' % (fileName))
        OsUtility.removeFile(fileName)

    def authorizeDownload(self, username, experiment):
        """Add a local user to the experiment group and create the rsync script.

        Raises InvalidRequest if the experiment data directory does not exist.
        Returns the experiment, updated with storage info.
        """
        experimentName = experiment['name']
        self.logger.debug('Authorizing download for %s from experiment %s' % (username, experimentName))
        storageDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
        if not os.path.exists(storageDirectory):
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        self.platformUtility.addLocalUserToGroup(username, experimentName)
        self.createRsyncScript(username, experimentName)
        return experiment

    def deauthorizeDownload(self, username, experiment):
        """Remove a local user from the experiment group and delete the rsync script.

        Raises InvalidRequest if the experiment data directory does not exist.
        Returns the experiment, updated with storage info.
        """
        experimentName = experiment['name']
        self.logger.debug('De-authorizing download for %s from experiment %s' % (username, experimentName))
        storageDirectory = self.updateExperimentWithStorageDataDirectory(experiment)
        if not os.path.exists(storageDirectory):
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        self.platformUtility.deleteLocalUserFromGroup(username, experimentName)
        self.deleteRsyncScript(username, experimentName)
        return experiment

    def createExperimentGroup(self, experiment):
        """Create the experiment group, apply it to the storage directory, set its users."""
        experimentName = experiment.get('name')
        storageDirectory = experiment.get('storageDirectory')

        # Create experiment group
        self.platformUtility.createGroup(experimentName)
        self.logger.debug('Setting permissions for %s to %s' % (storageDirectory, self.DIR_PERMISSIONS_MODE))
        OsUtility.chmodPath(storageDirectory, dirMode=self.DIR_PERMISSIONS_MODE)
        self.logger.debug('Changing group owner for %s to %s' % (storageDirectory, experimentName))
        self.platformUtility.changePathGroupOwner(storageDirectory, experimentName)
        ownerUpdateTime = time.time()
        self.pathTracker.put(storageDirectory, ownerUpdateTime)

        # Add users to group
        experimentUsers = experiment.get('experimentUsernameList', [])
        self.logger.debug('Found experiment users: %s', experimentUsers)
        self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    def updateExperimentGroupUsers(self, experiment):
        """Set the experiment group's users from 'experimentUsernameList'."""
        experimentName = experiment.get('name')
        experimentUsers = experiment.get('experimentUsernameList', [])
        self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    @ThreadingUtility.synchronize
    def createExperimentDataDirectory(self, experiment):
        """Create the experiment data directory if needed and record storage info.

        When storage permission management is enabled, the experiment group
        is created/updated as well.
        """
        experimentName = experiment.get('name')
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.logger.debug('Data directory %s for experiment %s already exists' % (storageDirectory, experimentName))
        else:
            self.logger.debug('Creating data directory for experiment %s: %s' % (experimentName, storageDirectory))
            OsUtility.createDir(storageDirectory)
        self.__setExperimentStorageInfo(experiment, storageDirectory)
        if self.manageStoragePermissions:
            self.createExperimentGroup(experiment)

    def processExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """Fix permissions for an experiment file and submit it for processing.

        experimentFilePath is joined onto the experiment storage directory.
        fileInfo is updated in place with 'filePath' and 'experiment'; a new
        dict is used when none is supplied (the default is None rather than a
        shared mutable dict). Logs an error and returns if the file is missing.
        """
        if fileInfo is None:
            fileInfo = {}
        experimentName = experiment.get('name')
        storageDirectory = self.__resolveStorageDirectory(experiment)
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        fileInfo['experiment'] = experiment
        if not os.path.exists(filePath):
            self.logger.error('File path %s does not exist' % filePath)
            return

        if self.manageStoragePermissions:
            self.logger.debug('Setting permissions for %s to %s' % (filePath, self.FILE_PERMISSIONS_MODE_STRING))
            OsUtility.chmodPath(filePath, fileMode=self.FILE_PERMISSIONS_MODE)
            self.logger.debug('Changing group owner for %s to %s' % (filePath, experimentName))
            self.platformUtility.changePathGroupOwner(filePath, experimentName)

            # Recursively modify subdirectory permissions
            self.__updateSubdirectoryGroupOwners(os.path.dirname(filePath), storageDirectory, experimentName)

        self.logger.debug('Processing file path %s (fileInfo: %s)' % (filePath, fileInfo))
        self.fileProcessingManager.processFile(fileInfo)

    def statExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """Fill fileInfo with stat and md5 information for an experiment file.

        fileInfo is updated in place; pass a dict to receive the results.
        Raises ObjectNotFound if the file does not exist.
        """
        if fileInfo is None:
            fileInfo = {}
        storageDirectory = self.__resolveStorageDirectory(experiment)
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        if not os.path.exists(filePath):
            self.logger.debug('File path %s does not exist' % filePath)
            raise ObjectNotFound('File %s does not exist' % filePath)
        self.logger.debug('Getting stat for file path %s' % (filePath))
        FileUtility.statFile(filePath, fileInfo)
        FileUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('File info after stat: %s' % str(fileInfo))

    def processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo=None):
        """Schedule background permission processing for an experiment directory.

        directoryInfo is updated in place with 'directoryPath' and
        'experiment'. Nothing is scheduled when storage permission management
        is disabled. Otherwise the work runs on a threading.Timer after
        DIRECTORY_PROCESSING_DELAY_IN_SECONDS.
        """
        if directoryInfo is None:
            directoryInfo = {}
        storageDirectory = self.__resolveStorageDirectory(experiment)
        directoryPath = os.path.join(storageDirectory, experimentDirectoryPath)
        directoryInfo['directoryPath'] = directoryPath
        directoryInfo['experiment'] = experiment
        if not self.manageStoragePermissions:
            self.logger.error('Skipping permission management for directory path %s' % directoryPath)
            return
        self.logger.debug('Processing directory path %s in background' % (directoryPath))
        timer = threading.Timer(self.DIRECTORY_PROCESSING_DELAY_IN_SECONDS,
                                self.__processExperimentDirectory,
                                args=[experimentDirectoryPath, experiment, directoryInfo])
        timer.start()

    def __processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo=None):
        """Apply group ownership and permissions to a directory tree.

        Reads the target from directoryInfo['directoryPath'] and the storage
        root from experiment['storageDirectory']. Logs an error and returns
        if the directory does not exist.
        """
        if directoryInfo is None:
            directoryInfo = {}
        experimentName = experiment.get('name')
        storageDirectory = experiment.get('storageDirectory')
        directoryPath = directoryInfo.get('directoryPath')
        if not os.path.exists(directoryPath):
            self.logger.error('Directory path %s does not exist' % directoryPath)
            return

        # Modify ownership
        self.logger.debug('Processing directory path %s (directoryInfo: %s)' % (directoryPath, directoryInfo))
        self.logger.debug('Modifying permissions for directory %s to %s' % (directoryPath, self.DIR_PERMISSIONS_MODE))
        OsUtility.chmodPath(directoryPath, dirMode=self.DIR_PERMISSIONS_MODE)
        self.logger.debug('Changing group owner for %s to %s' % (directoryPath, experimentName))
        self.platformUtility.recursivelyChangePathGroupOwner(directoryPath, experimentName)

        # Recursively modify subdirectory permissions
        self.__updateSubdirectoryGroupOwners(os.path.dirname(directoryPath), storageDirectory, experimentName)

        # Update file permissions
        self.logger.debug('Changing permissions for all files in %s for experiment %s' % (directoryPath, experimentName))
        self.platformUtility.chmodPathForFilesInDirectory(directoryPath, self.FILE_PERMISSIONS_MODE_STRING)

    @ThreadingUtility.synchronize
    def start(self):
        """Log that the experiment manager has started."""
        self.logger.debug('Started experiment manager')

    @ThreadingUtility.synchronize
    def stop(self):
        """Log that the experiment manager has stopped."""
        self.logger.debug('Stopped experiment manager')


####################################################################
# Testing

if __name__ == '__main__':
    em = ExperimentManager.getInstance()
    print(em)