"""Holding the queue structure for unsettled Labels.""" from typing import Any, Dict, List from evrouting.utils import PriorityQueue from evrouting.T import SoC, Time from evrouting.charge.T import Label, SoCFunction from evrouting.charge.factories import SoCFunctionFactory class LabelPriorityQueue(PriorityQueue): """ Implementation of a variant of priority queue to store the ***unsettled*** labels of a vertex and efficiently extract the minimum label in the algorithm. The priority of a label is the minimum feasible time of it's according SoC Function. Tie breaker is the SoC at this time. It maintains the invariant: The queue is empty or the SoC Function of the minimum label is not dominated by any SoC Function of a ***settled*** label. """ def __init__(self, f_soc: SoCFunctionFactory, l_set: List[Label]): """ :param f_soc: SoC Function Factory to create SoC Functions of inserted labels for testing the invariant. :param l_set: Set of settled labels. """ super().__init__() self.f_soc_factory: SoCFunctionFactory = f_soc self.l_set: List[Label] = l_set def insert(self, label: Label): """Breaking ties with lowest soc at t_min.""" super().insert(item=label, **self.keys(self.f_soc_factory(label))) # If the minimum element has changed, check the invariant. if self.peak_min() == label: self.dominance_check() def delete_min(self) -> Any: """Delete and check the invariant.""" min_label = super().delete_min() self.dominance_check() return min_label def dominance_check(self): """ Compare the SoC Function of the minimum label with all SoC Functions of the already settled labels. If any settled label dominates the minimum label, the minimum label is removed, since it cannot lead to a better solution. """ try: label: Label = self.peak_min() except KeyError: return soc: SoCFunction = self.f_soc_factory(label) # Remove item if it gets dominated by any label in l_set if any(self.f_soc_factory(label).dominates(soc) for label in self.l_set): self.remove_item(label) @staticmethod def keys(f_soc: SoCFunction) -> Dict: """Return the keys for insertion. See class description.""" t_min: Time = f_soc.minimum soc_min: SoC = f_soc(t_min) return {'priority': t_min, 'count': soc_min}