diff --git a/evrouting/T.py b/evrouting/T.py index a72149b7a1db4576c9924da58726c55be577cc3b..da06ff23b7180bffb7dde89b8e45e31d13c666a8 100644 --- a/evrouting/T.py +++ b/evrouting/T.py @@ -1,4 +1,5 @@ -from typing import Tuple, Union, NewType, Dict, Any +from dataclasses import dataclass +from typing import Tuple, Union, NewType, Dict, Any, List Node = int Edge = Tuple[Node, Node] @@ -12,3 +13,14 @@ SoC = NewType('SoC', Wh) ChargingCoefficient = Union[float, int, None] Time = Union[float, int] + + +@dataclass +class Result: + trip_time: Union[Time, None] + charge_path: List[Tuple[Node, Time]] + + +class EmptyResult(Result): + def __init__(self): + super().__init__(trip_time=None, charge_path=[]) diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index af83a3fbd87375d25b8190f79f8abc09197ff0eb..c7133a59622c801545bc8921e066606370200d79 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -11,7 +11,7 @@ from typing import Dict, List, Tuple, Set, Union from math import inf import networkx as nx -from evrouting.T import Node, SoC, Time +from evrouting.T import Node, SoC, Time, Result, EmptyResult from evrouting.utils import PriorityQueue from evrouting.graph_tools import distance from evrouting.charge.T import SoCFunction, Label @@ -24,7 +24,7 @@ from evrouting.charge.factories import ( def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, - initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Dict: + initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Result: """ Calculates shortest path using the CHarge algorithm. @@ -128,7 +128,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, if is_new_min: prio_queue.insert(n, **keys(f_soc_factory(label_neighbour))) - return _result() + return EmptyResult() def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, @@ -203,7 +203,7 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, ) -def _result(label: Label = None, f_soc_min: Time = None) -> Dict: +def _result(label: Label, f_soc_min: Time) -> Result: """ Returns a dict with two fields, as described below. @@ -215,9 +215,6 @@ def _result(label: Label = None, f_soc_min: Time = None) -> Dict: :return List[Tuple[Node, Time]] result['path']: List of Nodes and their according charging time along the path. """ - if any(arg is None for arg in [label, f_soc_min]): - return {'trip_time': None, 'path': []} - # Remember where charging time applies # First entry comes from the time necessary to charge at the last # charging stop to reach the goal. @@ -238,4 +235,4 @@ def _result(label: Label = None, f_soc_min: Time = None) -> Dict: node = label.parent_node label = label.parent_label - return {'trip_time': f_soc_min, 'path': path[::-1]} + return Result(trip_time=f_soc_min, charge_path=path[::-1]) diff --git a/evrouting/gasstation/routing.py b/evrouting/gasstation/routing.py index 3a17f678169546ba9c6afe815f7c65ecb8152daa..7f26a2ac30b5e02a6f6b52766fb665ac72351b46 100644 --- a/evrouting/gasstation/routing.py +++ b/evrouting/gasstation/routing.py @@ -1,7 +1,7 @@ -from typing import Set, Callable, List, Any, Dict, Tuple +from typing import Set, Callable, List, Any, Tuple import networkx as nx -from evrouting.T import Node, SoC +from evrouting.T import Node, SoC, Result, EmptyResult from evrouting.graph_tools import ( CONSUMPTION_KEY, DISTANCE_KEY, @@ -15,7 +15,10 @@ DistFunction = Callable[[nx.Graph, Node, Node, str], Path] def dijkstra(G: nx.Graph, u: Any, v: Any, weight: str = 'weight') -> list: - return nx.algorithms.shortest_path(G, u, v, weight=weight) + try: + return nx.algorithms.shortest_path(G, u, v, weight=weight) + except nx.NetworkXNoPath: + return [] def fold_path(G: nx.Graph, path: Path, weight: str): @@ -32,6 +35,7 @@ def insert_start_node(s: Node, dist: DistFunction = dijkstra ) -> nx.DiGraph: """Insert s into extended graph an create states and edges as necessary.""" + graph_extended.add_node((s, initial_soc)) for v in gas_stations: shortest_p = dist(graph_core, s, v, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) @@ -72,6 +76,7 @@ def insert_final_node(t: Node, dist: DistFunction = dijkstra ) -> nx.DiGraph: """Insert terminal node into extended graph an create states and edges as necessary.""" + graph_extended.add_node((t, final_soc)) for u in gas_stations: shortest_p = dist(graph_core, t, u, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) @@ -162,11 +167,11 @@ def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph: def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, path: List[Tuple[Node, Node]], - dist=dijkstra) -> dict: - charge_path = [(path[0][0], 0)] # Start Node + dist=dijkstra) -> Result: trip_time = sum([extended_graph.edges[u, v]['weight'] for u, v in zip(path[:-1], path[1:])]) - for i in range(1, len(path) - 1): + charge_path = [] + for i in range(len(path) - 1): u, g_u = path[i] v, g_v = path[i + 1] t = extended_graph.edges[(u, g_u), (v, g_v)]['weight'] @@ -179,11 +184,11 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, path: List[ charge_path.append((path[-1][0], 0)) # Final Node - return {'trip_time': trip_time, 'path': charge_path} + return Result(trip_time=trip_time, charge_path=charge_path) def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, - initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Dict: + initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Result: """ Calculates shortest path using a generalized gas station algorithm. @@ -196,14 +201,13 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, :return: """ # Check if t is reachable from s - # Todo: Test! _path = dijkstra(G, s, t, weight=CONSUMPTION_KEY) _w = fold_path(G, _path, weight=CONSUMPTION_KEY) if _w <= initial_soc: - return { - 'trip_time': fold_path(G, _path, weight=DISTANCE_KEY), - 'path': [(s, 0), (t, 0)] - } + return Result( + trip_time=fold_path(G, _path, weight=DISTANCE_KEY), + charge_path=[(s, 0), (t, 0)] + ) contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity) extended_graph = state_graph(contracted_graph, capacity) @@ -229,7 +233,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, path: List[Tuple[Node, Node]] = dijkstra(extended_graph, (s, initial_soc), (t, final_soc)) - return compose_result( + return EmptyResult() if not path else compose_result( graph_core=G, extended_graph=extended_graph, path=path diff --git a/tests/charge/test_charge_routing.py b/tests/charge/test_charge_routing.py index 815697346eb63c72072df2cd97c4a55d9c2924fb..c4fcc062f872e6650d5d93ea1c50a51ba9dc59d4 100644 --- a/tests/charge/test_charge_routing.py +++ b/tests/charge/test_charge_routing.py @@ -1,5 +1,5 @@ from evrouting.charge import shortest_path - +from evrouting.T import Result, EmptyResult from ..config import ( edge_case, edge_case_start_node_no_cs, @@ -14,15 +14,15 @@ class TestRoutes: """Charging at s.""" result = shortest_path(**init_config(edge_case)) - assert result['trip_time'] == 3.5 - assert result['path'] == [(0, 1), (1, 0.5), (2, 0)] + assert result.trip_time == 3.5 + assert result.charge_path == [(0, 1), (1, 0.5), (2, 0)] def test_shortest_path_charge_at_s_only(self): """Charging at s.""" result = shortest_path(**init_config(edge_case_a_slow)) - assert result['trip_time'] == 3 - assert result['path'] == [(0, 1), (1, 0), (2, 0)] + assert result.trip_time == 3 + assert result.charge_path == [(0, 1), (1, 0), (2, 0)] def test_shortest_path_no_charge_s_path_t(self): """No charging at s but enough initial SoC to go to t directly.""" @@ -30,8 +30,8 @@ class TestRoutes: conf['initial_soc'] = 4 result = shortest_path(**conf) - assert result['trip_time'] == 1 - assert result['path'] == [(0, 0), (2, 0)] + assert result.trip_time == 1 + assert result.charge_path == [(0, 0), (2, 0)] def test_shortest_path_no_charge_s_path_a(self): """No charging at s but just enough SoC to go to t via a.""" @@ -39,8 +39,8 @@ class TestRoutes: conf['initial_soc'] = 2 result = shortest_path(**conf) - assert result['trip_time'] == 2 - assert result['path'] == [(0, 0), (1, 0), (2, 0)] + assert result.trip_time == 2 + assert result.charge_path == [(0, 0), (1, 0), (2, 0)] class TestWithFinalSoC: @@ -49,19 +49,18 @@ class TestWithFinalSoC: """Charging at s.""" conf = init_config(edge_case) conf['final_soc'] = 3 - result = shortest_path(**conf) + result: Result = shortest_path(**conf) - assert result['trip_time'] == 5 - assert result['path'] == [(0, 1), (1, 2), (2, 0)] + assert result.trip_time == 5 + assert result.charge_path == [(0, 1), (1, 2), (2, 0)] def test_path_impossilbe(self): """Not possible to end with full battery.""" conf = init_config(edge_case) conf['final_soc'] = 4 - result = shortest_path(**conf) + result: Result = shortest_path(**conf) - assert result['trip_time'] is None - assert result['path'] == [] + assert isinstance(result, EmptyResult) def test_shortest_path_charge_at_s_only(self): """Charging at s and a to reach final_soc.""" @@ -69,8 +68,8 @@ class TestWithFinalSoC: conf['final_soc'] = 3 result = shortest_path(**conf) - assert result['trip_time'] == 5 - assert result['path'] == [(0, 2), (1, 1), (2, 0)] + assert result.trip_time == 5 + assert result.charge_path == [(0, 2), (1, 1), (2, 0)] def test_shortest_path_no_charge_s_path_t(self): """No charging at s but initial soc.""" @@ -79,6 +78,6 @@ class TestWithFinalSoC: conf['final_soc'] = 3 result = shortest_path(**conf) - assert result['trip_time'] == 2.5 - assert result['path'] == [(0, 0), (1, .5), (2, 0)] + assert result.trip_time == 2.5 + assert result.charge_path == [(0, 0), (1, .5), (2, 0)] diff --git a/tests/gasstation/test_gasstation_routing.py b/tests/gasstation/test_gasstation_routing.py index 62d09a5d7c4f29aa2d2eebc3e5e8aca598a2c172..575f44cc3c1946fdcb3e24482613b789d960c51d 100644 --- a/tests/gasstation/test_gasstation_routing.py +++ b/tests/gasstation/test_gasstation_routing.py @@ -1,13 +1,18 @@ +import networkx as nx from evrouting.gasstation.routing import ( shortest_path, ) +from evrouting.graph_tools import ( + DISTANCE_KEY, CONSUMPTION_KEY +) +from evrouting.T import EmptyResult from tests.config import gasstation_complete, init_config def test_shortest_path(): conf = init_config(gasstation_complete) - path = shortest_path( + r = shortest_path( G=conf['G'], charging_stations=conf['charging_stations'], s=conf['s'], @@ -16,5 +21,114 @@ def test_shortest_path(): final_soc=conf['final_soc'], capacity=conf['capacity'] ) + assert r.charge_path == [(0, 0), (1, 1), (3, 0)] + assert r.trip_time == 11 + - assert path == {'path': [(0, 0), (1, 1), (3, 0)], 'trip_time': 11} +def test_unreachable_initial_soc(): + conf = init_config(gasstation_complete) + conf['initial_soc'] = 0 + + r = shortest_path( + G=conf['G'], + charging_stations=conf['charging_stations'], + s=conf['s'], + t=conf['t'], + initial_soc=conf['initial_soc'], + final_soc=conf['final_soc'], + capacity=conf['capacity'] + ) + assert isinstance(r, EmptyResult) + + +def test_unreachable(): + conf = init_config(gasstation_complete) + G: nx.Graph = conf['G'] + + G.add_edge(2, 3, **{ + DISTANCE_KEY: 10, + CONSUMPTION_KEY: 5 + }) + G.add_edge(1, 3, **{ + DISTANCE_KEY: 10, + CONSUMPTION_KEY: 5 + }) + + r = shortest_path( + G=conf['G'], + charging_stations=conf['charging_stations'], + s=conf['s'], + t=conf['t'], + initial_soc=conf['initial_soc'], + final_soc=conf['final_soc'], + capacity=conf['capacity'] + ) + assert isinstance(r, EmptyResult) + + +def test_unreachable_final_soc(): + conf = init_config(gasstation_complete) + conf['final_soc'] = 4 + + r = shortest_path( + G=conf['G'], + charging_stations=conf['charging_stations'], + s=conf['s'], + t=conf['t'], + initial_soc=conf['initial_soc'], + final_soc=conf['final_soc'], + capacity=conf['capacity'] + ) + + assert isinstance(r, EmptyResult) + + +def test_s_from_t_reachable(): + conf = init_config(gasstation_complete) + conf['t'] = 2 + r = shortest_path( + G=conf['G'], + charging_stations=conf['charging_stations'], + s=conf['s'], + t=conf['t'], + initial_soc=conf['initial_soc'], + final_soc=conf['final_soc'], + capacity=conf['capacity'] + ) + assert r.charge_path == [(0, 0), (2, 0)] + assert r.trip_time == 2 + + +def test_s_gasstation(): + conf = init_config(gasstation_complete) + conf['s'] = 2 + conf['initial_soc'] = 0 + r = shortest_path( + G=conf['G'], + charging_stations=conf['charging_stations'], + s=conf['s'], + t=conf['t'], + initial_soc=conf['initial_soc'], + final_soc=conf['final_soc'], + capacity=conf['capacity'] + ) + + assert r.charge_path == [(2, 8), (3, 0)] + assert r.trip_time == 8 + 8 + + +def test_t_gasstation(): + conf = init_config(gasstation_complete) + conf['t'] = 3 + + r = shortest_path( + G=conf['G'], + charging_stations=conf['charging_stations'], + s=conf['s'], + t=conf['t'], + initial_soc=conf['initial_soc'], + final_soc=conf['final_soc'], + capacity=conf['capacity'] + ) + assert r.charge_path == [(0, 0), (1, 1), (3, 0)] + assert r.trip_time == 11 diff --git a/tests/gasstation/test_transformations.py b/tests/gasstation/test_transformations.py index 1640cb995cb8894bf28ad09344f366f9f375423e..e6c5c9fdb23f416fe2d4f6eaad91459b9471b49c 100644 --- a/tests/gasstation/test_transformations.py +++ b/tests/gasstation/test_transformations.py @@ -345,7 +345,5 @@ class TestResult(Integration): def test_compose_result(self, graph, inserted_t): path = [(0, 4), (1, 1), (3, 0)] result = compose_result(graph, inserted_t, path) - assert result == { - 'trip_time': 6 + 1 + 4, - 'path': [(0, 0), (1, 1), (3, 0)] - } + assert result.trip_time == 6 + 1 + 4 + assert result.charge_path == [(0, 0), (1, 1), (3, 0)]