Skip to content
Snippets Groups Projects
Commit abb44c66 authored by sveseli's avatar sveseli
Browse files

added time-based processing queue class

parent 83964bb4
No related branches found
No related tags found
No related merge requests found
...@@ -65,6 +65,7 @@ prependPathIfDirExists() { ...@@ -65,6 +65,7 @@ prependPathIfDirExists() {
} }
# Setup path and other variables # Setup path and other variables
prependPathIfDirExists $DM_SUPPORT_DIR/mongodb/$DM_HOST_ARCH/bin
prependPathIfDirExists $DM_SUPPORT_DIR/postgresql/$DM_HOST_ARCH/bin prependPathIfDirExists $DM_SUPPORT_DIR/postgresql/$DM_HOST_ARCH/bin
prependPathIfDirExists $DM_SUPPORT_DIR/java/$DM_HOST_ARCH/bin prependPathIfDirExists $DM_SUPPORT_DIR/java/$DM_HOST_ARCH/bin
prependPathIfDirExists $DM_SUPPORT_DIR/ant/bin prependPathIfDirExists $DM_SUPPORT_DIR/ant/bin
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
<property name="portNumber" value="11136"/> <property name="portNumber" value="11136"/>
<property name="databaseName" value="dm"/> <property name="databaseName" value="dm"/>
<property name="User" value="dm"/> <property name="User" value="dm"/>
<property name="Password" value="dbuser"/> <property name="Password" value="dm"/>
<property name="URL" value="jdbc:postgresql://localhost:11136/dm"/> <property name="URL" value="jdbc:postgresql://localhost:11136/dm"/>
<property name="driverClass" value="org.postgresql.Driver"/> <property name="driverClass" value="org.postgresql.Driver"/>
</jdbc-connection-pool> </jdbc-connection-pool>
......
...@@ -45,6 +45,8 @@ DEFAULT_DM_DS_WEB_SERVICE_PORT = 22236 # 222DM ...@@ -45,6 +45,8 @@ DEFAULT_DM_DS_WEB_SERVICE_PORT = 22236 # 222DM
DEFAULT_DM_DS_WEB_SERVICE_HOST = '127.0.0.1' DEFAULT_DM_DS_WEB_SERVICE_HOST = '127.0.0.1'
DEFAULT_DM_DAQ_WEB_SERVICE_PORT = 33336 # 333DM DEFAULT_DM_DAQ_WEB_SERVICE_PORT = 33336 # 333DM
DEFAULT_DM_DAQ_WEB_SERVICE_HOST = '127.0.0.1' DEFAULT_DM_DAQ_WEB_SERVICE_HOST = '127.0.0.1'
DEFAULT_DM_CAT_WEB_SERVICE_PORT = 44436 # 444DM
DEFAULT_DM_CAT_WEB_SERVICE_HOST = '127.0.0.1'
DEFAULT_DM_DB = 'postgresql' DEFAULT_DM_DB = 'postgresql'
DEFAULT_DM_DB_HOST = '127.0.0.1' DEFAULT_DM_DB_HOST = '127.0.0.1'
...@@ -131,6 +133,8 @@ class ConfigurationManager(UserDict.UserDict): ...@@ -131,6 +133,8 @@ class ConfigurationManager(UserDict.UserDict):
self['defaultDaqWebServiceHost'] = DEFAULT_DM_DAQ_WEB_SERVICE_HOST self['defaultDaqWebServiceHost'] = DEFAULT_DM_DAQ_WEB_SERVICE_HOST
self['defaultDsWebServicePort'] = DEFAULT_DM_DS_WEB_SERVICE_PORT self['defaultDsWebServicePort'] = DEFAULT_DM_DS_WEB_SERVICE_PORT
self['defaultDsWebServiceHost'] = DEFAULT_DM_DS_WEB_SERVICE_HOST self['defaultDsWebServiceHost'] = DEFAULT_DM_DS_WEB_SERVICE_HOST
self['defaultCatWebServicePort'] = DEFAULT_DM_CAT_WEB_SERVICE_PORT
self['defaultCatWebServiceHost'] = DEFAULT_DM_CAT_WEB_SERVICE_HOST
self['defaultDb'] = DEFAULT_DM_DB self['defaultDb'] = DEFAULT_DM_DB
self['defaultDbHost'] = DEFAULT_DM_DB_HOST self['defaultDbHost'] = DEFAULT_DM_DB_HOST
...@@ -165,6 +169,8 @@ class ConfigurationManager(UserDict.UserDict): ...@@ -165,6 +169,8 @@ class ConfigurationManager(UserDict.UserDict):
self.__setFromEnvVar('daqWebServicePort', 'DM_DAQ_WEB_SERVICE_PORT') self.__setFromEnvVar('daqWebServicePort', 'DM_DAQ_WEB_SERVICE_PORT')
self.__setFromEnvVar('dsWebServiceHost', 'DM_DS_WEB_SERVICE_HOST') self.__setFromEnvVar('dsWebServiceHost', 'DM_DS_WEB_SERVICE_HOST')
self.__setFromEnvVar('dsWebServicePort', 'DM_DS_WEB_SERVICE_PORT') self.__setFromEnvVar('dsWebServicePort', 'DM_DS_WEB_SERVICE_PORT')
self.__setFromEnvVar('catWebServiceHost', 'DM_CAT_WEB_SERVICE_HOST')
self.__setFromEnvVar('catWebServicePort', 'DM_CAT_WEB_SERVICE_PORT')
self.__setFromEnvVar('contextRoot', 'DM_CONTEXT_ROOT') self.__setFromEnvVar('contextRoot', 'DM_CONTEXT_ROOT')
...@@ -576,6 +582,30 @@ class ConfigurationManager(UserDict.UserDict): ...@@ -576,6 +582,30 @@ class ConfigurationManager(UserDict.UserDict):
def hasDsWebServiceHost(self): def hasDsWebServiceHost(self):
return self.has_key('dsWebServiceHost') return self.has_key('dsWebServiceHost')
def getDefaultCatWebServicePort(self):
return self['defaultCatWebServicePort']
def setCatWebServicePort(self, catWebServicePort):
self['catWebServicePort'] = catWebServicePort
def getCatWebServicePort(self, default='__dm_default__'):
return int(self.__getKeyValue('catWebServicePort', default))
def hasCatWebServicePort(self):
return self.has_key('catWebServicePort')
def getDefaultCatWebServiceHost(self):
return self['defaultCatWebServiceHost']
def setCatWebServiceHost(self, catWebServiceHost):
self['catWebServiceHost'] = catWebServiceHost
def getCatWebServiceHost(self, default='__dm_default__'):
return self.__getKeyValue('catWebServiceHost', default)
def hasCatWebServiceHost(self):
return self.has_key('catWebServiceHost')
def getDefaultDb(self): def getDefaultDb(self):
return self['defaultDb'] return self['defaultDb']
......
...@@ -15,3 +15,23 @@ class Encoder: ...@@ -15,3 +15,23 @@ class Encoder:
def decode(cls, encodedData): def decode(cls, encodedData):
data = base64.decodestring(base64.b64decode('%s' % encodedData)) data = base64.decodestring(base64.b64decode('%s' % encodedData))
return data return data
@classmethod
def encodeDict(cls, dict, excludedKeyList=[]):
encodedDict = {}
for (key,value) in dict.items():
if key not in excludedKeyList:
encodedDict[key] = cls.encode(value)
else:
encodedDict[key] = value
return encodedDict
@classmethod
def decodeDict(cls, encodedDict, excludedKeyList=[]):
dict = {}
for (key,value) in encodedDict.items():
if key not in excludedKeyList:
dict[key] = cls.decode(value)
else:
dict[key] = value
return dict
#!/usr/bin/env python
import grp
from dm.common.utility.loggingManager import LoggingManager
from dmSubprocess import DmSubprocess
class LinuxUtility:
SUDO_CMD = '/usr/bin/sudo'
GROUPADD_CMD = '/usr/sbin/groupadd'
USERMOD_CMD = '/usr/sbin/usermod'
SETFACL_CMD = '/usr/bin/setfacl'
@classmethod
def getLogger(cls):
logger = LoggingManager.getInstance().getLogger(cls.__name__)
return logger
@classmethod
def executeSudoCommand(cls, cmd):
p = DmSubprocess('%s %s' % (cls.SUDO_CMD, cmd))
p.run()
@classmethod
def createGroup(cls, name):
""" Create group if it does not exist. """
logger = cls.getLogger()
try:
group = grp.getgrnam(name)
logger.debug('Group %s already exists' % name)
return
except KeyError, ex:
# ok, we need to create group
pass
logger.debug('Creating group %s' % name)
cmd = '%s %s' % (cls.GROUPADD_CMD, name)
cls.executeSudoCommand(cmd)
@classmethod
def addUserToGroup(cls, username, groupName):
""" Add user to group. """
logger = cls.getLogger()
logger.debug('Adding user %s to group %s' % (username, groupName))
cmd = '%s -a -G %s %s' % (cls.USERMOD_CMD, groupName, username)
cls.executeSudoCommand(cmd)
@classmethod
def setPathReadExecutePermissionsForGroup(cls, path, groupName):
""" Add user to group. """
logger = cls.getLogger()
logger.debug('Allowing group %s to read/execute path %s' % (groupName, path))
cmd = '%s -m group\:%s\:rx %s' % (cls.SETFACL_CMD, groupName, path)
cls.executeSudoCommand(cmd)
#######################################################################
# Testing.
if __name__ == '__main__':
LinuxUtility.createGroup('exp3')
LinuxUtility.addUserToGroup('hpc', 'exp3')
LinuxUtility.setPathReadExecutePermissionsForGroup('/home/sveseli/Work/DM/data/ESAF/exp3', 'exp3')
...@@ -41,13 +41,27 @@ class OsUtility: ...@@ -41,13 +41,27 @@ class OsUtility:
return return
files=os.listdir(path) files=os.listdir(path)
for f in files: for f in files:
fullpath=os.path.join(path, f) fullPath=os.path.join(path, f)
if os.path.islink(fullpath) or not os.path.isdir(fullpath): if os.path.islink(fullPath) or not os.path.isdir(fullPath):
os.remove(fullpath) os.remove(fullPath)
else: else:
removeDir(fullpath) removeDir(fullPath)
os.rmdir(path) os.rmdir(path)
@classmethod
def chmodPath(cls, path, fileMode=None, dirMode=None):
if os.path.isfile(path):
if fileMode is not None:
os.chmod(path, fileMode)
return
elif os.path.isdir(path):
files = os.listdir(path)
for f in files:
fullPath = os.path.join(path, f)
cls.chmodPath(fullPath, fileMode, dirMode)
if dirMode is not None:
os.chmod(path, dirMode)
@classmethod @classmethod
def chownPath(cls, path, uid, gid): def chownPath(cls, path, uid, gid):
""" Change owner on a given path recursively. """ """ Change owner on a given path recursively. """
...@@ -58,10 +72,10 @@ class OsUtility: ...@@ -58,10 +72,10 @@ class OsUtility:
os.lchown(path, uid, gid) os.lchown(path, uid, gid)
return return
elif os.path.isdir(path): elif os.path.isdir(path):
files=os.listdir(path) files = os.listdir(path)
for f in files: for f in files:
fullpath=os.path.join(path, f) fullPath = os.path.join(path, f)
chownPath(fullpath, uid, gid) cls.chownPath(fullPath, uid, gid)
os.chown(path, uid, gid) os.chown(path, uid, gid)
@classmethod @classmethod
...@@ -113,4 +127,5 @@ class OsUtility: ...@@ -113,4 +127,5 @@ class OsUtility:
# Testing. # Testing.
if __name__ == '__main__': if __name__ == '__main__':
pass OsUtility.chmodPath('/home/sveseli/Work/DM/data/ESAF/exp3', 0700)
#!/usr/bin/env python
import threading
import time
# Uses earliest allowed processing timestamp to sort items in the queue
# Queued item will not be processed until its earliest allowed processing
# timestamp has passed
class TimeBasedProcessingQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
self.itemPopTimeList = []
def push(self, item, itemProcessingWaitTime=0):
self.lock.acquire()
try:
earliestPopTime = time.time() + itemProcessingWaitTime
popIndex = 0
for t in self.itemPopTimeList:
if earliestPopTime <= t:
break
popIndex += 1
self.itemPopTimeList.insert(popIndex, earliestPopTime)
self.queue.insert(popIndex,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
if self.itemPopTimeList[0] <= time.time():
del self.itemPopTimeList[0]
item = self.queue[0]
del self.queue[0]
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
import random
q = TimeBasedProcessingQueue()
for i in range(0,10):
waitTime = random.uniform(0,10)
q.push(i, waitTime)
print 'Added: ', i, '; Processing wait: ', waitTime
while not q.isEmpty():
i = q.pop()
print 'Got: ', i
time.sleep(1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment