Skip to content
Snippets Groups Projects
Commit bcbe5ccd authored by markn92's avatar markn92
Browse files

cleanup

parent d978e077
Branches
No related tags found
No related merge requests found
...@@ -3,14 +3,14 @@ from typing import Set, List ...@@ -3,14 +3,14 @@ from typing import Set, List
import networkx as nx import networkx as nx
from evrouting.T import Node, SoC, Result, EmptyResult, Time from evrouting.T import Node, SoC, Result, EmptyResult, Time
from evrouting.gasstation.T import State, DistFunction from evrouting.gasstation.T import State, DistFunction
from evrouting.gasstation.utils import dijkstra, fold_path
from evrouting.graph_tools import ( from evrouting.graph_tools import (
CONSUMPTION_KEY, CONSUMPTION_KEY,
DISTANCE_KEY, DISTANCE_KEY,
consumption, consumption,
distance, distance,
charging_cofficient charging_cofficient,
) )
from evrouting.graph_tools import sum_weights as fold_path
def insert_start_node(s: Node, def insert_start_node(s: Node,
...@@ -20,13 +20,12 @@ def insert_start_node(s: Node, ...@@ -20,13 +20,12 @@ def insert_start_node(s: Node,
graph_extended: nx.DiGraph, graph_extended: nx.DiGraph,
capacity: SoC, capacity: SoC,
initial_soc: SoC, initial_soc: SoC,
dist: DistFunction = dijkstra
) -> nx.DiGraph: ) -> nx.DiGraph:
"""Insert s into extended graph an create states and edges as necessary.""" """Insert s into extended graph an create states and edges as necessary."""
graph_extended.add_node((s, initial_soc)) graph_extended.add_node((s, initial_soc))
v: Node v: Node
for v in gas_stations: for v in gas_stations:
shortest_p: List[Node] = dist(graph_core, s, v, weight=CONSUMPTION_KEY) shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=CONSUMPTION_KEY)
w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY)
if w > initial_soc: if w > initial_soc:
continue continue
...@@ -62,13 +61,12 @@ def insert_final_node(t: Node, ...@@ -62,13 +61,12 @@ def insert_final_node(t: Node,
graph_extended: nx.DiGraph, graph_extended: nx.DiGraph,
capacity: SoC, capacity: SoC,
final_soc: SoC, final_soc: SoC,
dist: DistFunction = dijkstra
) -> nx.DiGraph: ) -> nx.DiGraph:
"""Insert terminal node into extended graph an create states and edges as necessary.""" """Insert terminal node into extended graph an create states and edges as necessary."""
graph_extended.add_node((t, final_soc)) graph_extended.add_node((t, final_soc))
u: Node u: Node
for u in gas_stations: for u in gas_stations:
shortest_p: List[Node] = dist(graph_core, t, u, weight=CONSUMPTION_KEY) shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=CONSUMPTION_KEY)
w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY)
if w + final_soc > capacity: if w + final_soc > capacity:
continue continue
...@@ -163,7 +161,7 @@ def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph: ...@@ -163,7 +161,7 @@ def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph:
def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph,
path: List[State], dist=dijkstra) -> Result: path: List[State]) -> Result:
trip_time: Time = 0 trip_time: Time = 0
charge_path = [] charge_path = []
u: Node u: Node
...@@ -177,7 +175,7 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, ...@@ -177,7 +175,7 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph,
trip_time += t trip_time += t
charge_time_u: Time = t - fold_path( charge_time_u: Time = t - fold_path(
graph_core, graph_core,
dist(graph_core, u, v, weight=DISTANCE_KEY), nx.shortest_path(graph_core, u, v, weight=DISTANCE_KEY),
weight=DISTANCE_KEY weight=DISTANCE_KEY
) )
charge_path.append((u, charge_time_u)) charge_path.append((u, charge_time_u))
...@@ -187,8 +185,16 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, ...@@ -187,8 +185,16 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph,
return Result(trip_time=trip_time, charge_path=charge_path) return Result(trip_time=trip_time, charge_path=charge_path)
def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, def shortest_path(G: nx.Graph,
initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Result: charging_stations: Set[Node],
s: Node,
t: Node,
initial_soc: SoC,
final_soc: SoC,
capacity: SoC,
extended_graph=None,
contracted_graph=None
) -> Result:
""" """
Calculates shortest path using a generalized gas station algorithm. Calculates shortest path using a generalized gas station algorithm.
...@@ -202,7 +208,11 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -202,7 +208,11 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
:return: :return:
""" """
# Check if t is reachable from s # Check if t is reachable from s
_path = dijkstra(G, s, t, weight=CONSUMPTION_KEY) try:
_path = nx.shortest_path(G, s, t, weight=CONSUMPTION_KEY)
except nx.NetworkXNoPath:
return EmptyResult()
_w = fold_path(G, _path, weight=CONSUMPTION_KEY) _w = fold_path(G, _path, weight=CONSUMPTION_KEY)
if _w <= initial_soc: if _w <= initial_soc:
return Result( return Result(
...@@ -210,8 +220,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -210,8 +220,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
charge_path=[(s, 0), (t, 0)] charge_path=[(s, 0), (t, 0)]
) )
contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity) if extended_graph is None or contracted_graph is None:
extended_graph = state_graph(contracted_graph, capacity) contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity)
extended_graph = state_graph(contracted_graph, capacity)
extended_graph = insert_start_node( extended_graph = insert_start_node(
s=s, s=s,
...@@ -232,9 +243,12 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -232,9 +243,12 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
final_soc=final_soc final_soc=final_soc
) )
path: List[State] = dijkstra(extended_graph, (s, initial_soc), (t, final_soc)) try:
path: List[State] = nx.shortest_path(extended_graph, (s, initial_soc), (t, final_soc))
except nx.NetworkXNoPath:
return EmptyResult()
return EmptyResult() if not path else compose_result( return compose_result(
graph_core=G, graph_core=G,
extended_graph=extended_graph, extended_graph=extended_graph,
path=path path=path
......
from typing import List
import networkx as nx
from evrouting.gasstation.T import N
def dijkstra(G: nx.Graph, u: N, v: N, weight: str = 'weight') -> List[N]:
try:
return nx.algorithms.shortest_path(G, u, v, weight=weight)
except nx.NetworkXNoPath:
return []
def fold_path(G: nx.Graph, path: List[N], weight: str):
return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])])
...@@ -3,8 +3,6 @@ import networkx as nx ...@@ -3,8 +3,6 @@ import networkx as nx
import pytest import pytest
from evrouting.gasstation.routing import ( from evrouting.gasstation.routing import (
contract_graph, contract_graph,
dijkstra,
fold_path,
get_possible_arriving_soc, get_possible_arriving_soc,
state_graph, state_graph,
insert_final_node, insert_final_node,
...@@ -17,21 +15,7 @@ from evrouting.graph_tools import ( ...@@ -17,21 +15,7 @@ from evrouting.graph_tools import (
DISTANCE_KEY, DISTANCE_KEY,
CHARGING_COEFFICIENT_KEY CHARGING_COEFFICIENT_KEY
) )
from tests.config import edge_case, gasstation, init_config, gasstation_complete from tests.config import gasstation, init_config, gasstation_complete
class TestDjikstra:
@pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)])
def test_djikstra(self, u, v, min_len):
conf: dict = init_config(gasstation)
shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY)
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len
def test_djikstra_via_node(self):
conf: dict = init_config(edge_case)
shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY)
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2
class TestContraction: class TestContraction:
......
import os
import json
import pytest
from evrouting.osm.imports import OSMGraph, read_osm
from evrouting.osm.profiles import car
@pytest.fixture
def graph():
G = OSMGraph()
node_coordinates = [
(51.7705832, 7.0002595),
(51.7696529, 6.9568520)
]
for n_id, coordinates in enumerate(node_coordinates):
lat, lon = coordinates
# Add two nodes, that exist in osm test map
G.add_node(n_id, lat=lat, lon=lon)
G.insert_into_rtree(n_id)
yield G
del G
@pytest.fixture
def map_graph():
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
G = read_osm(os.path.join(STATIC_DIR, 'map.osm'), car)
with open(os.path.join(STATIC_DIR, 'charging_stations.json'), 'r') as f:
charging_stations = json.load(f)
G.insert_charging_stations(charging_stations)
yield G
del G
from evrouting import gasstation
from evrouting.T import Result
from evrouting.osm.profiles import car
from evrouting.osm.routing import shortest_path
from evrouting.graph_tools import (
CHARGING_COEFFICIENT_KEY,
DISTANCE_KEY,
HAVERSINE_KEY,
consumption_function_distance_factory
)
def test_charge_shortest_route_dimensions(map_graph):
"""Full tank is enough to get there. Must be equal to shortest path."""
s = (51.75041438844966, 6.9332313537597665)
t = (51.75657783347559, 7.000350952148438)
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
consumption = 1 # kWh/km
cost_path = 6 * consumption * 1000 # distance * consumption in Wh
c = consumption_function_distance_factory(consumption)
result = charge.routing.shortest_path(
G=map_graph,
charging_stations=map_graph.charging_stations,
s=_s,
t=_t,
initial_soc=10000, # > cost_path
final_soc=0,
capacity=10000,
c=c
)
path = shortest_path(map_graph, _s, _t, car)
assert type(result) is Result
assert result.charge_path == path.charge_path
def test_charge_shortest_route_stop(map_graph):
"""Full tank is enough to get there. Must be equal to shortest path."""
s = (51.75041438844966, 6.9332313537597665)
t = (51.75657783347559, 7.000350952148438)
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
consumption = 1 # kWh/km
cost_path = 6 * consumption * 1000 # distance * consumption in Wh
c = consumption_function_distance_factory(consumption)
result = charge.routing.shortest_path(
G=map_graph,
charging_stations=map_graph.charging_stations,
s=_s,
t=_t,
initial_soc=2000, # > cost_path
final_soc=0,
capacity=10000,
c=c
)
assert type(result) is Result
charge_at = [t for n, t in result.charge_path if t > 0]
# charge once
assert len(charge_at) == 1
# Charge about 10 min
assert charge_at[0] < 600
assert charge_at[0] > 500
import os
import json
import pytest
from evrouting import charge from evrouting import charge
from evrouting.T import Result from evrouting.T import Result
from evrouting.osm.imports import read_osm, OSMGraph
from evrouting.osm.profiles import car from evrouting.osm.profiles import car
from evrouting.osm.routing import shortest_path from evrouting.osm.routing import shortest_path
from evrouting.graph_tools import ( from evrouting.graph_tools import (
CHARGING_COEFFICIENT_KEY, CHARGING_COEFFICIENT_KEY,
DISTANCE_KEY, DISTANCE_KEY,
CONSUMPTION_KEY,
HAVERSINE_KEY, HAVERSINE_KEY,
consumption_function_distance_factory consumption_function_distance_factory
) )
@pytest.fixture
def graph():
G = OSMGraph()
node_coordinates = [
(51.7705832, 7.0002595),
(51.7696529, 6.9568520)
]
for n_id, coordinates in enumerate(node_coordinates):
lat, lon = coordinates
# Add two nodes, that exist in osm test map
G.add_node(n_id, lat=lat, lon=lon)
G.insert_into_rtree(n_id)
yield G
del G
@pytest.fixture
def map_graph():
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
G = read_osm(os.path.join(STATIC_DIR, 'map.osm'), car)
with open(os.path.join(STATIC_DIR, 'charging_stations.json'), 'r') as f:
charging_stations = json.load(f)
G.insert_charging_stations(charging_stations)
yield G
del G
def test_insert_charging_stations_close(graph): def test_insert_charging_stations_close(graph):
# Close two node 1 # Close two node 1
S = [{"lon": 7.0002593, "lat": 51.7705832, "power": 22.0}] S = [{"lon": 7.0002593, "lat": 51.7705832, "power": 22.0}]
...@@ -163,4 +125,3 @@ def test_charge_shortest_route_stop(map_graph): ...@@ -163,4 +125,3 @@ def test_charge_shortest_route_stop(map_graph):
# Charge about 10 min # Charge about 10 min
assert charge_at[0] < 600 assert charge_at[0] < 600
assert charge_at[0] > 500 assert charge_at[0] > 500
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment