From 1c0961150af97029f6744f49c6ac00d07f1b7136 Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Tue, 28 Apr 2020 16:53:00 +0200 Subject: [PATCH] test setup --- evrouting/T.py | 5 ++- evrouting/charge/factories.py | 14 +++++-- evrouting/charge/routing.py | 20 +++++++--- evrouting/osm/imports.py | 16 +++++--- tests/osm/test_osm_charge.py | 72 ++++++++++++++++++++++++++--------- tests/osm/test_routing.py | 0 6 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 tests/osm/test_routing.py diff --git a/evrouting/T.py b/evrouting/T.py index da06ff2..cf060e1 100644 --- a/evrouting/T.py +++ b/evrouting/T.py @@ -1,5 +1,6 @@ +import networkx as nx from dataclasses import dataclass -from typing import Tuple, Union, NewType, Dict, Any, List +from typing import Tuple, Union, NewType, Dict, Any, List, Callable Node = int Edge = Tuple[Node, Node] @@ -14,6 +15,8 @@ ChargingCoefficient = Union[float, int, None] Time = Union[float, int] +ConsumptionFunction = Callable[[nx.Graph, Node, Node], float] + @dataclass class Result: diff --git a/evrouting/charge/factories.py b/evrouting/charge/factories.py index 7131324..7bafe26 100644 --- a/evrouting/charge/factories.py +++ b/evrouting/charge/factories.py @@ -2,20 +2,26 @@ from typing import Dict import networkx as nx -from evrouting.T import Node, SoC -from evrouting.graph_tools import charging_cofficient, consumption +from evrouting.T import Node, SoC, ConsumptionFunction +from evrouting.graph_tools import charging_cofficient from evrouting.charge.T import SoCProfile, SoCFunction, ChargingFunction, Label class SoCProfileFactory: """Maps Nodes to their (cached) charging functions.""" - def __init__(self, G: nx.Graph, capacity: SoC): + def __init__(self, G: nx.Graph, capacity: SoC, c: ConsumptionFunction): + """ + :param G: + :param capacity: + :param c: Function to calc consumption for an edge. + """ self.G: nx.Graph = G self.capacity: SoC = capacity + self.c = c def __call__(self, u: Node, v: Node = None) -> SoCProfile: - path_cost = 0 if v is None else consumption(self.G, u, v) + path_cost = 0 if v is None else self.c(self.G, u, v) return SoCProfile(path_cost, self.capacity) diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 3b3f626..d1a57f0 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -11,9 +11,9 @@ from typing import Dict, List, Tuple, Set from math import inf import networkx as nx -from evrouting.T import Node, SoC, Time, Result, EmptyResult +from evrouting.T import Node, SoC, Time, Result, EmptyResult, ConsumptionFunction from evrouting.utils import PriorityQueue -from evrouting.graph_tools import distance +from evrouting.graph_tools import distance, consumption from evrouting.charge.T import SoCFunction, Label from evrouting.charge.utils import LabelPriorityQueue from evrouting.charge.factories import ( @@ -24,7 +24,7 @@ from evrouting.charge.factories import ( def shortest_path(G: nx.DiGraph, charging_stations: Set[Node], s: Node, t: Node, - initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Result: + initial_soc: SoC, final_soc: SoC, capacity: SoC, c=consumption) -> Result: """ Calculates shortest path using the CHarge algorithm. @@ -40,7 +40,14 @@ def shortest_path(G: nx.DiGraph, charging_stations: Set[Node], s: Node, t: Node, :return: """ t, factories, queues = _setup( - G, charging_stations, capacity, initial_soc, final_soc, s, t + G=G, + charging_stations=charging_stations, + capacity=capacity, + initial_soc=initial_soc, + final_soc=final_soc, + s=s, + t=t, + c=c ) f_soc_factory: SoCFunctionFactory = factories['f_soc'] @@ -132,7 +139,8 @@ def shortest_path(G: nx.DiGraph, charging_stations: Set[Node], s: Node, t: Node, def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, - initial_soc: SoC, final_soc: SoC, s: Node, t: Node + initial_soc: SoC, final_soc: SoC, s: Node, t: Node, + c: ConsumptionFunction ) -> Tuple[Node, Dict, Dict]: """ Initialises the data structures and graph setup. @@ -159,7 +167,7 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, # Init factories cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) f_soc_factory = SoCFunctionFactory(cf_map) - soc_profile_factory = SoCProfileFactory(G, capacity) + soc_profile_factory = SoCProfileFactory(G, capacity, c) # Init maps to manage labels l_set: Dict[int, List[Label]] = {v: [] for v in G} diff --git a/evrouting/osm/imports.py b/evrouting/osm/imports.py index f08247f..9fdea4e 100644 --- a/evrouting/osm/imports.py +++ b/evrouting/osm/imports.py @@ -26,6 +26,8 @@ from evrouting.osm.routing import point, haversine_distance logger = logging.getLogger(__name__) +HAVERSINE_KEY = 'haversine' + class OSMGraph(nx.DiGraph): """ @@ -122,22 +124,24 @@ def read_osm(osm_xml_data, profile) -> OSMGraph: u, v = osm.nodes[u_id], osm.nodes[v_id] # Travel-time from u to v - d = haversine_distance( - u.lon, u.lat, v.lon, v.lat, unit_m=True - ) / speed(w, profile) * ms_to_kmh + d = haversine_distance(u.lon, u.lat, v.lon, v.lat, unit_m=True) # in m + t = d / (speed(w, profile) / ms_to_kmh) # in s if w.tags.get('oneway', 'no') == 'yes': # ONLY ONE DIRECTION G.add_edge(u_id, v_id, **{ - DISTANCE_KEY: d + DISTANCE_KEY: t, + HAVERSINE_KEY: d }) else: # BOTH DIRECTION G.add_edge(u_id, v_id, **{ - DISTANCE_KEY: d + DISTANCE_KEY: t, + HAVERSINE_KEY: d }) G.add_edge(v_id, u_id, **{ - DISTANCE_KEY: d + DISTANCE_KEY: t, + HAVERSINE_KEY: d }) # Complete the used nodes' information diff --git a/tests/osm/test_osm_charge.py b/tests/osm/test_osm_charge.py index 94be86a..dd59817 100644 --- a/tests/osm/test_osm_charge.py +++ b/tests/osm/test_osm_charge.py @@ -1,11 +1,14 @@ import os +import json import pytest -from evrouting.osm.imports import read_osm, OSMGraph +from evrouting import charge +from evrouting.T import Result +from evrouting.osm.imports import read_osm, OSMGraph, HAVERSINE_KEY from evrouting.osm.profiles import car from evrouting.osm.routing import shortest_path -from evrouting.graph_tools import CHARGING_COEFFICIENT_KEY +from evrouting.graph_tools import CHARGING_COEFFICIENT_KEY, DISTANCE_KEY @pytest.fixture @@ -29,8 +32,12 @@ def graph(): @pytest.fixture def map_graph(): - G = read_osm(os.path.join(os.path.dirname(__file__), 'static/map.osm'), - car) + STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') + G = read_osm(os.path.join(STATIC_DIR, 'map.osm'), car) + with open(os.path.join(STATIC_DIR, 'charging_stations.json'), 'r') as f: + charging_stations = json.load(f) + G.insert_charging_stations(charging_stations) + yield G del G @@ -58,6 +65,8 @@ def test_insert_charging_stations_eq(graph): def test_shortest_route(map_graph): s = (51.7769461, 6.9832152) t = (51.7796487, 6.9795230) + _s = map_graph.find_nearest(s) + _t = map_graph.find_nearest(t) route = [ "1827268706", @@ -69,21 +78,48 @@ def test_shortest_route(map_graph): "418009799" ] - assert route == shortest_path(map_graph, s, t, car) + assert route == shortest_path(map_graph, _s, _t, car) -def test_other_shortest_route(map_graph): - s = (51.75344308292687, 6.943187713623048) - t = (51.754452602619935, 6.958980560302735) +def test_shortest_route_dimensions(map_graph): + s = (51.75041438844966, 6.9332313537597665) + t = (51.75657783347559, 7.000350952148438) + _s = map_graph.find_nearest(s) + _t = map_graph.find_nearest(t) - route = [ - "1827268706", - "1826594887", - "4955446046", - "4955446048", - "34053450", - "4955446051", - "418009799" - ] + path = shortest_path(map_graph, _s, _t, car) + + time = sum([map_graph[u][v][DISTANCE_KEY] for u, v in zip(path[:-1], path[1:])]) + distance = sum([map_graph[u][v][HAVERSINE_KEY] for u, v in zip(path[:-1], path[1:])]) + + assert time / 60 < 10 + assert time / 60 > 5 + assert distance / 1000 < 6 + assert distance / 1000 > 4 + + +def test_charge_shortest_route_dimensions(map_graph): + s = (51.75041438844966, 6.9332313537597665) + t = (51.75657783347559, 7.000350952148438) + _s = map_graph.find_nearest(s) + _t = map_graph.find_nearest(t) + + consumption = 1 # kWh/km + cost_path = 6 * consumption * 1000 # distance * consumption in Wh + + def c(G, u, v): + """Returns consumption in Wh from u to v.""" + return G[u][v][HAVERSINE_KEY] * consumption * 1000 + + result = charge.routing.shortest_path( + G=map_graph, + charging_stations=map_graph.charging_stations, + s=_s, + t=_t, + initial_soc=10000, # > cost_path + final_soc=0, + capacity=10000, + c=c + ) - assert route == shortest_path(map_graph, s, t, car) + assert type(result) is Result diff --git a/tests/osm/test_routing.py b/tests/osm/test_routing.py new file mode 100644 index 0000000..e69de29 -- GitLab