Skip to content
Snippets Groups Projects
Commit a7bf0e15 authored by markn92's avatar markn92
Browse files

wip some tests broken, maybe forever.

parent bcbe5ccd
Branches
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@ from typing import Set, List
import networkx as nx
from evrouting.T import Node, SoC, Result, EmptyResult, Time
from evrouting.gasstation.T import State, DistFunction
from evrouting.gasstation.T import State
from evrouting.graph_tools import (
CONSUMPTION_KEY,
DISTANCE_KEY,
......@@ -20,17 +20,18 @@ def insert_start_node(s: Node,
graph_extended: nx.DiGraph,
capacity: SoC,
initial_soc: SoC,
c: float = 1.
) -> nx.DiGraph:
"""Insert s into extended graph an create states and edges as necessary."""
graph_extended.add_node((s, initial_soc))
v: Node
for v in gas_stations:
shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=CONSUMPTION_KEY)
w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY)
shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=DISTANCE_KEY)
d = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY)
w = c * d
if w > initial_soc:
continue
d = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY)
c_v = charging_cofficient(graph_core, v)
g = initial_soc - w
......@@ -61,17 +62,17 @@ def insert_final_node(t: Node,
graph_extended: nx.DiGraph,
capacity: SoC,
final_soc: SoC,
c: float = 1.
) -> nx.DiGraph:
"""Insert terminal node into extended graph an create states and edges as necessary."""
graph_extended.add_node((t, final_soc))
u: Node
for u in gas_stations:
shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=CONSUMPTION_KEY)
w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY)
shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=DISTANCE_KEY)
d_u_t = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY)
w = c * d_u_t
if w + final_soc > capacity:
continue
d_u_t = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY)
c_u = charging_cofficient(graph_core, u)
for g in [g for n, g in graph_extended.nodes if n == u]:
if g > w + final_soc:
......@@ -86,7 +87,7 @@ def insert_final_node(t: Node,
def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
c=1) -> nx.Graph:
c: float = 1.) -> nx.Graph:
"""
:param G: Original graph
:param charging_stations: Charging stations
......@@ -192,6 +193,7 @@ def shortest_path(G: nx.Graph,
initial_soc: SoC,
final_soc: SoC,
capacity: SoC,
c: float,
extended_graph=None,
contracted_graph=None
) -> Result:
......@@ -209,19 +211,20 @@ def shortest_path(G: nx.Graph,
"""
# Check if t is reachable from s
try:
_path = nx.shortest_path(G, s, t, weight=CONSUMPTION_KEY)
_path = nx.shortest_path(G, s, t, weight=DISTANCE_KEY)
except nx.NetworkXNoPath:
return EmptyResult()
_w = fold_path(G, _path, weight=CONSUMPTION_KEY)
_t = fold_path(G, _path, weight=DISTANCE_KEY)
_w = c * _t
if _w <= initial_soc:
return Result(
trip_time=fold_path(G, _path, weight=DISTANCE_KEY),
charge_path=[(s, 0), (t, 0)]
trip_time=_t,
charge_path=[(n, 0) for n in _path]
)
if extended_graph is None or contracted_graph is None:
contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity)
contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity, c=c)
extended_graph = state_graph(contracted_graph, capacity)
extended_graph = insert_start_node(
......@@ -231,7 +234,8 @@ def shortest_path(G: nx.Graph,
gas_stations=charging_stations,
graph_extended=extended_graph,
capacity=capacity,
initial_soc=initial_soc
initial_soc=initial_soc,
c=c
)
extended_graph = insert_final_node(
......@@ -240,7 +244,8 @@ def shortest_path(G: nx.Graph,
gas_stations=charging_stations,
graph_extended=extended_graph,
capacity=capacity,
final_soc=final_soc
final_soc=final_soc,
c=c
)
try:
......
from collections import namedtuple
from typing import Union
import networkx as nx
from evrouting.T import (
......@@ -47,7 +48,11 @@ def label(G: nx.Graph, u: Node) -> str:
return G.nodes[u][LABEL_KEY]
def sum_weights(G, path, weight) -> float:
def sum_weights(G, path, weight: str) -> float:
"""
:param weight: either key so weights are G[u][v][weight] or
a function f(G, u, v) -> float.
"""
return sum(G[u][v][weight] for u, v in zip(path[:-1], path[1:]))
......@@ -64,15 +69,3 @@ def consumption_function_distance_factory(consumption: float) -> ConsumptionFunc
return G[u][v][CONSUMPTION_KEY]
return c
def consumption_function_time_factory(consumption: float) -> ConsumptionFunction:
"""
:param consumption: in kWh/s
"""
def c(G, u, v):
"""Returns consumption in Wh from u to v."""
return G[u][v][DISTANCE_KEY] * consumption * 1000
return c
......@@ -2,12 +2,6 @@ from evrouting import gasstation
from evrouting.T import Result
from evrouting.osm.profiles import car
from evrouting.osm.routing import shortest_path
from evrouting.graph_tools import (
CHARGING_COEFFICIENT_KEY,
DISTANCE_KEY,
HAVERSINE_KEY,
consumption_function_distance_factory
)
def test_charge_shortest_route_dimensions(map_graph):
......@@ -17,19 +11,19 @@ def test_charge_shortest_route_dimensions(map_graph):
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
consumption = 1 # kWh/km
cost_path = 6 * consumption * 1000 # distance * consumption in Wh
c = consumption_function_distance_factory(consumption)
result = charge.routing.shortest_path(
G=map_graph,
consumption = 0.5 # kWh/s
# Traveltime gonna be less than 10 min = 600 sek = 300 kWh.
# Therefore initial soc of 300 000
result = gasstation.shortest_path(G=map_graph,
charging_stations=map_graph.charging_stations,
s=_s,
t=_t,
initial_soc=10000, # > cost_path
initial_soc=300000,
final_soc=0,
capacity=10000,
c=c
capacity=300000,
c=consumption * 1000,
extended_graph=None,
contracted_graph=None
)
path = shortest_path(map_graph, _s, _t, car)
......@@ -45,27 +39,21 @@ def test_charge_shortest_route_stop(map_graph):
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
consumption = 1 # kWh/km
cost_path = 6 * consumption * 1000 # distance * consumption in Wh
c = consumption_function_distance_factory(consumption)
result = charge.routing.shortest_path(
G=map_graph,
consumption = 0.5 # kWh/s
# Traveltime gonna be less than 10 min = 600 sek := 300 kWh.
# => initial soc of 300 000
# Traveltime to first charging station is < 5 min = 300 sek := 150 kWh
# => Initial soc of 150 000 is enough to charge but not to reach target.
result = gasstation.shortest_path(G=map_graph,
charging_stations=map_graph.charging_stations,
s=_s,
t=_t,
initial_soc=2000, # > cost_path
initial_soc=150000,
final_soc=0,
capacity=10000,
c=c
capacity=300000,
c=consumption * 1000,
extended_graph=None,
contracted_graph=None
)
assert type(result) is Result
charge_at = [t for n, t in result.charge_path if t > 0]
# charge once
assert len(charge_at) == 1
# Charge about 10 min
assert charge_at[0] < 600
assert charge_at[0] > 500
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment