diff --git a/evrouting/utils.py b/evrouting/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..e19100959ef999434fae2a3236fb73ef678827fe --- /dev/null +++ b/evrouting/utils.py @@ -0,0 +1,44 @@ +import itertools +from heapq import * + + +class PriorityQueue: + REMOVED = '<removed-task>' # placeholder for a removed task + + def __init__(self): + self.pq = [] # list of entries arranged in a heap + self.entry_finder = {} # mapping of tasks to entries + self.counter = itertools.count() # unique sequence count as tie break + + def insert(self, item, priority=0): + """Add a new task or update the priority of an existing task""" + if item in self.entry_finder: + self.remove_item(item) + count = next(self.counter) + entry = [priority, count, item] + self.entry_finder[item] = entry + heappush(self.pq, entry) + + def remove_item(self, item): + """Mark an existing task as REMOVED. Raise KeyError if not found.""" + entry = self.entry_finder.pop(item) + entry[-1] = self.REMOVED + + def delete_min(self): + """Remove and return the lowest priority task. Raise KeyError if empty.""" + while self.pq: + priority, count, item = heappop(self.pq) + if item is not self.REMOVED: + del self.entry_finder[item] + return item + raise KeyError('pop from an empty priority queue') + + def peak_min(self): + """Return minimum item without removing it from the queue.""" + while self.pq: + priority, count, item = self.pq[0] + if item is not self.REMOVED: + return item + else: + heappop(self.pq) + raise KeyError('Empty queue.') diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..2015392ccc7c09a54753489ca819c19f2f197a35 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,33 @@ +import pytest +from evrouting.utils import PriorityQueue + + +@pytest.fixture +def q(): + q = PriorityQueue() + q.insert('high', 3) + q.insert('low', 2) + + yield q + del q + + +class TestProrityQueue: + def test_insert(self, q): + assert q.delete_min() == 'low' + assert q.delete_min() == 'high' + + with pytest.raises(KeyError): + q.delete_min() + + def test_peak(self, q): + assert q.peak_min() == 'low' + assert q.peak_min() == 'low' # does not get removed + + def test_update(self, q): + q.insert('high', 1) + assert q.delete_min() == 'high' + assert q.delete_min() == 'low' + + with pytest.raises(KeyError): + q.delete_min()