From abb44c665ccd60135ad3d835671fade95fcf7f40 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 23 Jun 2015 20:26:01 +0000 Subject: [PATCH] added time-based processing queue class --- setup.sh | 1 + .../DmWebPortal/setup/glassfish-resources.xml | 2 +- .../dm/common/utility/configurationManager.py | 30 +++++++++ src/python/dm/common/utility/encoder.py | 20 ++++++ src/python/dm/common/utility/linuxUtility.py | 62 +++++++++++++++++ src/python/dm/common/utility/osUtility.py | 31 ++++++--- .../utility/timeBasedProcessingQueue.py | 67 +++++++++++++++++++ 7 files changed, 204 insertions(+), 9 deletions(-) create mode 100755 src/python/dm/common/utility/linuxUtility.py create mode 100755 src/python/dm/common/utility/timeBasedProcessingQueue.py diff --git a/setup.sh b/setup.sh index 9796b902..6e70d801 100644 --- a/setup.sh +++ b/setup.sh @@ -65,6 +65,7 @@ prependPathIfDirExists() { } # Setup path and other variables +prependPathIfDirExists $DM_SUPPORT_DIR/mongodb/$DM_HOST_ARCH/bin prependPathIfDirExists $DM_SUPPORT_DIR/postgresql/$DM_HOST_ARCH/bin prependPathIfDirExists $DM_SUPPORT_DIR/java/$DM_HOST_ARCH/bin prependPathIfDirExists $DM_SUPPORT_DIR/ant/bin diff --git a/src/java/DmWebPortal/setup/glassfish-resources.xml b/src/java/DmWebPortal/setup/glassfish-resources.xml index ab069720..fb4700de 100644 --- a/src/java/DmWebPortal/setup/glassfish-resources.xml +++ b/src/java/DmWebPortal/setup/glassfish-resources.xml @@ -6,7 +6,7 @@ <property name="portNumber" value="11136"/> <property name="databaseName" value="dm"/> <property name="User" value="dm"/> - <property name="Password" value="dbuser"/> + <property name="Password" value="dm"/> <property name="URL" value="jdbc:postgresql://localhost:11136/dm"/> <property name="driverClass" value="org.postgresql.Driver"/> </jdbc-connection-pool> diff --git a/src/python/dm/common/utility/configurationManager.py b/src/python/dm/common/utility/configurationManager.py index 96819e84..3ee50aee 100755 --- a/src/python/dm/common/utility/configurationManager.py +++ b/src/python/dm/common/utility/configurationManager.py @@ -45,6 +45,8 @@ DEFAULT_DM_DS_WEB_SERVICE_PORT = 22236 # 222DM DEFAULT_DM_DS_WEB_SERVICE_HOST = '127.0.0.1' DEFAULT_DM_DAQ_WEB_SERVICE_PORT = 33336 # 333DM DEFAULT_DM_DAQ_WEB_SERVICE_HOST = '127.0.0.1' +DEFAULT_DM_CAT_WEB_SERVICE_PORT = 44436 # 444DM +DEFAULT_DM_CAT_WEB_SERVICE_HOST = '127.0.0.1' DEFAULT_DM_DB = 'postgresql' DEFAULT_DM_DB_HOST = '127.0.0.1' @@ -131,6 +133,8 @@ class ConfigurationManager(UserDict.UserDict): self['defaultDaqWebServiceHost'] = DEFAULT_DM_DAQ_WEB_SERVICE_HOST self['defaultDsWebServicePort'] = DEFAULT_DM_DS_WEB_SERVICE_PORT self['defaultDsWebServiceHost'] = DEFAULT_DM_DS_WEB_SERVICE_HOST + self['defaultCatWebServicePort'] = DEFAULT_DM_CAT_WEB_SERVICE_PORT + self['defaultCatWebServiceHost'] = DEFAULT_DM_CAT_WEB_SERVICE_HOST self['defaultDb'] = DEFAULT_DM_DB self['defaultDbHost'] = DEFAULT_DM_DB_HOST @@ -165,6 +169,8 @@ class ConfigurationManager(UserDict.UserDict): self.__setFromEnvVar('daqWebServicePort', 'DM_DAQ_WEB_SERVICE_PORT') self.__setFromEnvVar('dsWebServiceHost', 'DM_DS_WEB_SERVICE_HOST') self.__setFromEnvVar('dsWebServicePort', 'DM_DS_WEB_SERVICE_PORT') + self.__setFromEnvVar('catWebServiceHost', 'DM_CAT_WEB_SERVICE_HOST') + self.__setFromEnvVar('catWebServicePort', 'DM_CAT_WEB_SERVICE_PORT') self.__setFromEnvVar('contextRoot', 'DM_CONTEXT_ROOT') @@ -576,6 +582,30 @@ class ConfigurationManager(UserDict.UserDict): def hasDsWebServiceHost(self): return self.has_key('dsWebServiceHost') + def getDefaultCatWebServicePort(self): + return self['defaultCatWebServicePort'] + + def setCatWebServicePort(self, catWebServicePort): + self['catWebServicePort'] = catWebServicePort + + def getCatWebServicePort(self, default='__dm_default__'): + return int(self.__getKeyValue('catWebServicePort', default)) + + def hasCatWebServicePort(self): + return self.has_key('catWebServicePort') + + def getDefaultCatWebServiceHost(self): + return self['defaultCatWebServiceHost'] + + def setCatWebServiceHost(self, catWebServiceHost): + self['catWebServiceHost'] = catWebServiceHost + + def getCatWebServiceHost(self, default='__dm_default__'): + return self.__getKeyValue('catWebServiceHost', default) + + def hasCatWebServiceHost(self): + return self.has_key('catWebServiceHost') + def getDefaultDb(self): return self['defaultDb'] diff --git a/src/python/dm/common/utility/encoder.py b/src/python/dm/common/utility/encoder.py index 61b0c305..b4d0e9ec 100755 --- a/src/python/dm/common/utility/encoder.py +++ b/src/python/dm/common/utility/encoder.py @@ -15,3 +15,23 @@ class Encoder: def decode(cls, encodedData): data = base64.decodestring(base64.b64decode('%s' % encodedData)) return data + + @classmethod + def encodeDict(cls, dict, excludedKeyList=[]): + encodedDict = {} + for (key,value) in dict.items(): + if key not in excludedKeyList: + encodedDict[key] = cls.encode(value) + else: + encodedDict[key] = value + return encodedDict + + @classmethod + def decodeDict(cls, encodedDict, excludedKeyList=[]): + dict = {} + for (key,value) in encodedDict.items(): + if key not in excludedKeyList: + dict[key] = cls.decode(value) + else: + dict[key] = value + return dict diff --git a/src/python/dm/common/utility/linuxUtility.py b/src/python/dm/common/utility/linuxUtility.py new file mode 100755 index 00000000..4aa8f291 --- /dev/null +++ b/src/python/dm/common/utility/linuxUtility.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +import grp +from dm.common.utility.loggingManager import LoggingManager +from dmSubprocess import DmSubprocess + +class LinuxUtility: + + SUDO_CMD = '/usr/bin/sudo' + GROUPADD_CMD = '/usr/sbin/groupadd' + USERMOD_CMD = '/usr/sbin/usermod' + SETFACL_CMD = '/usr/bin/setfacl' + + @classmethod + def getLogger(cls): + logger = LoggingManager.getInstance().getLogger(cls.__name__) + return logger + + @classmethod + def executeSudoCommand(cls, cmd): + p = DmSubprocess('%s %s' % (cls.SUDO_CMD, cmd)) + p.run() + + @classmethod + def createGroup(cls, name): + """ Create group if it does not exist. """ + logger = cls.getLogger() + try: + group = grp.getgrnam(name) + logger.debug('Group %s already exists' % name) + return + except KeyError, ex: + # ok, we need to create group + pass + logger.debug('Creating group %s' % name) + cmd = '%s %s' % (cls.GROUPADD_CMD, name) + cls.executeSudoCommand(cmd) + + @classmethod + def addUserToGroup(cls, username, groupName): + """ Add user to group. """ + logger = cls.getLogger() + logger.debug('Adding user %s to group %s' % (username, groupName)) + cmd = '%s -a -G %s %s' % (cls.USERMOD_CMD, groupName, username) + cls.executeSudoCommand(cmd) + + @classmethod + def setPathReadExecutePermissionsForGroup(cls, path, groupName): + """ Add user to group. """ + logger = cls.getLogger() + logger.debug('Allowing group %s to read/execute path %s' % (groupName, path)) + cmd = '%s -m group\:%s\:rx %s' % (cls.SETFACL_CMD, groupName, path) + cls.executeSudoCommand(cmd) + +####################################################################### +# Testing. + +if __name__ == '__main__': + LinuxUtility.createGroup('exp3') + LinuxUtility.addUserToGroup('hpc', 'exp3') + LinuxUtility.setPathReadExecutePermissionsForGroup('/home/sveseli/Work/DM/data/ESAF/exp3', 'exp3') + diff --git a/src/python/dm/common/utility/osUtility.py b/src/python/dm/common/utility/osUtility.py index 41f532f9..be6e5937 100755 --- a/src/python/dm/common/utility/osUtility.py +++ b/src/python/dm/common/utility/osUtility.py @@ -41,13 +41,27 @@ class OsUtility: return files=os.listdir(path) for f in files: - fullpath=os.path.join(path, f) - if os.path.islink(fullpath) or not os.path.isdir(fullpath): - os.remove(fullpath) + fullPath=os.path.join(path, f) + if os.path.islink(fullPath) or not os.path.isdir(fullPath): + os.remove(fullPath) else: - removeDir(fullpath) + removeDir(fullPath) os.rmdir(path) + @classmethod + def chmodPath(cls, path, fileMode=None, dirMode=None): + if os.path.isfile(path): + if fileMode is not None: + os.chmod(path, fileMode) + return + elif os.path.isdir(path): + files = os.listdir(path) + for f in files: + fullPath = os.path.join(path, f) + cls.chmodPath(fullPath, fileMode, dirMode) + if dirMode is not None: + os.chmod(path, dirMode) + @classmethod def chownPath(cls, path, uid, gid): """ Change owner on a given path recursively. """ @@ -58,10 +72,10 @@ class OsUtility: os.lchown(path, uid, gid) return elif os.path.isdir(path): - files=os.listdir(path) + files = os.listdir(path) for f in files: - fullpath=os.path.join(path, f) - chownPath(fullpath, uid, gid) + fullPath = os.path.join(path, f) + cls.chownPath(fullPath, uid, gid) os.chown(path, uid, gid) @classmethod @@ -113,4 +127,5 @@ class OsUtility: # Testing. if __name__ == '__main__': - pass + OsUtility.chmodPath('/home/sveseli/Work/DM/data/ESAF/exp3', 0700) + diff --git a/src/python/dm/common/utility/timeBasedProcessingQueue.py b/src/python/dm/common/utility/timeBasedProcessingQueue.py new file mode 100755 index 00000000..d25544e3 --- /dev/null +++ b/src/python/dm/common/utility/timeBasedProcessingQueue.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python + +import threading +import time + +# Uses earliest allowed processing timestamp to sort items in the queue +# Queued item will not be processed until its earliest allowed processing +# timestamp has passed +class TimeBasedProcessingQueue: + + def __init__(self): + self.lock = threading.RLock() + self.queue = [] + self.itemPopTimeList = [] + + def push(self, item, itemProcessingWaitTime=0): + self.lock.acquire() + try: + earliestPopTime = time.time() + itemProcessingWaitTime + popIndex = 0 + for t in self.itemPopTimeList: + if earliestPopTime <= t: + break + popIndex += 1 + self.itemPopTimeList.insert(popIndex, earliestPopTime) + self.queue.insert(popIndex,item) + finally: + self.lock.release() + + def pop(self): + # Return None if work queue is empty. + self.lock.acquire() + try: + item = None + if len(self.queue): + if self.itemPopTimeList[0] <= time.time(): + del self.itemPopTimeList[0] + item = self.queue[0] + del self.queue[0] + return item + finally: + self.lock.release() + + def getLength(self): + return len(self.queue) + + def isEmpty(self): + return len(self.queue) == 0 + +#################################################################### +# Testing + +if __name__ == '__main__': + import random + q = TimeBasedProcessingQueue() + for i in range(0,10): + waitTime = random.uniform(0,10) + q.push(i, waitTime) + print 'Added: ', i, '; Processing wait: ', waitTime + + while not q.isEmpty(): + i = q.pop() + print 'Got: ', i + time.sleep(1) + + + -- GitLab