diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index 6484fcc51c8d8fd538c93032000d892db62cc073..ba195d9b61ef734f20e9781843f616a55045bed9 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -160,6 +160,8 @@ class Label(NamedTuple): soc_last_cs: SoC last_cs: Node soc_profile_cs_v: SoCProfile + parent_label: Union['Label', None] = None + parent_node: Union[Node, None] = None class Breakpoint(NamedTuple): diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 1dae50c136a41298a7abfe630fc623b48eae236f..92be48cd24330d16f67c85c3ab22dc45d7ccd15d 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -7,11 +7,11 @@ Implementation of the CHArge algorithm [0] with two further constraints: [0] https://dl.acm.org/doi/10.1145/2820783.2820826 """ -from typing import Dict, List, Tuple, Set +from typing import Dict, List, Tuple, Set, Union from math import inf import networkx as nx -from evrouting.T import Node, SoC +from evrouting.T import Node, SoC, Time from evrouting.utils import PriorityQueue from evrouting.graph_tools import distance from evrouting.charge.T import SoCFunction, Label @@ -24,7 +24,7 @@ from evrouting.charge.factories import ( def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, - initial_soc: SoC, final_soc: SoC, capacity: SoC): + initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Dict: """ Calculates shortest path using the CHarge algorithm. @@ -39,6 +39,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, :return: """ + actual_t: Node = t t, factories, queues = _setup( G, charging_stations, capacity, initial_soc, final_soc, s, t ) @@ -61,7 +62,10 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, l_set[node_min].append(label_node_min) if node_min == t: - return f_soc_factory(label_node_min).minimum + return _result( + label_node_min, + f_soc_factory(label_node_min).minimum, + node_min) # Handle charging stations if node_min in charging_stations and node_min != label_node_min.last_cs: @@ -75,12 +79,14 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, t_trip=label_node_min.t_trip + t_charge, soc_last_cs=f_soc(label_node_min.t_trip + t_charge), last_cs=node_min, - soc_profile_cs_v=soc_profile_factory(node_min) + soc_profile_cs_v=soc_profile_factory(node_min), + parent_node=node_min, + parent_label=label_node_min ) ) # Update priority queue. This node might have gotten a new - # minimum label spawned is th previous step. + # minimum label spawned is the previous step. try: prio_queue.insert( item=node_min, @@ -108,7 +114,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, t_trip=label_node_min.t_trip + distance(G, node_min, n), soc_last_cs=label_node_min.soc_last_cs, last_cs=label_node_min.last_cs, - soc_profile_cs_v=soc_profile + soc_profile_cs_v=soc_profile, + parent_node=node_min, + parent_label=label_node_min ) l_uns[n].insert(label_neighbour) @@ -122,6 +130,8 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, if is_new_min: prio_queue.insert(n, **keys(f_soc_factory(label_neighbour))) + return _result() + def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, initial_soc: SoC, final_soc: SoC, s: Node, t: Node @@ -171,7 +181,9 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, t_trip=0, soc_last_cs=initial_soc, last_cs=dummy_node, - soc_profile_cs_v=soc_profile_factory(s) + soc_profile_cs_v=soc_profile_factory(s), + parent_node=None, + parent_label=None )) # A priority queue defines which node to visit next. @@ -191,3 +203,50 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, 'priority queue': prio_queue } ) + + +def _result(label: Label = None, f_soc_min: Time = None, + node: Node = None) -> Dict: + """ + Returns a dict with two fields, as described below. + + :param label: The final label of the algorithm + :param f_soc_min: The min time of the SoC Function of the final label + :param node: The final node. + + :return Time result['trip_time']: The overall trip time ```f_soc_min``` + :return List[Tuple[Time, Node]] result['path']: List of Nodes passed + along the path. It is modelled as Tuples of Trip Time and Node. + If a Node appears twice in a row, it means, the battery got + charged. The charging time is the difference of the trip times. + """ + if any(arg is None for arg in [label, f_soc_min, node]): + return {'trip_time': None, 'path': []} + + t_charge_map = {label.last_cs: f_soc_min - label.t_trip} + path = [] + + # Skip inserted extra node + node = label.parent_node + label = label.parent_label + time = f_soc_min + while label is not None: + if node == label.parent_node: + # Label got spawned at fixing t_charge of the parent's label + # last_cs. For the current label holds: label.last_cs == node + t_charge_map[label.parent_label.last_cs] = label.t_trip - label.parent_label.t_trip + else: + path.append((node, t_charge_map.get(node, 0))) + """ + if node not in t_charge_map: + t_trip = label.t_trip - label.parent_label.t_trip + time -= t_trip + path.append((node, time)) + else: + time -= t_charge_map[node] + path.append((node, t_charge_map[node])) + """ + node = label.parent_node + label = label.parent_label + + return {'trip_time': f_soc_min, 'path': path[::-1]} diff --git a/tests/charge/test_charge_routing.py b/tests/charge/test_charge_routing.py index b01a7fc4229730fc082ec6cc6982489e69cfa8fa..def061fd96f72e4fdf4816359f34f9ba459e14ca 100644 --- a/tests/charge/test_charge_routing.py +++ b/tests/charge/test_charge_routing.py @@ -12,31 +12,35 @@ class TestRoutes: def test_shortest_path_charge_at_s_and_a(self): """Charging at s.""" - path = shortest_path(**init_config(edge_case)) + result = shortest_path(**init_config(edge_case)) - assert path == 3.5 + assert result['trip_time'] == 3.5 + assert result['path'] == [(0, 0), (0, 1), (1, 2), (1, 2.5), (2, 3.5)] def test_shortest_path_charge_at_s_only(self): """Charging at s.""" - path = shortest_path(**init_config(edge_case_a_slow)) + result = shortest_path(**init_config(edge_case_a_slow)) - assert path == 3 + assert result['trip_time'] == 3 + assert result['path'] == [(0, 0), (0, 2), (2, 3)] def test_shortest_path_no_charge_s_path_t(self): """No charging at s but enough initial SoC to go to t directly.""" conf = init_config(edge_case_start_node_no_cs) conf['initial_soc'] = 4 - path = shortest_path(**conf) + result = shortest_path(**conf) - assert path == 1 + assert result['trip_time'] == 1 + assert result['path'] == [(0, 0), (2, 1)] def test_shortest_path_no_charge_s_path_a(self): """No charging at s but just enough SoC to go to t via a.""" conf = init_config(edge_case_start_node_no_cs) conf['initial_soc'] = 2 - path = shortest_path(**conf) + result = shortest_path(**conf) - assert path == 2 + assert result['trip_time'] == 2 + assert result['path'] == [(0, 0), (1, 1), (2, 2)] class TestWithFinalSoC: @@ -45,31 +49,36 @@ class TestWithFinalSoC: """Charging at s.""" conf = init_config(edge_case) conf['final_soc'] = 3 - path = shortest_path(**conf) + result = shortest_path(**conf) - assert path == 5 + assert result['trip_time'] == 5 + assert result['path'] == [(0, 0), (0, 1), (1, 2), (1, 4), (2, 5)] def test_path_impossilbe(self): """Not possible to end with full battery.""" conf = init_config(edge_case) conf['final_soc'] = 4 - path = shortest_path(**conf) + result = shortest_path(**conf) - assert path is None + assert result['trip_time'] is None + assert result['path'] == [] def test_shortest_path_charge_at_s_only(self): """Charging at s and a to reach final_soc.""" conf = init_config(edge_case_a_slow) conf['final_soc'] = 3 - path = shortest_path(**conf) + result = shortest_path(**conf) - assert path == 5 + assert result['trip_time'] == 5 + assert result['path'] == [(0, 0), (0, 2), (1, 3), (1, 4), (2, 5)] def test_shortest_path_no_charge_s_path_t(self): """No charging at s but initial soc.""" conf = init_config(edge_case_start_node_no_cs) conf['initial_soc'] = 4 conf['final_soc'] = 3 - path = shortest_path(**conf) + result = shortest_path(**conf) + + assert result['trip_time'] == 2.5 + assert result['path'] == [(0, 0), (1, 1), (1, 1.5), (2, 2.5)] - assert path == 2.5 diff --git a/tests/config.py b/tests/config.py index b8a730c9038a55ba7263b6c3a19c4b4647d10425..63877f8ddad236bd2a7276b99f20fc447a6e5116 100644 --- a/tests/config.py +++ b/tests/config.py @@ -44,7 +44,7 @@ edge_case_a_slow = { ], 'edges': [ TemplateEdge(0, 1, distance=1, consumption=1), - TemplateEdge(0, 2, distance=1, consumption=4), + TemplateEdge(0, 2, distance=1.5, consumption=4), TemplateEdge(1, 2, distance=1, consumption=1), ] }