Skip to content
Snippets Groups Projects
test_osm_charge.py 4.61 KiB
Newer Older
markn92's avatar
markn92 committed
import os
markn92's avatar
markn92 committed
import json
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
import pytest
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
from evrouting import charge
from evrouting.T import Result
from evrouting.osm.imports import read_osm, OSMGraph, HAVERSINE_KEY
markn92's avatar
markn92 committed
from evrouting.osm.profiles import car
markn92's avatar
markn92 committed
from evrouting.osm.routing import shortest_path
markn92's avatar
markn92 committed
from evrouting.graph_tools import CHARGING_COEFFICIENT_KEY, DISTANCE_KEY, CONSUMPTION_KEY
markn92's avatar
markn92 committed


markn92's avatar
markn92 committed
@pytest.fixture
def graph():
markn92's avatar
markn92 committed
    G = OSMGraph()
markn92's avatar
markn92 committed

    node_coordinates = [
        (51.7705832, 7.0002595),
        (51.7696529, 6.9568520)
    ]

    for n_id, coordinates in enumerate(node_coordinates):
        lat, lon = coordinates
        # Add two nodes, that exist in osm test map
        G.add_node(n_id, lat=lat, lon=lon)
markn92's avatar
markn92 committed
        G.insert_into_rtree(n_id)
markn92's avatar
markn92 committed
@pytest.fixture
def map_graph():
markn92's avatar
markn92 committed
    STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
    G = read_osm(os.path.join(STATIC_DIR, 'map.osm'), car)
    with open(os.path.join(STATIC_DIR, 'charging_stations.json'), 'r') as f:
        charging_stations = json.load(f)
    G.insert_charging_stations(charging_stations)

markn92's avatar
markn92 committed
    yield G
    del G

markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
def test_insert_charging_stations_close(graph):
markn92's avatar
markn92 committed
    # Close two node 1
    S = [{"lon": 7.0002593, "lat": 51.7705832, "power": 22.0}]

markn92's avatar
markn92 committed
    graph.insert_charging_stations(S)
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
    assert graph.nodes[0][CHARGING_COEFFICIENT_KEY] == 22.0
    assert CHARGING_COEFFICIENT_KEY not in graph.nodes[1]
markn92's avatar
markn92 committed


markn92's avatar
markn92 committed
def test_insert_charging_stations_eq(graph):
markn92's avatar
markn92 committed
    # Close exactly at node 1
    S = [{"lon": 7.0002595, "lat": 51.7705832, "power": 22.0}]

markn92's avatar
markn92 committed
    graph.insert_charging_stations(S)
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
    assert graph.nodes[0][CHARGING_COEFFICIENT_KEY] == 22.0
    assert CHARGING_COEFFICIENT_KEY not in graph.nodes[1]
markn92's avatar
markn92 committed


def test_shortest_route(map_graph):
    s = (51.7769461, 6.9832152)
    t = (51.7796487, 6.9795230)
markn92's avatar
markn92 committed
    _s = map_graph.find_nearest(s)
    _t = map_graph.find_nearest(t)
markn92's avatar
markn92 committed

    route = [
        "1827268706",
        "1826594887",
        "4955446046",
        "4955446048",
        "34053450",
        "4955446051",
        "418009799"
    ]

markn92's avatar
markn92 committed
    assert route == shortest_path(map_graph, _s, _t, car)
markn92's avatar
markn92 committed


markn92's avatar
markn92 committed
def test_shortest_route_dimensions(map_graph):
    s = (51.75041438844966, 6.9332313537597665)
    t = (51.75657783347559, 7.000350952148438)
    _s = map_graph.find_nearest(s)
    _t = map_graph.find_nearest(t)
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
    path = shortest_path(map_graph, _s, _t, car)

    time = sum([map_graph[u][v][DISTANCE_KEY] for u, v in zip(path[:-1], path[1:])])
    distance = sum([map_graph[u][v][HAVERSINE_KEY] for u, v in zip(path[:-1], path[1:])])

    assert time / 60 < 10
    assert time / 60 > 5
    assert distance / 1000 < 6
    assert distance / 1000 > 4


def test_charge_shortest_route_dimensions(map_graph):
markn92's avatar
markn92 committed
    """Full tank is enough to get there. Must be equal to shortest path."""
markn92's avatar
markn92 committed
    s = (51.75041438844966, 6.9332313537597665)
    t = (51.75657783347559, 7.000350952148438)
    _s = map_graph.find_nearest(s)
    _t = map_graph.find_nearest(t)

    consumption = 1  # kWh/km
    cost_path = 6 * consumption * 1000  # distance * consumption in Wh

    def c(G, u, v):
        """Returns consumption in Wh from u to v."""
markn92's avatar
markn92 committed
        try:
            return G[u][v][HAVERSINE_KEY] * consumption
        except KeyError:
            return G[u][v][CONSUMPTION_KEY]
markn92's avatar
markn92 committed

    result = charge.routing.shortest_path(
        G=map_graph,
        charging_stations=map_graph.charging_stations,
        s=_s,
        t=_t,
        initial_soc=10000,  # > cost_path
        final_soc=0,
        capacity=10000,
        c=c
    )
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
    path = shortest_path(map_graph, _s, _t, car)

    assert type(result) is Result
    assert [n for n, t in result.charge_path] == path


def test_charge_shortest_route_stop(map_graph):
    """Full tank is enough to get there. Must be equal to shortest path."""
    s = (51.75041438844966, 6.9332313537597665)
    t = (51.75657783347559, 7.000350952148438)
    _s = map_graph.find_nearest(s)
    _t = map_graph.find_nearest(t)

    consumption = 1  # kWh/km
    cost_path = 6 * consumption * 1000  # distance * consumption in Wh

    def c(G, u, v):
        """Returns consumption in Wh from u to v."""
        try:
            return G[u][v][HAVERSINE_KEY] * consumption
        except KeyError:
            return G[u][v][CONSUMPTION_KEY]

    result = charge.routing.shortest_path(
        G=map_graph,
        charging_stations=map_graph.charging_stations,
        s=_s,
        t=_t,
        initial_soc=2000,  # > cost_path
        final_soc=0,
        capacity=10000,
        c=c
    )

markn92's avatar
markn92 committed
    assert type(result) is Result
markn92's avatar
markn92 committed
    charge_at = [t for n, t in result.charge_path if t > 0]
    # charge once
    assert len(charge_at) == 1

    # Charge about 10 min
    assert charge_at[0] < 600
    assert charge_at[0] > 500