From d5391c9f51c9a6e0b31c022382c3ed44153c4d24 Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Thu, 26 Mar 2020 13:04:18 +0100 Subject: [PATCH] contraction --- evrouting/gasstation/routing.py | 51 ++++++++++++++---- tests/gasstation/test_gasstation_routing.py | 57 ++++++++++++++++++--- 2 files changed, 90 insertions(+), 18 deletions(-) diff --git a/evrouting/gasstation/routing.py b/evrouting/gasstation/routing.py index c31ff3c..e545c4f 100644 --- a/evrouting/gasstation/routing.py +++ b/evrouting/gasstation/routing.py @@ -1,8 +1,11 @@ -from typing import Set, Union, Callable, List +from typing import Set, Callable, List + import networkx as nx from evrouting.T import Node, SoC +from evrouting.graph_tools import CONSUMPTION_KEY, DISTANCE_KEY -DistFunction = Callable[[nx.Graph, Node, Node], Union[int, float]] +Path = List[Node] +DistFunction = Callable[[nx.Graph, Node, Node], Path] def shortest_path(G: nx.Graph, s, t, b_0: float, b_t: float, U: float): @@ -20,14 +23,40 @@ def shortest_path(G: nx.Graph, s, t, b_0: float, b_t: float, U: float): pass -def dijkstra(G: nx.Graph, u: Node, v: Node, - weight: str = 'weight') -> Union[int, float]: - length, _ = nx.algorithms.shortest_paths.single_source_dijkstra( - G, u, v, weight=weight - ) - return length +def dijkstra(G: nx.Graph, u: Node, v: Node, weight: str = 'weight') -> Path: + return nx.algorithms.shortest_path(G, u, v, weight=weight) -def contract_graph(G: nx.Graph, S: Set[Node], U: SoC, - f_dist: DistFunction = dijkstra) -> nx.Graph: - pass +def fold_path(G: nx.Graph, path: Path, weight: str): + return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])]) + + +def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, + dist: DistFunction = dijkstra) -> nx.Graph: + """ + :param G: Original graph + :param charging_stations: Charging stations + :param capacity: Maximum battery capacity + :param dist: Minimum distance function, necessary if G is not a fully + connected Graph to calculate consumptions between charging stations. + :returns: Graph only consisting of Charging Stations whose neighbours must + be within the capacity U. If so, their edge has consumption and + distance of the minimum path. + """ + H: nx.Graph = nx.Graph() + + for cs in list(charging_stations): + H.add_node(cs, **G.nodes[cs]) + # Iterate unvisited charging stations + for n_cs in [n for n in charging_stations if (n, cs) not in H.edges]: + min_path: Path = dist(G, cs, n_cs) + consumption: SoC = fold_path(G, min_path, weight=CONSUMPTION_KEY) + if consumption <= capacity: + H.add_edge( + cs, n_cs, + **{ + CONSUMPTION_KEY: consumption, + DISTANCE_KEY:fold_path(G, min_path, weight=DISTANCE_KEY) + } + ) + return H diff --git a/tests/gasstation/test_gasstation_routing.py b/tests/gasstation/test_gasstation_routing.py index c92e04a..70a0d4d 100644 --- a/tests/gasstation/test_gasstation_routing.py +++ b/tests/gasstation/test_gasstation_routing.py @@ -1,8 +1,8 @@ import networkx as nx import pytest -from evrouting.gasstation.routing import contract_graph, dijkstra -from evrouting.graph_tools import label, CONSUMPTION_KEY +from evrouting.gasstation.routing import contract_graph, dijkstra, fold_path +from evrouting.graph_tools import label, CONSUMPTION_KEY, DISTANCE_KEY from tests.config import edge_case, get_graph, gasstation, init_config @@ -10,23 +10,66 @@ class TestDjikstra: @pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)]) def test_djikstra(self, u, v, min_len): conf: dict = init_config(gasstation) - assert dijkstra(conf['G'], u, v, CONSUMPTION_KEY) == min_len + shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY) + assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len def test_djikstra_via_node(self): conf: dict = init_config(edge_case) + shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY) - assert dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY) == 2 + assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2 class TestContraction: + def label_map(self, G: nx.Graph) -> dict: + return {label(G, n): n for n in G.nodes} + def test_contraction(self): conf: dict = init_config(gasstation) G: nx.Graph = conf['G'] - H: nx.Graph = contract_graph(G, conf['charging_stations'], conf['U']) + H: nx.Graph = contract_graph(G, + conf['charging_stations'], + conf['capacity']) # Check available gas stations - nodes = set(label(H, n) for n in H.nodes) - assert nodes == {'s', 'a', 'c'} + assert set(H.nodes) == conf['charging_stations'] + + @pytest.mark.parametrize('u,v,weight,value', [ + ('s', 'a', CONSUMPTION_KEY, 1), + ('s', 'a', DISTANCE_KEY, 1), + ('s', 'f', '', None), # Not exist + ('s', 'b', '', None), # Not exist + ('s', 'd', '', None), # Not exist + ('s', 'c', CONSUMPTION_KEY, 2), + ('s', 'c', DISTANCE_KEY, 2), + ('s', 'e', '', None) + ]) + def test_contraction_edges(self, u, v, weight, value): + """ + Test edges and values of its weights. + :param u: Node + :param v: Node + :param weight: Weight key to check. + :param value: Value the edge weight should have. + If None, it tests if the egde does not exist. + """ + conf: dict = init_config(gasstation) + H: nx.Graph = contract_graph(conf['G'], + conf['charging_stations'], conf['capacity']) + + label_map = self.label_map(H) + try: + edge = (label_map[u], label_map[v]) + except KeyError: + # One of the edges is no an charging station + if value is None: + return True + raise + + if value is None: + assert edge not in H.edges + else: + assert H.edges[edge][weight] == value class TestRouting: -- GitLab