#!/usr/bin/env python

import threading
import time
import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.singleton import Singleton
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.exceptions.objectNotFound import ObjectNotFound


class ExperimentManager(Singleton):
    """Manage experiment storage directories, their permissions, and the
    hand-off of experiment files to the file processing manager.

    Settings are read from the 'ExperimentManager' configuration section.
    """

    CONFIG_SECTION_NAME = 'ExperimentManager'
    STORAGE_DIRECTORY_KEY = 'storagedirectory'
    MANAGE_STORAGE_PERMISSIONS_KEY = 'managestoragepermissions'
    PLATFORM_UTILITY_KEY = 'platformutility'

    # Octal modes use the 0o prefix, which is accepted by Python 2.6+ and
    # Python 3 alike (the bare 0640 form is a syntax error on Python 3).
    FILE_PERMISSIONS_MODE = 0o640   # rw-r-----
    DIR_PERMISSIONS_MODE = 0o750    # rwxr-x---

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the manager once; later constructions return early."""
        # The context manager releases the lock on every exit path,
        # including the early return below.
        with ExperimentManager.__instanceLock:
            if ExperimentManager.__instance:
                return
            ExperimentManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.experimentDbApi = ExperimentDbApi()
            self.platformUtility = None
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """Read the storage directory, the permission-management flag and
        the optional platform utility class from the configuration."""
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(ExperimentManager.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.storageDirectory = cm.getConfigOption(
            ExperimentManager.CONFIG_SECTION_NAME,
            ExperimentManager.STORAGE_DIRECTORY_KEY)
        self.manageStoragePermissions = ValueUtility.toBoolean(
            cm.getConfigOption(
                ExperimentManager.CONFIG_SECTION_NAME,
                ExperimentManager.MANAGE_STORAGE_PERMISSIONS_KEY))
        platformUtility = cm.getConfigOption(
            ExperimentManager.CONFIG_SECTION_NAME,
            ExperimentManager.PLATFORM_UTILITY_KEY)
        if platformUtility:
            (moduleName, className, constructor) = cm.getModuleClassConstructorTuple(platformUtility)
            self.logger.debug('Creating platform utility class %s' % className)
            self.platformUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
        # NOTE(review): nesting reconstructed from whitespace-collapsed
        # source; this log line is assumed to run unconditionally.
        self.logger.debug('Manage storage permissions: %s' % self.manageStoragePermissions)

    def __getExperimentStorageDataDirectory(self, experiment):
        """Return the normalized path
        <storageDirectory>/<experimentType.rootDataPath>/<experiment name>.
        """
        experimentTypeName = experiment.get('experimentType').get('rootDataPath')
        experimentName = experiment.get('name')
        storageDirectory = '%s/%s/%s' % (self.storageDirectory, experimentTypeName, experimentName)
        storageDirectory = os.path.normpath(storageDirectory)
        return storageDirectory

    @ThreadingUtility.synchronize
    def updateExperimentWithStorageDataDirectory(self, experiment):
        """Record the storage directory and host on the experiment if the
        directory exists on disk; return the computed directory path."""
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        # NOTE(review): nesting reconstructed from whitespace-collapsed
        # source; both assignments are assumed to be conditional on the
        # directory existing, with the return outside the condition.
        if os.path.exists(storageDirectory):
            experiment['storageDirectory'] = storageDirectory
            experiment['storageHost'] = ConfigurationManager.getInstance().getHost()
        return storageDirectory

    def addUserToGroup(self, username, experimentName):
        """Add a user to the experiment's group, but only when the
        experiment's storage directory exists."""
        experiment = self.experimentDbApi.getExperimentWithUsers(experimentName)
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.platformUtility.addUserToGroup(username, experimentName)

    def createExperimentGroup(self, experiment):
        """Create a group named after the experiment, apply directory
        permissions and group ownership to its storage directory, and set
        the group's users from 'experimentUsernameList'."""
        experimentName = experiment.get('name')
        storageDirectory = experiment.get('storageDirectory')

        # Create experiment group
        self.platformUtility.createGroup(experimentName)
        self.logger.debug('Setting permissions for %s to %s' % (storageDirectory, self.DIR_PERMISSIONS_MODE))
        OsUtility.chmodPath(storageDirectory, dirMode=self.DIR_PERMISSIONS_MODE)
        self.logger.debug('Changing group owner for %s to %s' % (storageDirectory, experimentName))
        self.platformUtility.changePathGroupOwner(storageDirectory, experimentName)

        # Add users to group; the whole list is passed in a single call
        # rather than adding users one at a time.
        experimentUsers = experiment.get('experimentUsernameList', [])
        self.logger.debug('Found experiment users: %s', experimentUsers)
        self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    def updateExperimentGroupUsers(self, experiment):
        """Set the experiment group's users from 'experimentUsernameList'."""
        experimentName = experiment.get('name')
        experimentUsers = experiment.get('experimentUsernameList', [])
        self.platformUtility.setGroupUsers(experimentName, experimentUsers)

    @ThreadingUtility.synchronize
    def createExperimentDataDirectory(self, experiment):
        """Create the experiment's storage directory if it is missing,
        record it on the experiment, and set up the experiment group when
        storage permissions are managed."""
        experimentName = experiment.get('name')
        storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
        if os.path.exists(storageDirectory):
            self.logger.debug('Data directory %s for experiment %s already exists' % (storageDirectory, experimentName))
        else:
            self.logger.debug('Creating data directory for experiment %s: %s' % (experimentName, storageDirectory))
            OsUtility.createDir(storageDirectory)
        # NOTE(review): nesting reconstructed from whitespace-collapsed
        # source; the statements below are assumed to run whether or not
        # the directory already existed.
        experiment['storageDirectory'] = storageDirectory
        experiment['storageHost'] = ConfigurationManager.getInstance().getHost()
        if self.manageStoragePermissions:
            self.createExperimentGroup(experiment)

    @ThreadingUtility.synchronize
    def processExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """Fix up permissions on an experiment file (when managed) and pass
        it to the file processing manager.

        experimentFilePath -- path relative to the experiment's storage
                              directory
        experiment         -- experiment dictionary; updated with storage
                              directory/host if the directory exists
        fileInfo           -- optional dictionary that receives 'filePath'
                              and 'experiment'; a new one is created per
                              call when omitted

        A missing file is only logged; nothing is raised for it.
        """
        # A fresh dict per call: a mutable default would be shared by every
        # call that omits fileInfo and would leak data between them.
        if fileInfo is None:
            fileInfo = {}
        experimentName = experiment.get('name')
        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        fileInfo['experiment'] = experiment
        if os.path.exists(filePath):
            self.logger.debug('Processing file path %s (fileInfo: %s)' % (filePath, fileInfo))
            if self.manageStoragePermissions:
                self.logger.debug('Modifying permissions for %s' % filePath)
                OsUtility.chmodPath(filePath, fileMode=self.FILE_PERMISSIONS_MODE)
                self.logger.debug('Changing group owner for %s to %s' % (filePath, experimentName))
                self.platformUtility.changePathGroupOwner(filePath, experimentName)
            self.logger.debug('Processing file %s' % filePath)
            self.fileProcessingManager.processFile(fileInfo)
        else:
            self.logger.debug('File path %s does not exist' % filePath)

    def statExperimentFile(self, experimentFilePath, experiment, fileInfo=None):
        """Populate fileInfo with stat data and an MD5 sum for an
        experiment file.

        experimentFilePath -- path relative to the experiment's storage
                              directory
        experiment         -- experiment dictionary; updated with storage
                              directory/host if the directory exists
        fileInfo           -- optional dictionary that receives 'filePath'
                              and is handed to FileUtility.statFile and
                              FileUtility.getMd5Sum; a new one is created
                              per call when omitted

        Raises ObjectNotFound if the file does not exist.
        """
        # A fresh dict per call; see processExperimentFile.
        if fileInfo is None:
            fileInfo = {}
        self.updateExperimentWithStorageDataDirectory(experiment)
        storageDirectory = experiment.get('storageDirectory')
        filePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['filePath'] = filePath
        if os.path.exists(filePath):
            self.logger.debug('Getting stat for file path %s' % (filePath))
            FileUtility.statFile(filePath, fileInfo)
            FileUtility.getMd5Sum(filePath, fileInfo)
            self.logger.debug('File Info after stat: %s' % (fileInfo))
        else:
            self.logger.debug('File path %s does not exist' % filePath)
            raise ObjectNotFound('File %s does not exist' % filePath)

    @ThreadingUtility.synchronize
    def start(self):
        """Log that the experiment manager has started."""
        self.logger.debug('Started experiment manager')

    @ThreadingUtility.synchronize
    def stop(self):
        """Log that the experiment manager has stopped."""
        self.logger.debug('Stopped experiment manager')

####################################################################
# Testing

if __name__ == '__main__':
    em = ExperimentManager.getInstance()
    # Parenthesized single-argument form prints identically on Python 2
    # and Python 3.
    print(em)