Forked from
DM / dm-docs
261 commits behind, 787 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
timeBasedProcessingQueue.py 1.89 KiB
#!/usr/bin/env python
import bisect
import threading
import time
# Uses the earliest allowed processing timestamp to sort items in the queue.
# A queued item will not be returned for processing until its earliest
# allowed processing timestamp has passed.
class TimeBasedProcessingQueue:
    """Queue whose items become available only after a per-item delay.

    Every item is stored together with its earliest allowed pop time
    (``time.time()`` at push plus the requested wait).  Items are kept
    sorted by that timestamp; ``pop()`` hands out the head item only once
    its timestamp has passed.  Items that share exactly the same timestamp
    are returned in the order they were pushed (FIFO).

    ``push()`` and ``pop()`` serialize on ``self.lock``.  ``getLength()``
    and ``isEmpty()`` read the list length without taking the lock, so
    their result is only a snapshot.
    """

    def __init__(self):
        # Re-entrant lock guarding the two parallel lists below.
        self.lock = threading.RLock()
        # Queued items; queue[i] is the item for itemPopTimeList[i].
        self.queue = []
        # Earliest allowed pop timestamps, kept in ascending order.
        self.itemPopTimeList = []

    def push(self, item, itemProcessingWaitTime=0):
        """Add ``item``; it may be popped ``itemProcessingWaitTime`` seconds from now.

        A zero or negative wait makes the item eligible immediately.
        """
        with self.lock:
            earliestPopTime = time.time() + itemProcessingWaitTime
            # bisect_right places the new entry *after* any entries with an
            # equal timestamp, so ties keep insertion (FIFO) order.
            popIndex = bisect.bisect_right(self.itemPopTimeList, earliestPopTime)
            self.itemPopTimeList.insert(popIndex, earliestPopTime)
            self.queue.insert(popIndex, item)

    def pop(self):
        """Remove and return the head item if its pop time has passed.

        Returns None when the queue is empty or when the head item is not
        yet eligible for processing.
        """
        with self.lock:
            if self.queue and self.itemPopTimeList[0] <= time.time():
                del self.itemPopTimeList[0]
                return self.queue.pop(0)
            return None

    def getLength(self):
        """Return the number of queued items, eligible or not."""
        return len(self.queue)

    def isEmpty(self):
        """Return True when no items are queued at all."""
        return len(self.queue) == 0
####################################################################
# Testing
if __name__ == '__main__':
    # Manual stress test: fill the queue with randomly delayed items, wait
    # until all of them are eligible, then drain it.
    # print() is called with a single string so the output is identical on
    # Python 2 and Python 3 (the statement form is a syntax error on 3).
    import random
    q = TimeBasedProcessingQueue()
    for i in range(0, 10000000):
        waitTime = random.uniform(0, 10)
        q.push(i, waitTime)
        #print('Added: %s; Processing wait: %s' % (i, waitTime))
    print("Sleeping...")
    # 60 s is longer than the maximum 10 s wait used above, so every item
    # is eligible by the time the drain loop starts and pop() never spins
    # on a not-yet-ready head item.
    time.sleep(60)
    print("Removing...")
    while not q.isEmpty():
        i = q.pop()
        #print('Got: %s' % i)
        #time.sleep(1)
    print("Sleeping...")
    time.sleep(60)