# Skip to content
# Snippets Groups Projects
# utils.py 1.56 KiB
# Newer Older
import itertools
# markn92's avatar
# markn92 committed
from typing import Any
from heapq import *


class PriorityQueue:
    """Min-priority queue of unique items, backed by a ``heapq`` heap.

    Every item is held at most once; inserting an item that is already
    present replaces its priority.  Removal is lazy: the item's heap entry is
    only marked as removed and is physically dropped later, once it reaches
    the top of the heap.

    Items are used as dictionary keys and therefore must be hashable.
    """

    # Marker written into the item slot of a heap entry that was removed.
    # Liveness is tested by identity against this object, so the marker itself
    # must not be inserted as an item.
    REMOVED = '<removed-task>'

    def __init__(self) -> None:
        """Create an empty queue."""
        self.pq = []  # heap of [priority, count, item] entries
        self.entry_finder = {}  # maps each live item to its heap entry
        # Monotonic sequence number: entries with equal priority are ordered
        # by insertion, and the items themselves never need to be compared.
        self.counter = itertools.count()

    def insert(self, item: Any, priority=0) -> None:
        """Add *item* with *priority*, or update the priority if present.

        An existing entry for *item* is marked as removed and a fresh entry
        is pushed, so the item takes a new position in the tie-break order.
        """
        if item in self.entry_finder:
            self.remove_item(item)
        # The entry is a list (not a tuple) so remove_item can mutate it.
        entry = [priority, next(self.counter), item]
        self.entry_finder[item] = entry
        heappush(self.pq, entry)

    def remove_item(self, item: Any) -> None:
        """Mark *item*'s entry as removed without restructuring the heap.

        Raises:
            KeyError: if *item* is not in the queue.
        """
        entry = self.entry_finder.pop(item)
        entry[-1] = self.REMOVED

    def delete_min(self) -> Any:
        """Remove and return the item with the lowest priority.

        Entries previously marked as removed are discarded along the way.

        Raises:
            KeyError: if the queue holds no live item.
        """
        while self.pq:
            _, _, item = heappop(self.pq)
            if item is not self.REMOVED:
                del self.entry_finder[item]
                return item
        raise KeyError('pop from an empty priority queue')

    def peak_min(self) -> Any:
        """Return the item with the lowest priority without removing it.

        Removed entries sitting at the top of the heap are popped as a side
        effect; the live items in the queue are unaffected.

        The name is a historical misspelling of "peek"; it is kept so that
        existing callers continue to work.

        Raises:
            KeyError: if the queue holds no live item.
        """
        while self.pq:
            item = self.pq[0][-1]
            if item is not self.REMOVED:
                return item
            heappop(self.pq)  # drop the stale entry and look again
        raise KeyError('Empty queue.')

    def peek_min(self) -> Any:
        """Correctly spelled equivalent of :meth:`peak_min`."""
        return self.peak_min()