Skip to content
Snippets Groups Projects
Commit a0e61fb9 authored by sveseli's avatar sveseli
Browse files

added singleton and thread safe queue classes

parent 9e6a9bb1
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
class Singleton(object):
__instance = None
def __new__(cls, *args, **kwargs):
# Allow subclasses to create their own instances.
if cls.__instance is None or cls != type(cls.__instance):
instance = object.__new__(cls, *args, **kwargs)
cls.__instance = instance
instance.__init__(*args, **kwargs)
return cls.__instance
@classmethod
def getInstance(cls, *args, **kwargs):
return cls.__new__(cls, *args, **kwargs)
def __init__(self, *args, **kwargs):
# Only initialize once.
if Singleton.__instance is not None:
return
####################################################################
# Testing
if __name__ == '__main__':
s1 = Singleton.getInstance()
s2 = Singleton()
s3 = Singleton.getInstance()
s4 = Singleton()
print 'S1: ', s1
print 'S2: ', s2
print 'S3: ', s3
print 'S4: ', s4
class A(Singleton):
def __init__(self, x):
self.x = x
class B(Singleton):
def __init__(self, x):
self.x = x
class C(Singleton):
def __init__(self):
self.x = 14
a1 = A(3)
a2 = A(4)
print a1
print a2
print a2.x, a1.x
b1 = B(6)
b2 = B(5)
print b1
print b2
print b2.x, b1.x
c1 = C()
c2 = C()
print c1
print c2
print c2.x, c1.x
#!/usr/bin/env python
import threading
class ThreadSafeQueue:
def __init__(self):
self.lock = threading.RLock()
self.queue = []
def push(self, item):
self.lock.acquire()
try:
self.queue.insert(0,item)
finally:
self.lock.release()
def pop(self):
# Return None if work queue is empty.
self.lock.acquire()
try:
item = None
if len(self.queue):
item = self.queue.pop()
return item
finally:
self.lock.release()
def getLength(self):
return len(self.queue)
def isEmpty(self):
return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
q = ThreadSafeQueue()
for i in range(0,10):
q.push(i)
print 'Added: ', i
while not q.isEmpty():
i = q.pop()
print 'Got: ', i
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment