Skip to content
Snippets Groups Projects
Commit 1c096115 authored by markn92's avatar markn92
Browse files

test setup

parent 6a05ba1a
No related branches found
No related tags found
No related merge requests found
import networkx as nx
from dataclasses import dataclass
from typing import Tuple, Union, NewType, Dict, Any, List
from typing import Tuple, Union, NewType, Dict, Any, List, Callable
Node = int
Edge = Tuple[Node, Node]
......@@ -14,6 +15,8 @@ ChargingCoefficient = Union[float, int, None]
Time = Union[float, int]
ConsumptionFunction = Callable[[nx.Graph, Node, Node], float]
@dataclass
class Result:
......
......@@ -2,20 +2,26 @@ from typing import Dict
import networkx as nx
from evrouting.T import Node, SoC
from evrouting.graph_tools import charging_cofficient, consumption
from evrouting.T import Node, SoC, ConsumptionFunction
from evrouting.graph_tools import charging_cofficient
from evrouting.charge.T import SoCProfile, SoCFunction, ChargingFunction, Label
class SoCProfileFactory:
"""Maps Nodes to their (cached) charging functions."""
def __init__(self, G: nx.Graph, capacity: SoC):
def __init__(self, G: nx.Graph, capacity: SoC, c: ConsumptionFunction):
"""
:param G:
:param capacity:
:param c: Function to calc consumption for an edge.
"""
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.c = c
def __call__(self, u: Node, v: Node = None) -> SoCProfile:
path_cost = 0 if v is None else consumption(self.G, u, v)
path_cost = 0 if v is None else self.c(self.G, u, v)
return SoCProfile(path_cost, self.capacity)
......
......@@ -11,9 +11,9 @@ from typing import Dict, List, Tuple, Set
from math import inf
import networkx as nx
from evrouting.T import Node, SoC, Time, Result, EmptyResult
from evrouting.T import Node, SoC, Time, Result, EmptyResult, ConsumptionFunction
from evrouting.utils import PriorityQueue
from evrouting.graph_tools import distance
from evrouting.graph_tools import distance, consumption
from evrouting.charge.T import SoCFunction, Label
from evrouting.charge.utils import LabelPriorityQueue
from evrouting.charge.factories import (
......@@ -24,7 +24,7 @@ from evrouting.charge.factories import (
def shortest_path(G: nx.DiGraph, charging_stations: Set[Node], s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Result:
initial_soc: SoC, final_soc: SoC, capacity: SoC, c=consumption) -> Result:
"""
Calculates shortest path using the CHarge algorithm.
......@@ -40,7 +40,14 @@ def shortest_path(G: nx.DiGraph, charging_stations: Set[Node], s: Node, t: Node,
:return:
"""
t, factories, queues = _setup(
G, charging_stations, capacity, initial_soc, final_soc, s, t
G=G,
charging_stations=charging_stations,
capacity=capacity,
initial_soc=initial_soc,
final_soc=final_soc,
s=s,
t=t,
c=c
)
f_soc_factory: SoCFunctionFactory = factories['f_soc']
......@@ -132,7 +139,8 @@ def shortest_path(G: nx.DiGraph, charging_stations: Set[Node], s: Node, t: Node,
def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
initial_soc: SoC, final_soc: SoC, s: Node, t: Node
initial_soc: SoC, final_soc: SoC, s: Node, t: Node,
c: ConsumptionFunction
) -> Tuple[Node, Dict, Dict]:
"""
Initialises the data structures and graph setup.
......@@ -159,7 +167,7 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
# Init factories
cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
f_soc_factory = SoCFunctionFactory(cf_map)
soc_profile_factory = SoCProfileFactory(G, capacity)
soc_profile_factory = SoCProfileFactory(G, capacity, c)
# Init maps to manage labels
l_set: Dict[int, List[Label]] = {v: [] for v in G}
......
......@@ -26,6 +26,8 @@ from evrouting.osm.routing import point, haversine_distance
logger = logging.getLogger(__name__)
HAVERSINE_KEY = 'haversine'
class OSMGraph(nx.DiGraph):
"""
......@@ -122,22 +124,24 @@ def read_osm(osm_xml_data, profile) -> OSMGraph:
u, v = osm.nodes[u_id], osm.nodes[v_id]
# Travel-time from u to v
d = haversine_distance(
u.lon, u.lat, v.lon, v.lat, unit_m=True
) / speed(w, profile) * ms_to_kmh
d = haversine_distance(u.lon, u.lat, v.lon, v.lat, unit_m=True) # in m
t = d / (speed(w, profile) / ms_to_kmh) # in s
if w.tags.get('oneway', 'no') == 'yes':
# ONLY ONE DIRECTION
G.add_edge(u_id, v_id, **{
DISTANCE_KEY: d
DISTANCE_KEY: t,
HAVERSINE_KEY: d
})
else:
# BOTH DIRECTION
G.add_edge(u_id, v_id, **{
DISTANCE_KEY: d
DISTANCE_KEY: t,
HAVERSINE_KEY: d
})
G.add_edge(v_id, u_id, **{
DISTANCE_KEY: d
DISTANCE_KEY: t,
HAVERSINE_KEY: d
})
# Complete the used nodes' information
......
import os
import json
import pytest
from evrouting.osm.imports import read_osm, OSMGraph
from evrouting import charge
from evrouting.T import Result
from evrouting.osm.imports import read_osm, OSMGraph, HAVERSINE_KEY
from evrouting.osm.profiles import car
from evrouting.osm.routing import shortest_path
from evrouting.graph_tools import CHARGING_COEFFICIENT_KEY
from evrouting.graph_tools import CHARGING_COEFFICIENT_KEY, DISTANCE_KEY
@pytest.fixture
......@@ -29,8 +32,12 @@ def graph():
@pytest.fixture
def map_graph():
G = read_osm(os.path.join(os.path.dirname(__file__), 'static/map.osm'),
car)
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
G = read_osm(os.path.join(STATIC_DIR, 'map.osm'), car)
with open(os.path.join(STATIC_DIR, 'charging_stations.json'), 'r') as f:
charging_stations = json.load(f)
G.insert_charging_stations(charging_stations)
yield G
del G
......@@ -58,6 +65,8 @@ def test_insert_charging_stations_eq(graph):
def test_shortest_route(map_graph):
s = (51.7769461, 6.9832152)
t = (51.7796487, 6.9795230)
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
route = [
"1827268706",
......@@ -69,21 +78,48 @@ def test_shortest_route(map_graph):
"418009799"
]
assert route == shortest_path(map_graph, s, t, car)
assert route == shortest_path(map_graph, _s, _t, car)
def test_other_shortest_route(map_graph):
s = (51.75344308292687, 6.943187713623048)
t = (51.754452602619935, 6.958980560302735)
def test_shortest_route_dimensions(map_graph):
s = (51.75041438844966, 6.9332313537597665)
t = (51.75657783347559, 7.000350952148438)
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
route = [
"1827268706",
"1826594887",
"4955446046",
"4955446048",
"34053450",
"4955446051",
"418009799"
]
path = shortest_path(map_graph, _s, _t, car)
time = sum([map_graph[u][v][DISTANCE_KEY] for u, v in zip(path[:-1], path[1:])])
distance = sum([map_graph[u][v][HAVERSINE_KEY] for u, v in zip(path[:-1], path[1:])])
assert time / 60 < 10
assert time / 60 > 5
assert distance / 1000 < 6
assert distance / 1000 > 4
def test_charge_shortest_route_dimensions(map_graph):
s = (51.75041438844966, 6.9332313537597665)
t = (51.75657783347559, 7.000350952148438)
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
consumption = 1 # kWh/km
cost_path = 6 * consumption * 1000 # distance * consumption in Wh
def c(G, u, v):
"""Returns consumption in Wh from u to v."""
return G[u][v][HAVERSINE_KEY] * consumption * 1000
result = charge.routing.shortest_path(
G=map_graph,
charging_stations=map_graph.charging_stations,
s=_s,
t=_t,
initial_soc=10000, # > cost_path
final_soc=0,
capacity=10000,
c=c
)
assert route == shortest_path(map_graph, s, t, car)
assert type(result) is Result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment