diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index e85d78b07d9105380445771a8acb8e6b6e438d1f..dbf7c1d54662b4c42abf3503c3c041848a826d33 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -180,11 +180,6 @@ class Label(NamedTuple): last_cs: Node soc_profile_cs_v: SoCProfile - @property - def key(self): - """Key for sorting.""" - return self.t_trip - class SoCFunction: """ diff --git a/evrouting/charge/factories.py b/evrouting/charge/factories.py index 5d60978b0b952c01d4f2b952ccad93f8bd83a477..03af6fd8cfdd94d429cdfb440836236803c2a271 100644 --- a/evrouting/charge/factories.py +++ b/evrouting/charge/factories.py @@ -3,7 +3,7 @@ from typing import Dict from .T import SoCProfile, SoCFunction, ChargingFunction, Label from ..T import Node, SoC, Time -from ..graph_tools import charging_cofficient, consumption, distance +from ..graph_tools import charging_cofficient, consumption def charging_function_factory( diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 8a25a6cb4ed73278c31a67f89acdad894dfe902a..4c62972845468c43a62e00f043cd95075f726901 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -4,7 +4,9 @@ from math import inf import networkx as nx from evrouting.T import Node, SoC from evrouting.utils import PriorityQueue -from evrouting.charge.factories import LabelsFactory, ChargingFunctionMap, soc_profile_factory +from evrouting.charge.factories import ( + LabelsFactory, ChargingFunctionMap, soc_profile_factory +) from ..graph_tools import distance from .T import SoCFunction, SoCProfile, Label @@ -26,21 +28,24 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, :param U: Capacity :return: """ + t = _apply_final_constraints(G, t, final_soc) + cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) label_factory = LabelsFactory(G, capacity, cf, initial_soc) # Init maps to manage labels l_set: Dict[int, set] = {v: set() for v in G} - l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue() for v in G} + l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf) for v in G} # Init environment - entry_label = _create_entry_label(G, charging_stations, s, initial_soc, capacity) - l_uns[s].insert(entry_label, cf[entry_label.last_cs]) + entry_label = _create_entry_label(G, charging_stations, + s, initial_soc, capacity) + l_uns[s].insert(entry_label) # A priority queue defines which node to visit next. # The key is the trip time. prio_queue = PriorityQueue() - prio_queue.insert(s, 0) + prio_queue.insert(s, priority=0, count=0) while True: try: @@ -58,14 +63,16 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ).minimum # handle charging stations - if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs: + if minimum_node in charging_stations and \ + not minimum_node == label_minimum_node.last_cs: if cf[minimum_node] > cf[label_minimum_node.last_cs]: - label_new = label_factory.spawn_label(minimum_node, label_minimum_node) - l_uns[minimum_node].insert(label_new, cf[minimum_node]) + label_new = label_factory.spawn_label(minimum_node, + label_minimum_node) + l_uns[minimum_node].insert(label_new) # Update priority queue. This node might have gotten a new # minimum label spawned is th previous step. - _update_priority_queue(prio_queue, l_uns, minimum_node) + _update_priority_queue(cf, prio_queue, l_uns, minimum_node) # scan outgoing arcs for n in G.neighbors(minimum_node): @@ -81,7 +88,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, soc_profile_cs_v=soc_profile ) try: - l_uns[n].insert(l_new, cf[l_new.last_cs]) + l_uns[n].insert(l_new) except ValueError: # Infeasible because last_cs might be an # dummy charging station. Therefore, the path might @@ -94,7 +101,20 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, pass else: if l_new == l_uns[n].peak_min(): - prio_queue.insert(n, l_new.key) + key, count = _key(l_new, cf[l_new.last_cs]) + prio_queue.insert(n, priority=key, count=count) + + +def _key(label, cf): + soc_function = SoCFunction( + label, + cf + ) + + t_min = soc_function.minimum + soc_min = soc_function(t_min) + + return t_min, soc_min def _create_entry_label( @@ -136,6 +156,7 @@ def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool: def _update_priority_queue( + cf: ChargingFunctionMap, prio_queue: PriorityQueue, l_uns: Dict[int, LabelPriorityQueue], node: Node): @@ -144,9 +165,18 @@ def _update_priority_queue( its minimum label. """ try: - minimum_label = l_uns[node].peak_min() + minimum_label: Label = l_uns[node].peak_min() except KeyError: # l_uns[v] empty prio_queue.delete_min() else: - prio_queue.insert(node, minimum_label.key) + key, count = _key(minimum_label, cf[minimum_label.last_cs]) + prio_queue.insert(node, priority=key, count=count) + + +def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node: + temp_final_node = len(G) + G.add_node(temp_final_node) + G.add_edge(t, temp_final_node, weight=0, c=final_soc) + + return temp_final_node diff --git a/evrouting/charge/utils.py b/evrouting/charge/utils.py index 0ebd923b487340ea035acba75903a778a547cb0c..376dd8337178a2836b2ed08302f7d3c09877c12f 100644 --- a/evrouting/charge/utils.py +++ b/evrouting/charge/utils.py @@ -1,17 +1,22 @@ from math import inf from evrouting.utils import PriorityQueue -from evrouting.T import SoC, Time, Node +from evrouting.T import SoC, Time -from .T import Label, SoCFunction, ChargingFunction +from .T import Label, SoCFunction +from .factories import ChargingFunctionMap class LabelPriorityQueue(PriorityQueue): - def insert(self, label: Label, cf: ChargingFunction): + def __init__(self, cf: ChargingFunctionMap): + super().__init__() + self.cf: ChargingFunctionMap = cf + + def insert(self, label: Label): """Breaking ties with lowest soc at t_min.""" soc_function = SoCFunction( label, - cf + self.cf[label.last_cs] ) t_min: Time = soc_function.minimum @@ -27,5 +32,3 @@ class LabelPriorityQueue(PriorityQueue): priority=t_min, count=soc_min ) - - diff --git a/tests/charge/test_charge_routing.py b/tests/charge/test_charge_routing.py index 6d6a5a28166937fb2e985f86df6c635ebbbdd8de..901335e3481c576a1a78612c7f8947a88c6de6d5 100644 --- a/tests/charge/test_charge_routing.py +++ b/tests/charge/test_charge_routing.py @@ -8,33 +8,60 @@ from ..config import ( ) -def test_shortest_path_charge_at_s_and_a(): - """Charging at s.""" - path = shortest_path(**init_config(edge_case)) +class TestRoutes: - assert path == 3.5 + def test_shortest_path_charge_at_s_and_a(self): + """Charging at s.""" + path = shortest_path(**init_config(edge_case)) + assert path == 3.5 -def test_shortest_path_charge_at_s_only(): - """Charging at s.""" - path = shortest_path(**init_config(edge_case_a_slow)) + def test_shortest_path_charge_at_s_only(self): + """Charging at s.""" + path = shortest_path(**init_config(edge_case_a_slow)) - assert path == 3 + assert path == 3 + def test_shortest_path_no_charge_s_path_t(self): + """No charging at s but enough initial SoC to go to t directly.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 4 + path = shortest_path(**conf) -def test_shortest_path_no_charge_s_path_t(): - """No charging at s but enough initial SoC to go to t directly.""" - conf = init_config(edge_case_start_node_no_cs) - conf['initial_soc'] = 4 - path = shortest_path(**conf) + assert path == 1 - assert path == 1 + def test_shortest_path_no_charge_s_path_a(self): + """No charging at s but just enough SoC to go to t via a.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 2 + path = shortest_path(**conf) + assert path == 2 -def test_shortest_path_no_charge_s_path_a(): - """No charging at s but just enough SoC to go to t via a.""" - conf = init_config(edge_case_start_node_no_cs) - conf['initial_soc'] = 2 - path = shortest_path(**conf) - assert path == 2 +class TestWithFinalSoC: + + def test_shortest_path_charge_at_s_and_a(self): + """Charging at s.""" + conf = init_config(edge_case) + conf['final_soc'] = 3 + path = shortest_path(**conf) + + assert path == 5 + + def test_shortest_path_charge_at_s_only(self): + """Charging at s and a to reach final_soc.""" + conf = init_config(edge_case_a_slow) + conf['final_soc'] = 3 + path = shortest_path(**conf) + + assert path == 4 + + def test_shortest_path_no_charge_s_path_t(self): + """No charging at s but initial soc.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 4 + conf['final_soc'] = 3 + path = shortest_path(**conf) + + assert path == 2.5