From bcbe5ccd36b90c5f7b60204f1dc51f99afbbccea Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Wed, 29 Apr 2020 10:11:33 +0200 Subject: [PATCH] cleanup --- evrouting/gasstation/routing.py | 44 ++++++++++----- evrouting/gasstation/utils.py | 14 ----- tests/gasstation/test_transformations.py | 18 +----- tests/osm/conftest.py | 38 +++++++++++++ tests/osm/notest_gasstation_osm.py | 71 ++++++++++++++++++++++++ tests/osm/test_osm_charge.py | 39 ------------- 6 files changed, 139 insertions(+), 85 deletions(-) delete mode 100644 evrouting/gasstation/utils.py create mode 100644 tests/osm/conftest.py create mode 100644 tests/osm/notest_gasstation_osm.py diff --git a/evrouting/gasstation/routing.py b/evrouting/gasstation/routing.py index 214d3b2..cccf5e5 100644 --- a/evrouting/gasstation/routing.py +++ b/evrouting/gasstation/routing.py @@ -3,14 +3,14 @@ from typing import Set, List import networkx as nx from evrouting.T import Node, SoC, Result, EmptyResult, Time from evrouting.gasstation.T import State, DistFunction -from evrouting.gasstation.utils import dijkstra, fold_path from evrouting.graph_tools import ( CONSUMPTION_KEY, DISTANCE_KEY, consumption, distance, - charging_cofficient + charging_cofficient, ) +from evrouting.graph_tools import sum_weights as fold_path def insert_start_node(s: Node, @@ -20,13 +20,12 @@ def insert_start_node(s: Node, graph_extended: nx.DiGraph, capacity: SoC, initial_soc: SoC, - dist: DistFunction = dijkstra ) -> nx.DiGraph: """Insert s into extended graph an create states and edges as necessary.""" graph_extended.add_node((s, initial_soc)) v: Node for v in gas_stations: - shortest_p: List[Node] = dist(graph_core, s, v, weight=CONSUMPTION_KEY) + shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) if w > initial_soc: continue @@ -62,13 +61,12 @@ def insert_final_node(t: Node, graph_extended: nx.DiGraph, capacity: SoC, final_soc: SoC, - dist: DistFunction = dijkstra ) -> nx.DiGraph: """Insert terminal node into extended graph an create states and edges as necessary.""" graph_extended.add_node((t, final_soc)) u: Node for u in gas_stations: - shortest_p: List[Node] = dist(graph_core, t, u, weight=CONSUMPTION_KEY) + shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) if w + final_soc > capacity: continue @@ -163,7 +161,7 @@ def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph: def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, - path: List[State], dist=dijkstra) -> Result: + path: List[State]) -> Result: trip_time: Time = 0 charge_path = [] u: Node @@ -177,7 +175,7 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, trip_time += t charge_time_u: Time = t - fold_path( graph_core, - dist(graph_core, u, v, weight=DISTANCE_KEY), + nx.shortest_path(graph_core, u, v, weight=DISTANCE_KEY), weight=DISTANCE_KEY ) charge_path.append((u, charge_time_u)) @@ -187,8 +185,16 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, return Result(trip_time=trip_time, charge_path=charge_path) -def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, - initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Result: +def shortest_path(G: nx.Graph, + charging_stations: Set[Node], + s: Node, + t: Node, + initial_soc: SoC, + final_soc: SoC, + capacity: SoC, + extended_graph=None, + contracted_graph=None + ) -> Result: """ Calculates shortest path using a generalized gas station algorithm. @@ -202,7 +208,11 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, :return: """ # Check if t is reachable from s - _path = dijkstra(G, s, t, weight=CONSUMPTION_KEY) + try: + _path = nx.shortest_path(G, s, t, weight=CONSUMPTION_KEY) + except nx.NetworkXNoPath: + return EmptyResult() + _w = fold_path(G, _path, weight=CONSUMPTION_KEY) if _w <= initial_soc: return Result( @@ -210,8 +220,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, charge_path=[(s, 0), (t, 0)] ) - contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity) - extended_graph = state_graph(contracted_graph, capacity) + if extended_graph is None or contracted_graph is None: + contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity) + extended_graph = state_graph(contracted_graph, capacity) extended_graph = insert_start_node( s=s, @@ -232,9 +243,12 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, final_soc=final_soc ) - path: List[State] = dijkstra(extended_graph, (s, initial_soc), (t, final_soc)) + try: + path: List[State] = nx.shortest_path(extended_graph, (s, initial_soc), (t, final_soc)) + except nx.NetworkXNoPath: + return EmptyResult() - return EmptyResult() if not path else compose_result( + return compose_result( graph_core=G, extended_graph=extended_graph, path=path diff --git a/evrouting/gasstation/utils.py b/evrouting/gasstation/utils.py deleted file mode 100644 index 854d41e..0000000 --- a/evrouting/gasstation/utils.py +++ /dev/null @@ -1,14 +0,0 @@ -from typing import List -import networkx as nx -from evrouting.gasstation.T import N - - -def dijkstra(G: nx.Graph, u: N, v: N, weight: str = 'weight') -> List[N]: - try: - return nx.algorithms.shortest_path(G, u, v, weight=weight) - except nx.NetworkXNoPath: - return [] - - -def fold_path(G: nx.Graph, path: List[N], weight: str): - return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])]) diff --git a/tests/gasstation/test_transformations.py b/tests/gasstation/test_transformations.py index 51a7858..ea09b58 100644 --- a/tests/gasstation/test_transformations.py +++ b/tests/gasstation/test_transformations.py @@ -3,8 +3,6 @@ import networkx as nx import pytest from evrouting.gasstation.routing import ( contract_graph, - dijkstra, - fold_path, get_possible_arriving_soc, state_graph, insert_final_node, @@ -17,21 +15,7 @@ from evrouting.graph_tools import ( DISTANCE_KEY, CHARGING_COEFFICIENT_KEY ) -from tests.config import edge_case, gasstation, init_config, gasstation_complete - - -class TestDjikstra: - @pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)]) - def test_djikstra(self, u, v, min_len): - conf: dict = init_config(gasstation) - shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY) - assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len - - def test_djikstra_via_node(self): - conf: dict = init_config(edge_case) - shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY) - - assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2 +from tests.config import gasstation, init_config, gasstation_complete class TestContraction: diff --git a/tests/osm/conftest.py b/tests/osm/conftest.py new file mode 100644 index 0000000..0640e4d --- /dev/null +++ b/tests/osm/conftest.py @@ -0,0 +1,38 @@ +import os +import json + +import pytest + +from evrouting.osm.imports import OSMGraph, read_osm +from evrouting.osm.profiles import car + + +@pytest.fixture +def graph(): + G = OSMGraph() + + node_coordinates = [ + (51.7705832, 7.0002595), + (51.7696529, 6.9568520) + ] + + for n_id, coordinates in enumerate(node_coordinates): + lat, lon = coordinates + # Add two nodes, that exist in osm test map + G.add_node(n_id, lat=lat, lon=lon) + G.insert_into_rtree(n_id) + + yield G + del G + + +@pytest.fixture +def map_graph(): + STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') + G = read_osm(os.path.join(STATIC_DIR, 'map.osm'), car) + with open(os.path.join(STATIC_DIR, 'charging_stations.json'), 'r') as f: + charging_stations = json.load(f) + G.insert_charging_stations(charging_stations) + + yield G + del G diff --git a/tests/osm/notest_gasstation_osm.py b/tests/osm/notest_gasstation_osm.py new file mode 100644 index 0000000..126c3c4 --- /dev/null +++ b/tests/osm/notest_gasstation_osm.py @@ -0,0 +1,71 @@ +from evrouting import gasstation +from evrouting.T import Result +from evrouting.osm.profiles import car +from evrouting.osm.routing import shortest_path +from evrouting.graph_tools import ( + CHARGING_COEFFICIENT_KEY, + DISTANCE_KEY, + HAVERSINE_KEY, + consumption_function_distance_factory +) + + +def test_charge_shortest_route_dimensions(map_graph): + """Full tank is enough to get there. Must be equal to shortest path.""" + s = (51.75041438844966, 6.9332313537597665) + t = (51.75657783347559, 7.000350952148438) + _s = map_graph.find_nearest(s) + _t = map_graph.find_nearest(t) + + consumption = 1 # kWh/km + cost_path = 6 * consumption * 1000 # distance * consumption in Wh + + c = consumption_function_distance_factory(consumption) + result = charge.routing.shortest_path( + G=map_graph, + charging_stations=map_graph.charging_stations, + s=_s, + t=_t, + initial_soc=10000, # > cost_path + final_soc=0, + capacity=10000, + c=c + ) + + path = shortest_path(map_graph, _s, _t, car) + + assert type(result) is Result + assert result.charge_path == path.charge_path + + +def test_charge_shortest_route_stop(map_graph): + """Full tank is enough to get there. Must be equal to shortest path.""" + s = (51.75041438844966, 6.9332313537597665) + t = (51.75657783347559, 7.000350952148438) + _s = map_graph.find_nearest(s) + _t = map_graph.find_nearest(t) + + consumption = 1 # kWh/km + cost_path = 6 * consumption * 1000 # distance * consumption in Wh + + c = consumption_function_distance_factory(consumption) + + result = charge.routing.shortest_path( + G=map_graph, + charging_stations=map_graph.charging_stations, + s=_s, + t=_t, + initial_soc=2000, # > cost_path + final_soc=0, + capacity=10000, + c=c + ) + + assert type(result) is Result + charge_at = [t for n, t in result.charge_path if t > 0] + # charge once + assert len(charge_at) == 1 + + # Charge about 10 min + assert charge_at[0] < 600 + assert charge_at[0] > 500 diff --git a/tests/osm/test_osm_charge.py b/tests/osm/test_osm_charge.py index 8251672..0a24192 100644 --- a/tests/osm/test_osm_charge.py +++ b/tests/osm/test_osm_charge.py @@ -1,53 +1,15 @@ -import os -import json - -import pytest - from evrouting import charge from evrouting.T import Result -from evrouting.osm.imports import read_osm, OSMGraph from evrouting.osm.profiles import car from evrouting.osm.routing import shortest_path from evrouting.graph_tools import ( CHARGING_COEFFICIENT_KEY, DISTANCE_KEY, - CONSUMPTION_KEY, HAVERSINE_KEY, consumption_function_distance_factory ) -@pytest.fixture -def graph(): - G = OSMGraph() - - node_coordinates = [ - (51.7705832, 7.0002595), - (51.7696529, 6.9568520) - ] - - for n_id, coordinates in enumerate(node_coordinates): - lat, lon = coordinates - # Add two nodes, that exist in osm test map - G.add_node(n_id, lat=lat, lon=lon) - G.insert_into_rtree(n_id) - - yield G - del G - - -@pytest.fixture -def map_graph(): - STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') - G = read_osm(os.path.join(STATIC_DIR, 'map.osm'), car) - with open(os.path.join(STATIC_DIR, 'charging_stations.json'), 'r') as f: - charging_stations = json.load(f) - G.insert_charging_stations(charging_stations) - - yield G - del G - - def test_insert_charging_stations_close(graph): # Close two node 1 S = [{"lon": 7.0002593, "lat": 51.7705832, "power": 22.0}] @@ -163,4 +125,3 @@ def test_charge_shortest_route_stop(map_graph): # Charge about 10 min assert charge_at[0] < 600 assert charge_at[0] > 500 - -- GitLab