diff --git a/evrouting/gasstation/routing.py b/evrouting/gasstation/routing.py index cccf5e521d8bead8424ed24f3c5de580908bd3fb..580cb3b1520eb7c8185e256e4c345ce02b796c5b 100644 --- a/evrouting/gasstation/routing.py +++ b/evrouting/gasstation/routing.py @@ -2,7 +2,7 @@ from typing import Set, List import networkx as nx from evrouting.T import Node, SoC, Result, EmptyResult, Time -from evrouting.gasstation.T import State, DistFunction +from evrouting.gasstation.T import State from evrouting.graph_tools import ( CONSUMPTION_KEY, DISTANCE_KEY, @@ -20,17 +20,18 @@ def insert_start_node(s: Node, graph_extended: nx.DiGraph, capacity: SoC, initial_soc: SoC, + c: float = 1. ) -> nx.DiGraph: """Insert s into extended graph an create states and edges as necessary.""" graph_extended.add_node((s, initial_soc)) v: Node for v in gas_stations: - shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=CONSUMPTION_KEY) - w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) + shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=DISTANCE_KEY) + d = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY) + w = c * d if w > initial_soc: continue - d = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY) c_v = charging_cofficient(graph_core, v) g = initial_soc - w @@ -61,17 +62,17 @@ def insert_final_node(t: Node, graph_extended: nx.DiGraph, capacity: SoC, final_soc: SoC, + c: float = 1. ) -> nx.DiGraph: """Insert terminal node into extended graph an create states and edges as necessary.""" graph_extended.add_node((t, final_soc)) u: Node for u in gas_stations: - shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=CONSUMPTION_KEY) - w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) + shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=DISTANCE_KEY) + d_u_t = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY) + w = c * d_u_t if w + final_soc > capacity: continue - - d_u_t = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY) c_u = charging_cofficient(graph_core, u) for g in [g for n, g in graph_extended.nodes if n == u]: if g > w + final_soc: @@ -86,7 +87,7 @@ def insert_final_node(t: Node, def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, - c=1) -> nx.Graph: + c: float = 1.) -> nx.Graph: """ :param G: Original graph :param charging_stations: Charging stations @@ -192,6 +193,7 @@ def shortest_path(G: nx.Graph, initial_soc: SoC, final_soc: SoC, capacity: SoC, + c: float, extended_graph=None, contracted_graph=None ) -> Result: @@ -209,19 +211,20 @@ def shortest_path(G: nx.Graph, """ # Check if t is reachable from s try: - _path = nx.shortest_path(G, s, t, weight=CONSUMPTION_KEY) + _path = nx.shortest_path(G, s, t, weight=DISTANCE_KEY) except nx.NetworkXNoPath: return EmptyResult() - _w = fold_path(G, _path, weight=CONSUMPTION_KEY) + _t = fold_path(G, _path, weight=DISTANCE_KEY) + _w = c * _t if _w <= initial_soc: return Result( - trip_time=fold_path(G, _path, weight=DISTANCE_KEY), - charge_path=[(s, 0), (t, 0)] + trip_time=_t, + charge_path=[(n, 0) for n in _path] ) if extended_graph is None or contracted_graph is None: - contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity) + contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity, c=c) extended_graph = state_graph(contracted_graph, capacity) extended_graph = insert_start_node( @@ -231,7 +234,8 @@ def shortest_path(G: nx.Graph, gas_stations=charging_stations, graph_extended=extended_graph, capacity=capacity, - initial_soc=initial_soc + initial_soc=initial_soc, + c=c ) extended_graph = insert_final_node( @@ -240,7 +244,8 @@ def shortest_path(G: nx.Graph, gas_stations=charging_stations, graph_extended=extended_graph, capacity=capacity, - final_soc=final_soc + final_soc=final_soc, + c=c ) try: diff --git a/evrouting/graph_tools.py b/evrouting/graph_tools.py index 38c8ab6744f31ab79e0d5c07cf9256858e16d899..0add35f882a16ece3a78c5e59484c65ee1dec9f1 100644 --- a/evrouting/graph_tools.py +++ b/evrouting/graph_tools.py @@ -1,4 +1,5 @@ from collections import namedtuple +from typing import Union import networkx as nx from evrouting.T import ( @@ -47,7 +48,11 @@ def label(G: nx.Graph, u: Node) -> str: return G.nodes[u][LABEL_KEY] -def sum_weights(G, path, weight) -> float: +def sum_weights(G, path, weight: str) -> float: + """ + :param weight: either key so weights are G[u][v][weight] or + a function f(G, u, v) -> float. + """ return sum(G[u][v][weight] for u, v in zip(path[:-1], path[1:])) @@ -64,15 +69,3 @@ def consumption_function_distance_factory(consumption: float) -> ConsumptionFunc return G[u][v][CONSUMPTION_KEY] return c - - -def consumption_function_time_factory(consumption: float) -> ConsumptionFunction: - """ - :param consumption: in kWh/s - """ - - def c(G, u, v): - """Returns consumption in Wh from u to v.""" - return G[u][v][DISTANCE_KEY] * consumption * 1000 - - return c diff --git a/tests/osm/notest_gasstation_osm.py b/tests/osm/notest_gasstation_osm.py deleted file mode 100644 index 126c3c4e6d2c035d90d33c619c6d25dcded07977..0000000000000000000000000000000000000000 --- a/tests/osm/notest_gasstation_osm.py +++ /dev/null @@ -1,71 +0,0 @@ -from evrouting import gasstation -from evrouting.T import Result -from evrouting.osm.profiles import car -from evrouting.osm.routing import shortest_path -from evrouting.graph_tools import ( - CHARGING_COEFFICIENT_KEY, - DISTANCE_KEY, - HAVERSINE_KEY, - consumption_function_distance_factory -) - - -def test_charge_shortest_route_dimensions(map_graph): - """Full tank is enough to get there. Must be equal to shortest path.""" - s = (51.75041438844966, 6.9332313537597665) - t = (51.75657783347559, 7.000350952148438) - _s = map_graph.find_nearest(s) - _t = map_graph.find_nearest(t) - - consumption = 1 # kWh/km - cost_path = 6 * consumption * 1000 # distance * consumption in Wh - - c = consumption_function_distance_factory(consumption) - result = charge.routing.shortest_path( - G=map_graph, - charging_stations=map_graph.charging_stations, - s=_s, - t=_t, - initial_soc=10000, # > cost_path - final_soc=0, - capacity=10000, - c=c - ) - - path = shortest_path(map_graph, _s, _t, car) - - assert type(result) is Result - assert result.charge_path == path.charge_path - - -def test_charge_shortest_route_stop(map_graph): - """Full tank is enough to get there. Must be equal to shortest path.""" - s = (51.75041438844966, 6.9332313537597665) - t = (51.75657783347559, 7.000350952148438) - _s = map_graph.find_nearest(s) - _t = map_graph.find_nearest(t) - - consumption = 1 # kWh/km - cost_path = 6 * consumption * 1000 # distance * consumption in Wh - - c = consumption_function_distance_factory(consumption) - - result = charge.routing.shortest_path( - G=map_graph, - charging_stations=map_graph.charging_stations, - s=_s, - t=_t, - initial_soc=2000, # > cost_path - final_soc=0, - capacity=10000, - c=c - ) - - assert type(result) is Result - charge_at = [t for n, t in result.charge_path if t > 0] - # charge once - assert len(charge_at) == 1 - - # Charge about 10 min - assert charge_at[0] < 600 - assert charge_at[0] > 500 diff --git a/tests/osm/test_gasstation_osm.py b/tests/osm/test_gasstation_osm.py new file mode 100644 index 0000000000000000000000000000000000000000..bf2f2f4efbb26605c3e32061dd81044f9251e8d6 --- /dev/null +++ b/tests/osm/test_gasstation_osm.py @@ -0,0 +1,59 @@ +from evrouting import gasstation +from evrouting.T import Result +from evrouting.osm.profiles import car +from evrouting.osm.routing import shortest_path + + +def test_charge_shortest_route_dimensions(map_graph): + """Full tank is enough to get there. Must be equal to shortest path.""" + s = (51.75041438844966, 6.9332313537597665) + t = (51.75657783347559, 7.000350952148438) + _s = map_graph.find_nearest(s) + _t = map_graph.find_nearest(t) + + consumption = 0.5 # kWh/s + # Traveltime gonna be less than 10 min = 600 sek = 300 kWh. + # Therefore initial soc of 300 000 + result = gasstation.shortest_path(G=map_graph, + charging_stations=map_graph.charging_stations, + s=_s, + t=_t, + initial_soc=300000, + final_soc=0, + capacity=300000, + c=consumption * 1000, + extended_graph=None, + contracted_graph=None + ) + + path = shortest_path(map_graph, _s, _t, car) + + assert type(result) is Result + assert result.charge_path == path.charge_path + + +def test_charge_shortest_route_stop(map_graph): + """Full tank is enough to get there. Must be equal to shortest path.""" + s = (51.75041438844966, 6.9332313537597665) + t = (51.75657783347559, 7.000350952148438) + _s = map_graph.find_nearest(s) + _t = map_graph.find_nearest(t) + + consumption = 0.5 # kWh/s + # Traveltime gonna be less than 10 min = 600 sek := 300 kWh. + # => initial soc of 300 000 + # Traveltime to first charging station is < 5 min = 300 sek := 150 kWh + # => Initial soc of 150 000 is enough to charge but not to reach target. + result = gasstation.shortest_path(G=map_graph, + charging_stations=map_graph.charging_stations, + s=_s, + t=_t, + initial_soc=150000, + final_soc=0, + capacity=300000, + c=consumption * 1000, + extended_graph=None, + contracted_graph=None + ) + + assert type(result) is Result