Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 1699 additions and 0 deletions
#!/usr/bin/env python
#
# Logging manager singleton.
#
import re
import sys
import os.path
import logging
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.exceptions.configurationError import ConfigurationError
class LoggingManager:
"""
The log manager class is initialized via a configuration file
that may have the following sections:
ConsoleLogging # Used for output on the screen
FileLogging # Used for logging into a file
Each section in the configuration file should have the following
keys:
handler # Indicates which handler class to use
level # Indicates logging level
format # Indicates format for log messages
dateFormat # Indicates date format used for log messages
Given below is an example of a valid configuration file:
[ConsoleLogging]
handler=ConsoleLoggingHandler(sys.stdout,)
level=info
format=[%(levelname)s] %(message)s
dateFormat=%m/%d/%y %H:%M:%S
[FileLogging]
handler=TimedRotatingFileLoggingHandler('/tmp/dm.log')
level=debug
format=%(asctime)s,%(msecs)d [%(levelname)s] %(module)s:%(lineno)d %(user)s@%(host)s %(name)s (%(process)d): %(message)s
dateFormat=%m/%d/%y %H:%M:%S
"""
# Get singleton instance.
@classmethod
def getInstance(cls):
from dm.common.utility.loggingManager import LoggingManager
try:
lm = LoggingManager()
except LoggingManager, ex:
lm = ex
return lm
# Singleton.
__instance = None
def __init__(self):
if LoggingManager.__instance:
raise LoggingManager.__instance
LoggingManager.__instance = self
self.consoleHandler = None
self.fileHandlerList = []
self.maxIntLevel = logging.CRITICAL
self.minIntLevel = logging.NOTSET
self.levelRegExList = []
self.logger = logging.getLogger(self.__class__.__name__)
self.initFlag = False
def setMinLogLevel(self, minLogLevel=logging.INFO):
self.minIntLevel = minLogLevel
def parseLevelRegEx(self, levelRegExList):
""" Parse expressions of the form <regex>=<log level>. """
lines = levelRegExList.split('\n')
for line in lines:
try:
(regex, level) = line.rsplit('=', 1)
pattern = re.compile(regex)
tuple = (pattern, logging.getLevelName(level.upper()))
self.levelRegExList.append(tuple)
except Exception, ex:
self.logger.error('Parser error in log configuration file: %s' % line)
self.logger.exception(ex)
# Get Log Level based on a string representation
def getIntLogLevel(self, levelStr):
level = logging.getLevelName(levelStr)
# Level should be an integer
try:
return int(level)
except ValueError, ex:
raise ConfigurationError('"%s" is not valid log level' % levelStr)
# Configure log handlers.
def configure(self):
""" Configure log handlers from the config file. """
cm = ConfigurationManager.getInstance()
configFile = cm.getConfigFile()
configParser = cm.getConfigParserFromConfigFile(configFile)
configSections = cm.getConfigSectionsFromConfigParser(configParser)
# Console handler.
defaults = {
'level' : cm.getConsoleLogLevel(),
'format' : cm.getLogRecordFormat(),
'dateFormat' : cm.getLogDateFormat(),
'handler' : 'ConsoleLoggingHandler(sys.stdout,)'
}
if not self.consoleHandler:
consoleHandler = self.__configureHandler(configParser, 'ConsoleLogging', defaults)
if consoleHandler != None:
self.consoleHandler = consoleHandler
# File logging.
# Do not configure if log directory does not exist.
defaults['handler'] = None
defaults['level'] = cm.getFileLogLevel()
if not os.path.exists(configFile):
# No config file, we'll configure default.
defaultLogFile = cm.getLogFile()
defaultLogDir = os.path.dirname(defaultLogFile)
if os.path.exists(defaultLogDir):
handler = 'TimedRotatingFileLoggingHandler("%s")' % defaultLogFile
defaults['handler'] = handler
fileHandler = self.__configureHandler(configParser, 'FileLogging', defaults)
if fileHandler != None:
self.fileHandlerList.append(fileHandler)
else:
# Parse all file loggers present in the config file
for configSection in configSections:
if configSection.startswith('FileLogging'):
fileHandler = self.__configureHandler(configParser, configSection, defaults)
if fileHandler != None:
self.fileHandlerList.append(fileHandler)
# Add handlers to the root logger. Use logging class here
# to make sure we can have a logger when we parse the
# logger expressions
rootLogger = logging.getLogger('')
for handler in [self.consoleHandler] + self.fileHandlerList:
rootLogger.addHandler(handler)
# Get a logger factory based on our current config
self.configureLoggers(configParser, defaultLevel='debug')
def configureLoggers(self, configParser, defaultLevel='error'):
rootLogLevel = 'error'
levelRegEx = '^.*$=%s' % (defaultLevel)
if configParser is not None and configParser.has_section('LoggerLevels'):
rootLogLevel = ConfigurationManager.getOptionFromConfigParser(configParser, 'LoggerLevels', 'root', rootLogLevel)
levelRegEx = ConfigurationManager.getOptionFromConfigParser(configParser, 'LoggerLevels', 'levelregex', levelRegEx)
rootLevelInt = logging.getLevelName(rootLogLevel.upper())
logging.getLogger('').root.setLevel(rootLevelInt)
logging.getLogger('').debug('Set root logger to %s' % rootLevelInt)
if not levelRegEx:
return
# Parse expressions of the form <regex>=<log-level>. """
lines = levelRegEx.split('\n')
for line in lines:
try:
# Use the right split so we can have '='s in the regex
(regex, level) = line.rsplit('=', 1)
pattern = re.compile(regex)
tuple = (pattern, logging.getLevelName(level.upper()))
self.levelRegExList.append(tuple)
except Exception, ex:
# Do not fail
self.logger.error('Parser error in log configuration file: %s' % line)
self.logger.exception(ex)
# Configure particular handler with given defaults.
def __configureHandler(self, configParser, configSection, defaults):
""" Configure specified handler with a given defaults. """
handlerOption = defaults['handler']
try:
if configParser is not None:
handlerOption = configParser.get(configSection, 'handler')
except Exception, ex:
pass
# If handlerOption is empty, handler cannot be instantiated.
handler = None
if handlerOption != None:
# Handler argument format: MyHandler(arg1, arg2, ...)
# Module will be in lowercase letters, but the class
# should be capitalized.
handlerName = re.sub('\(.*', '', handlerOption)
moduleName = handlerName[0].lower() + handlerName[1:]
try:
exec 'from dm.common.utility import %s' % (moduleName)
exec 'handler = %s.%s' % (moduleName, handlerOption)
except IOError, ex:
errNo, errMsg = ex
import errno
# If the exception raised is an I/O permissions error, ignore
# it and disable this log handler. This allows non-root users
# to use the (system-wide) default log configuration
if errNo != errno.EACCES:
raise
handler = None
except Exception, ex:
raise ConfigurationError(exception=ex)
# Only request setting from the config file if it was
# not set via environment variable, or programmatically.
cm = ConfigurationManager.getInstance()
if handler != None:
try:
level = cm.getOptionFromConfigParser(configParser, configSection, 'level', defaults['level'])
intLevel = self.getIntLogLevel(level.upper())
handler.setLevel(intLevel)
format = cm.getOptionFromConfigParser(configParser, configSection, 'format', defaults['format'])
dateFormat = cm.getOptionFromConfigParser(configParser, configSection, 'dateFormat', defaults['dateFormat'])
handler.setFormatter(logging.Formatter(format, dateFormat))
except Exception, ex:
raise ConfigurationError(exception=ex)
# Apply filters to handler
filter = None
try:
filter = configParser.get(configSection, 'filter')
if filter:
handler.addFilter(logging.Filter(filter))
except Exception, ex:
pass
return handler
def getLogger(self, name='defaultLogger'):
if not self.initFlag:
self.initFlag = True
self.configure()
logger = logging.getLogger(name)
logger.setLevel(self.getLevel(name))
return logger
def getLevel(self, name):
# Match from the known regex list.
level = logging.NOTSET
# The last regex is most important.
for e in reversed(self.levelRegExList):
(pattern, level) = e
# If we return not None it is a match
if not None == pattern.match(name):
break
if level > self.maxIntLevel:
level = self.maxIntLevel
if level < self.minIntLevel:
level = self.minIntLevel
return level
def setConsoleLogLevel(self, level):
try:
# We need to override the logger levels and the handler
intLevel = self.getIntLogLevel(level.upper())
self.consoleHandler.setLevel(intLevel)
self.maxIntLevel = intLevel
self.logger.setLevel(intLevel)
except Exception, ex:
raise ConfigurationError(exception=ex)
def setFileLogLevel(self, level):
try:
# We need to override the logger levels and the handler
intLevel = self.getIntLogLevel(level.upper())
for handler in self.fileHandlerList:
handler.setLevel(intLevel)
self.maxIntLevel = intLevel
self.logger.setLevel(intLevel)
except Exception, ex:
raise ConfigurationError(exception=ex)
#######################################################################
# Testing.
if __name__ == '__main__':
lm = LoggingManager.getInstance()
logger = lm.getLogger('Main')
logger.error('Error In Main')
logger.debug('Debug In Main')
logger.warn('Warn In Main')
logger.info('Info In Main')
logger = lm.getLogger('Main')
logger.info('Info In Main')
logger = lm.getLogger('')
logger.info('Info using root logger')
logger = lm.getLogger('Main.2')
logger.info('Info in Main.2')
lm.setConsoleLogLevel('info')
logger.debug('You should not see this message')
lm.setConsoleLogLevel('debug')
logger.debug('Debug in Main.2')
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
class NoopPlatformUtility:
def __init__(self):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def createGroup(self, name):
self.logger.debug('createGroup called for %s' % name)
def addUserToGroup(self, username, groupName):
self.logger.debug('addUserToGroup called for %s, %s' % (username, groupName))
def deleteUserFromGroup(self, username, groupName):
self.logger.debug('deleteUserFromGroup called for %s, %s' % (username, groupName))
def createLocalGroup(self, name):
self.logger.debug('createLocalGroup called for %s' % name)
def addLocalUserToGroup(self, username, groupName):
self.logger.debug('addLocalUserToGroup called for %s, %s' % (username, groupName))
def deleteLocalUserFromGroup(self, username, groupName):
self.logger.debug('adddeleteLocalUserFromGroup called for %s, %s' % (username, groupName))
def getGroupInfo(self, groupName):
self.logger.debug('getGroupInfo called for %s' % name)
def setGroupUsers(self, groupName, usernameList):
self.logger.debug('setGroupUsers called for %s, %s' % (groupName, usernameList))
def setPathReadExecutePermissionsForGroup(self, path, groupName):
self.logger.debug('setPathReadExecutePermissionsForGroup called for %s, %s' % (path, groupName))
def changePathGroupOwner(self, path, groupName):
self.logger.debug('changePathGroupOwner called for %s, %s' % (path, groupName))
def recursivelyChangePathGroupOwner(self, path, groupName):
self.logger.debug('recursivelyChangePathGroupOwner called for %s, %s' % (path, groupName))
def refreshNscdGroupCache(self):
self.logger.debug('refreshNscdGroupCache called')
def refreshAuthFiles(self):
self.logger.debug('refreshAuthFiles called')
def chmodPathForFilesInDirectory(self, directoryPath, fileMode):
self.logger.debug('chmodPathForFilesInDirectory called for %s, %s' % (directoryPath, fileMode))
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
import threading
import time
from collections import deque
class ObjectCache:
"""
Cache objects identified by id. Objects are removed from cache
based on the last accessed algorithm.
"""
# How much larger than object cache should time stamp deq be
# allowed to grow.
DEFAULT_TIME_STAMP_DEQ_SIZE_FACTOR = 2
# Cache info expiration time.
DEFAULT_OBJECT_LIFETIME = 60 # seconds
def __init__(self, cacheSize, objectLifetime=DEFAULT_OBJECT_LIFETIME, objectClass=None):
self.lock = threading.RLock()
self.objectMap = {} # id/object map
self.timestampDeq = deque() # timestamp deq
self.cacheSize = cacheSize
self.objectLifetime = objectLifetime
self.deqSize = ObjectCache.DEFAULT_TIME_STAMP_DEQ_SIZE_FACTOR*cacheSize
self.objectClass = objectClass
def setCacheSize(self, cacheSize):
self.cacheSize = cacheSize
def setObjectLifetime(self, objectLifetime):
self.objectLifetime = objectLifetime
def __purgeOne(self):
# Get rid of one cached item based on the last accessed algorithm.
while True:
deqEntry = self.timestampDeq.popleft()
oldId = deqEntry[0]
cachedEntry = self.objectMap.get(oldId)
if cachedEntry is not None:
# Timestamp entry is valid.
if cachedEntry == deqEntry:
# Found an old item, get rid of it from the cache.
del self.objectMap[oldId]
break
# Done.
return
def __purgeTimestampDeq(self):
# Get rid of stale entries.
timestampDeq = deque()
while len(self.timestampDeq):
deqEntry = self.timestampDeq.popleft()
id = deqEntry[0]
cachedEntry = self.objectMap.get(id)
if cachedEntry is not None:
# Timestamp entry is valid.
if cachedEntry == deqEntry:
# Found current item, keep it.
timestampDeq.append(deqEntry)
# Done.
self.timestampDeq = timestampDeq
return
def put(self, id, item, objectLifetime=None):
updateTime = time.time()
expirationTime = updateTime + self.objectLifetime
if objectLifetime is not None:
expirationTime = updateTime + objectLifetime
entry = (id, item, updateTime, expirationTime)
self.lock.acquire()
try:
self.objectMap[id] = entry
self.timestampDeq.append(entry)
if len(self.objectMap) > self.cacheSize:
self.__purgeOne()
if len(self.timestampDeq) > self.deqSize:
self.__purgeTimestampDeq()
finally:
self.lock.release()
def get(self, id):
item = None
itemTuple = self.objectMap.get(id)
if itemTuple is not None:
id, item, updateTime, expirationTime = itemTuple
return item
def getAll(self):
# Item tuple: id, item, updateTime, expirationTime = itemTuple
return map(lambda itemTuple:itemTuple[1], self.objectMap.values())
def getItemTuple(self, id):
itemTuple = self.objectMap.get(id)
if itemTuple is None:
itemTuple = (id, None, None, None)
return itemTuple
def remove(self, id):
self.lock.acquire()
try:
item = self.objectMap.get(id)
if item is not None:
del self.objectMap[id]
return item
finally:
self.lock.release()
def isEmpty(self):
return len(self.objectMap) == 0
def size(self):
return len(self.objectMap)
def __str__(self):
return '%s' % self.timestampDeq
#######################################################################
# Testing.
if __name__ == '__main__':
c = ObjectCache(3)
class Item:
def __init__(self, id):
self.id = id
def getId(self):
return self.id
def __str__(self):
return '%s' % self.id
class Item2:
def __init__(self, name):
self.name = name
def getName(self):
return self.name
def __str__(self):
return '%s' % self.name
for i in range(0,5):
item = Item(i)
c.put(i, item)
print 'Added item: ', item
print 'Cache: ', c
time.sleep(1)
for j in range(0,3):
item = Item(2)
c.put(2, item)
print 'Updated item: ', item
print 'Cache: ', c
time.sleep(1)
item = c.remove(2)
print 'Deleted item 2: ', item
print 'Cache: ', c
time.sleep(1)
item = c.get(2)
print 'Got item 2: ', item
print 'Cache: ', c
print
time.sleep(1)
print
c = ObjectCache(3)
c.put('sv', Item2('sv'))
print c
i = c.get('sv')
print i
print 'Done'
#!/usr/bin/env python
import threading
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectCache import ObjectCache
from dm.common.utility.singleton import Singleton
class ObjectTracker(Singleton):
DEFAULT_CACHE_SIZE = 1000
DEFAULT_OBJECT_LIFETIME_IN_SECONDS = 86400*30 # 30 days
# Singleton.
__instanceLock = threading.RLock()
# Cache configuration
objectClass = None
objectLifetime = DEFAULT_OBJECT_LIFETIME_IN_SECONDS
cacheSize = DEFAULT_CACHE_SIZE
def __init__(self, *args, **kwargs):
ObjectTracker.__instanceLock.acquire()
try:
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.logger.debug('Initializing')
self.objectCache = ObjectCache(self.cacheSize, self.objectLifetime, self.objectClass)
self.logger.debug('Initialization complete')
finally:
ObjectTracker.__instanceLock.release()
def put(self, id, item, objectLifetime=None):
return self.objectCache.put(id, item, objectLifetime)
def get(self, id):
return self.objectCache.get(id)
def getAll(self):
return self.objectCache.getAll()
def remove(self, id):
return self.objectCache.remove(id)
####################################################################
# Testing
if __name__ == '__main__':
ot = ObjectTracker.getInstance()
print ot
ot2 = ObjectTracker()
print ot2
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
class ObjectUtility:
@classmethod
def createObjectInstance(cls, moduleName, className, constructor, importPath=None):
logger = LoggingManager.getInstance().getLogger(cls.__name__)
logger.debug('Creating object: %s, %s, %s' % (moduleName, className, constructor))
if importPath is not None:
# Add import path if it was provided
sys.path.append[importPath]
cmd = 'from %s import %s' % (moduleName, className)
exec cmd
cmd = 'objectInstance = %s' % (constructor)
exec cmd
if importPath is not None:
# Remove import path that was added
del sys.path[-1]
return objectInstance
#!/usr/bin/env python
import os
import stat
class OsUtility:
@classmethod
def createDir(cls, path, mode=None):
""" Create directory if it does not exist already. """
if not os.path.isdir(path):
os.makedirs(path)
if mode is not None:
os.chmod(path, mode)
@classmethod
def removeLink(cls, path):
""" Remove link on a given path. """
if not os.path.islink(path):
return
os.remove(path)
@classmethod
def removeFile(cls, path):
""" Remove file on a given path. """
if not os.path.isfile(path):
return
os.remove(path)
@classmethod
def removeAndIgnoreErrors(cls, path):
""" Remove file on a given path and ignore any errors. """
try:
os.remove(path)
except Exception, ex:
pass
@classmethod
def removeDir(cls, path):
""" Remove dir on a given path, even if it is not empty. """
if not os.path.isdir(path):
return
files=os.listdir(path)
for f in files:
fullPath=os.path.join(path, f)
if os.path.islink(fullPath) or not os.path.isdir(fullPath):
os.remove(fullPath)
else:
removeDir(fullPath)
os.rmdir(path)
@classmethod
def chmodPath(cls, path, fileMode=None, dirMode=None):
if os.path.isfile(path):
if fileMode is not None:
os.chmod(path, fileMode)
return
elif os.path.isdir(path):
files = os.listdir(path)
for f in files:
fullPath = os.path.join(path, f)
cls.chmodPath(fullPath, fileMode, dirMode)
if dirMode is not None:
os.chmod(path, dirMode)
@classmethod
def chownPath(cls, path, uid, gid):
""" Change owner on a given path recursively. """
if os.path.isfile(path):
os.chown(path, uid, gid)
return
elif os.path.islink(path):
os.lchown(path, uid, gid)
return
elif os.path.isdir(path):
files = os.listdir(path)
for f in files:
fullPath = os.path.join(path, f)
cls.chownPath(fullPath, uid, gid)
os.chown(path, uid, gid)
@classmethod
def chownPathByUserName(cls, path, userName):
""" Change owner on a given path recursively. """
import pwd
user = pwd.getpwnam(userName)
chownPath(path, user.pw_uid, user.pw_gid)
@classmethod
def findFiles(cls, dirPath, fileList=None):
""" List files in a given directory. Return list of absolute paths.
Do not follow symbolic links.
"""
fList = fileList
if not fList:
fList = []
if os.path.isdir(dirPath):
files = os.listdir(dirPath)
for f in files:
fullPath = os.path.join(dirPath, f)
if os.path.isfile(fullPath):
fList.append(fullPath)
elif os.path.isdir(fullPath):
fList = cls.findFiles(fullPath, fList)
return fList
@classmethod
def findFilesAsDict(cls, dirPath, fileDict=None):
""" Find files in a given directory. Return dictionary of
absolute paths.
Do not follow symbolic links.
"""
fDict = fileDict
if not fDict:
fDict = {}
if os.path.isdir(dirPath):
files = os.listdir(dirPath)
for f in files:
fullPath = os.path.join(dirPath, f)
if os.path.isfile(fullPath):
statResult = os.stat(fullPath)
fileInfo = {}
fileInfo['fileSize'] = statResult[stat.ST_SIZE]
fileInfo['fileCreationTime'] = statResult[stat.ST_CTIME]
fileInfo['fileModificationTime'] = statResult[stat.ST_MTIME]
fDict[fullPath] = fileInfo
elif os.path.isdir(fullPath):
fDict = cls.findFilesAsDict(fullPath, fDict)
return fDict
@classmethod
def importNameFromFile(cls, name, filePath):
""" Import specified name from file. """
import sys
import os.path
dirName = os.path.dirname(filePath)
moduleName = os.path.basename(filePath).replace('.py', '')
sys.path = [dirName] + sys.path
cmd = 'from %s import %s as tmpObject' % (moduleName, name)
exec cmd
del sys.path[0]
return tmpObject
@classmethod
def getUserHomeDir(cls):
""" Get current user home directory. """
from os.path import expanduser
home = expanduser('~')
return home
#######################################################################
# Testing.
if __name__ == '__main__':
OsUtility.chmodPath('/home/sveseli/Work/DM/data/ESAF/exp3', 0700)
#!/usr/bin/env python
from dm.common.utility.dmSubprocess import DmSubprocess
class RsyncFileTransfer:
COMMAND = 'rsync'
def __init__(self, src, dest, flags='-arvlP'):
self.src = src
self.dest = dest
self.flags = flags
self.command = '%s %s %s %s' % (self.COMMAND, self.flags, self.src, self.dest)
self.subprocess = DmSubprocess.getSubprocess(self.command)
def execute(self):
return self.subprocess.run()
def getStdOut(self):
return self.subprocess.getStdOut()
def getStdErr(self):
return self.subprocess.getStdErr()
def getExitStatus(self):
return self.subprocess.getExitStatus()
#######################################################################
# Testing.
if __name__ == '__main__':
ft = RsyncFileTransfer('/tmp/abc', '/tmp/abc2')
ft.execute()
print 'StdOut: ', ft.getStdOut()
print 'StdErr: ', ft.getStdErr()
print 'Exit Status: ', ft.getExitStatus()
#!/usr/bin/env python
import threading
import copy
import stat
import pysftp
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.loggingManager import LoggingManager
from dm.common.exceptions.commandFailed import CommandFailed
import urlparse
class SftpUtility:
DEFAULT_PORT = 22
def __init__(self, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None):
self.host = host
self.port = port
self.username = username
self.password = password
self.privateKey = privateKey
self.sftpClient = None
self.lock = threading.RLock()
@classmethod
def parseFtpUrl(cls, url, defaultHost=None, defaultPort=None):
host = defaultHost
port = defaultPort
scheme = None
dirPath = url
if url.startswith('ftp://'):
parseResult = urlparse.urlparse(url)
scheme = parseResult.scheme
netlocTokens = parseResult.netloc.split(':')
host = netlocTokens[0]
if len(netlocTokens) > 1:
port = int(netlocTokens[1])
dirPath = parseResult.path
return (scheme, host, port, dirPath)
@classmethod
def getSftpClient(cls, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None):
sftp = pysftp.Connection(host, username=username, password=password, port=port, private_key=privateKey)
return sftp
@classmethod
def getLogger(cls):
logger = LoggingManager.getInstance().getLogger(cls.__name__)
return logger
def __parseKeyValue(cls, keyValue, outputDict={}):
key,value = keyValue.split('=')
value = value.strip()
outputDict[key] = value
return outputDict
def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
self.lock.acquire()
try:
if not self.sftpClient:
self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
if not replacementDirPath:
replacementDirPath = dirPath
try:
attrs = self.sftpClient.listdir_attr(dirPath)
except Exception, ex:
self.getLogger().error('Could not retrieve files from %s: %s' % (dirPath,ex))
self.closeConnection()
raise
finally:
self.lock.release()
for attr in attrs:
fileName = attr.filename
mode = attr.st_mode
if stat.S_ISDIR(mode):
dirPath2 = '%s/%s' % (dirPath, fileName)
replacementDirPath2 = '%s/%s' % (replacementDirPath, fileName)
self.getFiles(dirPath2, fileDict, replacementDirPath2)
elif stat.S_ISREG(mode):
filePath = '%s/%s' % (replacementDirPath, fileName)
fileInfo = {'fileSize' : attr.st_size,
'fileModificationTime' : attr.st_mtime }
fileDict[filePath] = fileInfo
return fileDict
def getMd5Sum(self, filePath, fileInfo={}):
self.lock.acquire()
try:
if not self.sftpClient:
self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
try:
#md5Sum = self.sftpClient.execute('md5sum "%s"' % filePath)[0].split()[0]
output = self.sftpClient.execute('md5sum "%s"' % filePath)[0].strip()
if not output.endswith(filePath):
raise CommandFailed(output)
md5Sum = output.split()[0]
fileInfo['md5Sum'] = md5Sum
except CommandFailed, ex:
self.getLogger().error('Could not get md5sum for file %s: %s' % (filePath,ex))
raise
except Exception, ex:
self.getLogger().error('Could not get md5sum for file %s: %s' % (filePath,ex))
self.closeConnection()
raise
return md5Sum
finally:
self.lock.release()
def statFile(self, filePath, fileInfo={}):
self.lock.acquire()
try:
if not self.sftpClient:
self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
try:
attr = self.sftpClient.stat(filePath)
fileInfo['fileSize'] = attr.st_size
fileInfo['fileModificationTime'] = attr.st_mtime
except Exception, ex:
self.getLogger().error('Could not get stat file %s: %s' % (filePath,ex))
self.closeConnection()
raise
return fileInfo
finally:
self.lock.release()
def closeConnection(self):
logger = self.getLogger()
self.lock.acquire()
try:
try:
if self.sftpClient:
logger.warn('Closing SFTP connection to host %s' % self.host)
self.sftpClient.close()
except Exception, ex:
logger.error('Could not close SFTP connection to host %s: %s' % (self.host, ex))
self.sftpClient = None
finally:
self.lock.release()
#######################################################################
# Testing.
if __name__ == '__main__':
#sftpUtility = SftpUtility('s1dserv', username='dmadmin', password='theKey12')
#sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
sftpUtility = SftpUtility('xstor-devel')
files = sftpUtility.getFiles('/data/testing/id7-test02')
#files = sftpUtility.getFiles('/export/dm/test')
print files
#print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt')
#print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt')
#print 'Closing connection'
#sftpUtility.closeConnection()
#print sftpUtility.statFile('/export/dm/test/testfile01')
#!/usr/bin/env python
class Singleton(object):
__instance = None
# This class will behave properly as long as getInstance() is called.
# If object is constructed using constructor, __init__() will be called
# multiple times in the derived class (i.e., one needs protection
# against multiple initializations in the derived class)
def __new__(cls, *args, **kwargs):
# Allow subclasses to create their own instances.
if cls.__instance is None or cls != type(cls.__instance):
instance = object.__new__(cls, *args, **kwargs)
instance.__init__(*args, **kwargs)
cls.__instance = instance
return cls.__instance
@classmethod
def getInstance(cls, *args, **kwargs):
return cls.__new__(cls, *args, **kwargs)
def __init__(self, *args, **kwargs):
# Only initialize once.
if self.__class__.__instance is not None:
return
####################################################################
# Testing
if __name__ == '__main__':
print 'Testing Singleton class'
s1 = Singleton.getInstance()
s2 = Singleton()
s3 = Singleton.getInstance()
s4 = Singleton()
print 'S1: ', s1
print 'S2: ', s2
print 'S3: ', s3
print 'S4: ', s4
class A(Singleton):
__instance = None
def __init__(self, x):
if self.__class__.__instance is None:
print 'in A.__init__()'
self.x = x
class B(Singleton):
def __init__(self, x):
self.x = x
class C(Singleton):
def __init__(self):
self.x = 14
print
print 'Testing Class A'
print 'Init A(3)'
a1 = A(3)
print 'Init A(4)'
a2 = A(4)
print 'A.getInstance()'
a3 = A.getInstance()
print 'A.getInstance()'
a4 = A.getInstance()
print a1
print a2
print a3
print a3.x, a2.x, a1.x
print
print 'Testing Class B'
b1 = B(6)
b2 = B(5)
print b1
print b2
print b2.x, b1.x
print
print 'Testing Class C'
c1 = C()
c2 = C()
print c1
print c2
print c2.x, c1.x
#!/usr/bin/env python
import ssl
class SslUtility:
DEFAULT_SSL_CONTEXT = ssl._create_default_https_context
@classmethod
def useUnverifiedSslContext(cls, func):
def wrapper(*args, **kwargs):
# Disable SSL checking
ssl._create_default_https_context = ssl._create_unverified_context
# Perform function call
result = func(*args, **kwargs)
# Revert back to original SSL settings
ssl._create_default_https_context = SslUtility.DEFAULT_SSL_CONTEXT
return result
return wrapper
#!/usr/bin/env python
import threading
class ThreadSafeQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
def push(self, item):
self.lock.acquire()
try:
self.queue.insert(0,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
item = self.queue.pop()
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
q = ThreadSafeQueue()
for i in range(0,10):
q.push(i)
print 'Added: ', i
while not q.isEmpty():
i = q.pop()
print 'Got: ', i
#!/usr/bin/env python
class ThreadingUtility:
# Assumes class has instance lock initialized
@classmethod
def synchronize(cls, func):
def synchronized(*args, **kwargs):
im_self = args[0]
im_self.lock.acquire()
try:
result = func(*args, **kwargs)
return result
finally:
im_self.lock.release()
return synchronized
#######################################################################
# Testing.
if __name__ == '__main__':
import threading
class A:
def __init__(self):
self.lock = threading.RLock()
@ThreadingUtility.synchronize
def twoX(self, x):
print 'X=', x
return 2*x
a = A()
t = a.twoX(3)
print 'Result: ', t
#!/usr/bin/env python
import threading
import time
# Uses earliest allowed processing timestamp to sort items in the queue
# Queued item will not be processed until its earliest allowed processing
# timestamp has passed
class TimeBasedProcessingQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
self.itemPopTimeList = []
def push(self, item, itemProcessingWaitTime=0):
self.lock.acquire()
try:
earliestPopTime = time.time() + itemProcessingWaitTime
popIndex = 0
for t in self.itemPopTimeList:
if earliestPopTime <= t:
break
popIndex += 1
self.itemPopTimeList.insert(popIndex, earliestPopTime)
self.queue.insert(popIndex,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
if self.itemPopTimeList[0] <= time.time():
del self.itemPopTimeList[0]
item = self.queue[0]
del self.queue[0]
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
import random
q = TimeBasedProcessingQueue()
for i in range(0,10000000):
waitTime = random.uniform(0,10)
q.push(i, waitTime)
#print 'Added: ', i, '; Processing wait: ', waitTime
print "Sleeping..."
time.sleep(60)
print "Removing..."
while not q.isEmpty():
i = q.pop()
#print 'Got: ', i
#time.sleep(1)
print "Sleeping..."
time.sleep(60)
#!/usr/bin/env python
import time
import pytz
import datetime
from tzlocal import get_localzone
from dm.common.exceptions.invalidArgument import InvalidArgument
class TimeUtility:
UTC_MINUS_LOCAL_TIME = None
@classmethod
def getCurrentGMTimestamp(cls):
""" Formats GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(time.time()))
@classmethod
def formatGMTimestamp(cls, t):
""" Format GMT timestamp. """
return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(t))
@classmethod
def getCurrentLocalTimestamp(cls):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(time.time()))
@classmethod
def formatLocalTimestamp(cls, t):
""" Formats local timestamp. """
return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(t))
@classmethod
def toDateTime(cls, t, format):
if not t:
return None
tz = get_localzone()
try:
dt = datetime.datetime.strptime(t, format)
except Exception, ex:
raise InvalidArgument('Cannot parse input: %s' % ex)
return tz.localize(dt, is_dst=None)
@classmethod
def utcToLocalTime(cls, utc):
if cls.UTC_MINUS_LOCAL_TIME is None:
cls.UTC_MINUS_LOCAL_TIME = (datetime.datetime.utcnow()-datetime.datetime.now()).total_seconds()
if cls.UTC_MINUS_LOCAL_TIME > 0:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME+0.5)
else:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME-0.5)
localTime = utc - cls.UTC_MINUS_LOCAL_TIME
return localTime
#######################################################################
# Testing.
if __name__ == '__main__':
print TimeUtility.toDateTime('2015-01-03', '%Y-%m-%d')
dt0 = datetime.datetime.utcnow()
dt1 = datetime.datetime.now()
ts0 = time.mktime(dt0.timetuple())
ts1 = time.mktime(dt1.timetuple())
t0 = time.strftime("%Y/%m/%d %H:%M:%S", dt0.timetuple())
print 'UTC: ', t0, ts0
t1 = time.strftime("%Y/%m/%d %H:%M:%S", dt1.timetuple())
print 'LOCAL: ', t1, ts1
print 'UTC TO LOCAL: ', TimeUtility.utcToLocalTime(ts0)
#!/usr/bin/env python
#######################################################################
import socket
import pwd
import os
from logging.handlers import TimedRotatingFileHandler
#######################################################################
class TimedRotatingFileLoggingHandler(TimedRotatingFileHandler):
""" Class that enables logging into files. """
def __init__(self, filename, when='D', interval=1, backupCount=0, encoding=None):
TimedRotatingFileHandler.__init__(self, filename, when, interval, backupCount, encoding)
self.user = pwd.getpwuid(os.getuid())[0]
self.host = socket.gethostname()
def emit(self, record):
record.__dict__['user'] = self.user
record.__dict__['host'] = self.host
return TimedRotatingFileHandler.emit(self, record)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
class ValueUtility:
@classmethod
def toBoolean(cls, value):
if value is None:
return False
strValue = str(value).lower()
if strValue == '1':
return True
elif strValue == 'true':
return True
return False
#######################################################################
# Testing.
if __name__ == '__main__':
print ValueUtility.toBoolean('True')
#!/usr/bin/env python
from dm.common.api.dmRestApi import DmRestApi
from dm.common.utility.configurationManager import ConfigurationManager
class DaqRestApi(DmRestApi):
""" Base DAQ DM REST api class. """
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
if host == None:
host = ConfigurationManager.getInstance().getDaqWebServiceHost()
if port == None:
port = ConfigurationManager.getInstance().getDaqWebServicePort()
DmRestApi.__init__(self, username, password, host, port, protocol)
#######################################################################
# Testing.
if __name__ == '__main__':
api = DaqRestApi('sveseli', 'sveseli')
#api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list', 'GET', data='')
import urllib
from dm.common.utility.configurationManager import ConfigurationManager
cm = ConfigurationManager.getInstance()
cm.setSessionCacheFile('/tmp/session')
#print 'Non-session request'
#print api.sendRequest('https://zagreb.svdev.net:10232/dm/directory/list?path=/tmp', 'GET')
print 'Session request'
data = { 'path' : '/tmp/xyz' }
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write?path=/tmp/xyz&content=xyz', 'POST', contentType='application/x-www-form-urlencoded', data=urllib.urlencode(data))
#print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', data=data)
postdata='path=/tmp/xyz'
postdata+='&content=%s' % urllib.quote_plus('Hey there')
print api.sendSessionRequest('https://zagreb.svdev.net:10232/dm/file/write', 'POST', contentType='application/x-www-form-urlencoded', data=postdata)
#!/usr/bin/env python
import os
import json
import urllib
from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.daqInfo import DaqInfo
from daqRestApi import DaqRestApi
class ExperimentDaqApi(DaqRestApi):
'''
Data Management API for accessing experiment interface provided by the
DAQ service on a DM experiment station.
'''
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
'''
Constructor.
:param username: DM username
:type username: str
:param password: DM password
:type password: str
:param host: DM service host
:type host: str
:param port: DM service port
:type port: int
:param protocol: DM service protocol
:type protocol: str
>>> api = ExperimentDaqApi(username='dm', password='XYZ', host='bluegill1', port=22236, protocol='https')
'''
DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute2
def startDaq(self, experimentName, dataDirectory, daqInfo={}):
'''
Start data acquisition (real-time directory monitoring and file upload).
Only files created or modified after this command was issued will be
uploaded to storage.
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition; several keys have special meaning:
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *maxRunTimeInHours* (int): specifies maximum data acquisition run time in hours
- *uploadDataDirectoryOnExit* (str): specifies URL of the data directory that should be uploaded after data acquisition completes
- *uploadDestDirectoryOnExit* (str): specifies directory path relative to experiment root directory where uploaded files should be stored
- *skipPlugins* (str): comma-separated list of plugins which should not process files
:type daqInfo: dict
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist
:raises DmException: in case of any other errors
>>> daqInfo = api.startDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqId = daqInfo.get('id')
>>> daqStatus = daqInfo.get('status')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/startDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def stopDaq(self, experimentName, dataDirectory):
'''
Stop data acquisition (real-time directory monitoring and file upload).
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:returns: DaqInfo object
:raises InvalidRequest: in case of empty experiment name or data directory
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist, or if there are no active DAQs for a given experiment and data directory
:raises DmException: in case of any other errors
>>> daqInfo = api.stopDaq(experimentName='test01', dataDirectory='/home/dm/test')
>>> daqStatus = daqInfo.get('status')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/stopDaq/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def getDaqInfo(self, id):
'''
Get data acquisition details.
:param id: data acquisition id
:type id: str
:returns: DaqInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known DAQ with a given id
:raises DmException: in case of any other errors
>>> daqInfo = api.getDaqInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> daqStatus = daqInfo.get('status')
'''
if not id:
raise InvalidRequest('Daq id must be provided.')
url = '%s/experimentDaqs/%s' % (self.getContextRoot(),id)
responseDict = self.sendSessionRequest(url=url, method='GET')
return DaqInfo(responseDict)
@DaqRestApi.execute2
def listDaqs(self, status=None):
'''
Get list of known DAQs.
:param status: status string; if not supplied, all DAQs will be included in the returned list
:type status: str
:returns: list of DaqInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> daqInfoList = api.listDaqs()
>>> for daqInfo in daqInfoList:
>>> print daqInfo['id'], daqInfo['status']
'''
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, DaqInfo)
@DaqRestApi.execute2
def upload(self, experimentName, dataDirectory, daqInfo={}):
'''
Upload files from the given directory. Only files found at the time
when command was issued will be uploaded to storage.
:param experimentName: experiment name
:type experimentName: str
:param dataDirectory: data directory URL
:type dataDirectory: str
:param daqInfo: dictionary of optional metadata (key/value pairs) describing data acquisition or processing that generated files that will be uploaded; several keys have special meaning:
- *processHiddenFiles* (bool): if set to True, hidden files will be processed
- *reprocessFiles* (bool): if set to True, files will be uploaded regardless of whether or not they already exist in storage and have not changed
- *destDirectory* (str): specifies directory path relative to experiment root directory where files will be stored
- *processingMode* (str): specifies processing mode, and can be set to "files" (service plugins process individual files one at a time) or "directory" (service plugins process entire directory at once; works faster for uploads of a large number of small files)
- *skipPlugins* (str): comma-separated list of plugins which should not process files
:type daqInfo: dict
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid input arguments
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: in case experiment does not exist
:raises DmException: in case of any other errors
>>> uploadInfo = api.upload(experimentName='test01', dataDirectory='/home/dm/test', daqInfo={'sample' : 'A1'})
>>> uploadId = uploadInfo.get('id')
'''
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not dataDirectory:
raise InvalidRequest('Experiment data directory must be provided.')
url = '%s/experimentsByName/%s/upload/%s' % (self.getContextRoot(), Encoder.encode(experimentName), Encoder.encode(dataDirectory))
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute
def getUploadInfo(self, id):
'''
Get upload details.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uplaodInfo = api.getUploadInfo('84311a75-c88b-4605-8948-08257eae6f5c')
>>> uploadStatus = uploadInfo.get('status')
'''
url = '%s/experimentUploads/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='GET')
return UploadInfo(responseDict)
@DaqRestApi.execute2
def listUploads(self, status=None):
'''
Get list of known uploads.
:param status: status string; if not supplied, all uploads will be included in the returned list
:type status: str
:returns: list of UploadInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> uploadInfoList = api.listUploads(status='running')
>>> for uploadInfo in uploadInfoList:
>>> print uploadInfo['id']
'''
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UploadInfo)
@DaqRestApi.execute2
def stopUpload(self, id):
'''
Abort upload.
:param id: upload id
:type id: str
:returns: UploadInfo object
:raises InvalidRequest: in case of invalid (empty or None) id
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises ObjectNotFound: if there is no known upload with a given id
:raises DmException: in case of any other errors
>>> uplaodInfo = api.stopUpload('84311a75-c88b-4605-8948-08257eae6f5c')
>>> print uploadInfo.get('nCompletedFiles')
'''
url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
if not id:
raise InvalidRequest('Upload id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='POST')
return UploadInfo(responseDict)
@DaqRestApi.execute2
def listProcessingPlugins(self):
'''
Get list of DAQ service processing plugins.
:returns: list of PluginInfo objects
:raises AuthorizationError: in case user is not authorized to manage DM station
:raises DmException: in case of any other errors
>>> pluginInfoList = api.getProcessingPlugins()
>>> for pluginInfo in pluginInfoList:
>>> print pluginInfo['name']
'''
url = '%s/processingPlugins' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, PluginInfo)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ExperimentDaqApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
print api.startDaq('experiment1', '/tmp/data/experiment1')