Skip to content
Snippets Groups Projects
Commit 7358366c authored by markn92's avatar markn92
Browse files

keeping the invariant

parent 311236f5
No related branches found
No related tags found
No related merge requests found
...@@ -138,18 +138,6 @@ class ChargingFunction: ...@@ -138,18 +138,6 @@ class ChargingFunction:
"""Comparison for dominance check.""" """Comparison for dominance check."""
return self.c < other.c return self.c < other.c
def __le__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c <= other.c
def __eq__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c == other.c
def __ge__(self, other):
"""Comparison for dominance check."""
return self.c >= other.c
def __gt__(self, other): def __gt__(self, other):
"""Comparison for dominance check.""" """Comparison for dominance check."""
return self.c > other.c return self.c > other.c
...@@ -245,6 +233,30 @@ class SoCFunction: ...@@ -245,6 +233,30 @@ class SoCFunction:
self.cf_cs(t - self.t_trip, self.soc_last_cs) self.cf_cs(t - self.t_trip, self.soc_last_cs)
) )
def __lt__(self, other: 'SoCFunction') -> bool:
"""Comparison for dominance check."""
for t_i, soc_i in self.breakpoints:
if other(t_i) < soc_i:
return False
for t_i, soc_i in other.breakpoints:
if soc_i < self(t_i):
return False
return True
def __gt__(self, other: 'SoCFunction') -> bool:
"""Comparison for dominance check."""
for t_i, soc_i in self.breakpoints:
if other(t_i) > soc_i:
return False
for t_i, soc_i in other.breakpoints:
if soc_i > self(t_i):
return False
return True
@property @property
def minimum(self) -> Time: def minimum(self) -> Time:
""" """
......
from typing import Dict, List from typing import Dict, List, Set
from math import inf from math import inf
import networkx as nx import networkx as nx
...@@ -39,8 +39,8 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -39,8 +39,8 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
label_factory = LabelsFactory(G, capacity, f_soc, initial_soc) label_factory = LabelsFactory(G, capacity, f_soc, initial_soc)
# Init maps to manage labels # Init maps to manage labels
l_set: Dict[int, set] = {v: set() for v in G} l_set: Dict[int, Set[Label]] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf) for v in G} l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf, l_set[v]) for v in G}
# Init environment # Init environment
entry_label = _create_entry_label(G, charging_stations, entry_label = _create_entry_label(G, charging_stations,
...@@ -102,11 +102,16 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -102,11 +102,16 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
# #
# That means, the SoC and thereby the range is restricted # That means, the SoC and thereby the range is restricted
# to the SoC at the last cs (soc_last_cs). # to the SoC at the last cs (soc_last_cs).
pass continue
else:
if l_new == l_uns[n].peak_min(): try:
key, count = _key(l_new, f_soc) is_new_min_label: bool = l_new == l_uns[n].peak_min()
prio_queue.insert(n, priority=key, count=count) except KeyError:
continue
if is_new_min_label:
key, count = _key(l_new, f_soc)
prio_queue.insert(n, priority=key, count=count)
def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]: def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]:
......
from typing import Set, Any
from math import inf from math import inf
from evrouting.utils import PriorityQueue from evrouting.utils import PriorityQueue
...@@ -8,9 +9,10 @@ from .factories import ChargingFunctionMap ...@@ -8,9 +9,10 @@ from .factories import ChargingFunctionMap
class LabelPriorityQueue(PriorityQueue): class LabelPriorityQueue(PriorityQueue):
def __init__(self, cf: ChargingFunctionMap): def __init__(self, cf: ChargingFunctionMap, l_set: Set[Label]):
super().__init__() super().__init__()
self.cf: ChargingFunctionMap = cf self.cf: ChargingFunctionMap = cf
self.l_set: Set[Label] = l_set
def insert(self, label: Label): def insert(self, label: Label):
"""Breaking ties with lowest soc at t_min.""" """Breaking ties with lowest soc at t_min."""
...@@ -32,3 +34,24 @@ class LabelPriorityQueue(PriorityQueue): ...@@ -32,3 +34,24 @@ class LabelPriorityQueue(PriorityQueue):
priority=t_min, priority=t_min,
count=soc_min count=soc_min
) )
if self.peak_min() == label:
self.dominance_check()
def delete_min(self) -> Any:
min_label = super().delete_min()
self.dominance_check()
return min_label
def dominance_check(self):
try:
label: Label = self.peak_min()
except KeyError:
return
soc = SoCFunction(label, self.cf[label.last_cs])
for other_label in self.l_set:
if SoCFunction(other_label, self.cf[other_label.last_cs]) > soc:
self.remove_item(label)
return
...@@ -49,6 +49,14 @@ class TestWithFinalSoC: ...@@ -49,6 +49,14 @@ class TestWithFinalSoC:
assert path == 5 assert path == 5
def test_path_impossilbe(self):
"""Not possible to end with full battery."""
conf = init_config(edge_case)
conf['final_soc'] = 4
path = shortest_path(**conf)
assert path is None
def test_shortest_path_charge_at_s_only(self): def test_shortest_path_charge_at_s_only(self):
"""Charging at s and a to reach final_soc.""" """Charging at s and a to reach final_soc."""
conf = init_config(edge_case_a_slow) conf = init_config(edge_case_a_slow)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment