Skip to content
Snippets Groups Projects
Commit ac34daa9 authored by markn92's avatar markn92
Browse files

fixing things, finetuning missing

parent 02cdd26f
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@ from typing import Set, List
import networkx as nx
from evrouting.T import Node, SoC, Result, EmptyResult, Time
from evrouting.gasstation.T import State, DistFunction
from evrouting.gasstation.T import State
from evrouting.graph_tools import (
CONSUMPTION_KEY,
DISTANCE_KEY,
......@@ -23,7 +23,7 @@ def insert_start_node(s: Node,
graph_extended.add_node((s, initial_soc))
v: Node
for v in gas_stations:
shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=CONSUMPTION_KEY)
shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=DISTANCE_KEY)
w = f.path_consumption(graph_core, shortest_p)
if w > initial_soc:
continue
......@@ -65,7 +65,7 @@ def insert_final_node(t: Node,
graph_extended.add_node((t, final_soc))
u: Node
for u in gas_stations:
shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=CONSUMPTION_KEY)
shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=DISTANCE_KEY)
w = f.path_consumption(graph_core, shortest_p)
if w + final_soc > capacity:
continue
......@@ -185,8 +185,17 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph,
return Result(trip_time=trip_time, charge_path=charge_path)
def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC, f: AccessFunctions = AccessFunctions()) -> Result:
def shortest_path(G: nx.Graph,
charging_stations: Set[Node],
s: Node,
t: Node,
initial_soc: SoC,
final_soc: SoC,
capacity: SoC,
f: AccessFunctions = AccessFunctions(),
extended_graph=None,
contracted_graph=None
) -> Result:
"""
Calculates shortest path using a generalized gas station algorithm.
......@@ -201,7 +210,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
"""
# Check if t is reachable from s
try:
_path = nx.shortest_path(G, s, t, weight=CONSUMPTION_KEY)
_path = nx.shortest_path(G, s, t, weight=DISTANCE_KEY)
except nx.NetworkXNoPath:
return EmptyResult()
......@@ -209,11 +218,11 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
if _w <= initial_soc:
return Result(
trip_time=f.path_distance(G, _path),
charge_path=[(s, 0), (t, 0)]
charge_path=[(n, 0) for n in _path]
)
contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity)
extended_graph = state_graph(contracted_graph, capacity)
contracted_graph: nx.Graph = contracted_graph or contract_graph(G, charging_stations, capacity, f)
extended_graph = extended_graph or state_graph(contracted_graph, capacity, f)
extended_graph = insert_start_node(
s=s,
......@@ -222,7 +231,8 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
gas_stations=charging_stations,
graph_extended=extended_graph,
capacity=capacity,
initial_soc=initial_soc
initial_soc=initial_soc,
f=f
)
extended_graph = insert_final_node(
......@@ -231,7 +241,8 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
gas_stations=charging_stations,
graph_extended=extended_graph,
capacity=capacity,
final_soc=final_soc
final_soc=final_soc,
f=f
)
try:
......
......@@ -72,17 +72,33 @@ def consumption_function_distance_factory(consumption: float) -> ConsumptionFunc
class AccessFunctions:
"""
Class for dependency injections to access path
attributes in flexible ways.
"""
def __init__(self,
distance=distance,
consumption=consumption,
charging_coefficient=charging_cofficient
):
self.distance = distance
self.consumption = consumption
self.charging_coefficient = charging_coefficient
self._distance = distance
self._consumption = consumption
self._charging_coefficient = charging_coefficient
def distance(self, G, u, v):
return self._distance(G, u, v)
def consumption(self, G, u, v):
return self._consumption(G, u, v)
def charging_coefficient(self, G, v):
return self._charging_coefficient(G, v)
def path_distance(self, G, path):
""":return: Distance of path."""
return sum_weights(G, path, weight=DISTANCE_KEY)
def path_consumption(self, G, path):
""":returns: consumption of given path."""
return sum_weights(G, path, weight=CONSUMPTION_KEY)
......@@ -5,7 +5,7 @@ import networkx as nx
from evrouting.T import Result, EmptyResult
from evrouting.graph_tools import (
DISTANCE_KEY, sum_weights
DISTANCE_KEY, sum_weights, AccessFunctions
)
from evrouting.osm.const import ms_to_kmh
......@@ -76,4 +76,19 @@ def to_coordinates(G, path):
def get_coordinates(G, n):
lat = G.nodes[n]['lat']
lon = G.nodes[n]['lon']
return lon, lat
\ No newline at end of file
return lon, lat
class GasstationAccessFunctions(AccessFunctions):
def __init__(self, consumption_coefficient):
super().__init__()
self.c = consumption_coefficient
def consumption(self, G, u, v):
return self.c * self._distance(G, u, v)
def charging_coefficient(self, G, v):
return 1 / self._charging_coefficient(G, v)
def path_consumption(self, G, path):
return self.c * self.path_distance(G, path)
......@@ -3,8 +3,7 @@ import networkx as nx
from evrouting import gasstation
from evrouting.T import Result
from evrouting.osm.profiles import car
from evrouting.osm.routing import shortest_path
from evrouting.graph_tools import charging_cofficient
from evrouting.osm.routing import shortest_path, GasstationAccessFunctions
def test_charge_shortest_route_dimensions(map_graph):
......@@ -17,6 +16,9 @@ def test_charge_shortest_route_dimensions(map_graph):
consumption = 0.5 # kWh/s
# Traveltime gonna be less than 10 min = 600 sek = 300 kWh.
# Therefore initial soc of 300 000
f = GasstationAccessFunctions(consumption)
result = gasstation.shortest_path(G=map_graph,
charging_stations=map_graph.charging_stations,
s=_s,
......@@ -24,7 +26,7 @@ def test_charge_shortest_route_dimensions(map_graph):
initial_soc=300000,
final_soc=0,
capacity=300000,
c=consumption * 1000,
f=f,
extended_graph=None,
contracted_graph=None
)
......@@ -42,7 +44,9 @@ def test_charge_shortest_route_stop(map_graph):
_s = map_graph.find_nearest(s)
_t = map_graph.find_nearest(t)
consumption = 0.5 * 1000 # Wh/s
consumption = 0.5 * 1000 # Wh/s
f = GasstationAccessFunctions(consumption)
# Traveltime to first charging station is < 5 min = 300 sek := 150 kWh
# => Initial soc of 150 000 is enough to charge but not to reach target.
initial_soc = 150000
......@@ -53,17 +57,17 @@ def test_charge_shortest_route_stop(map_graph):
initial_soc=initial_soc,
final_soc=0,
capacity=300000,
c=consumption,
f=f,
extended_graph=None,
contracted_graph=None
)
assert type(result) is Result
charge_nodes = [(n, t) for n, t in result.charge_path if t >0]
charge_nodes = [(n, t) for n, t in result.charge_path if t > 0]
assert len(charge_nodes) == 1
charge_node, charge_time = charge_nodes[0]
charging_station_c = charging_cofficient(map_graph, charge_node)
charging_station_c = f.charging_coefficient(map_graph, charge_node)
# calc travel time to charge_node an
s_to_c = consumption * nx.shortest_path_length(map_graph, _s, charge_node)
c_to_t = consumption * nx.shortest_path_length(map_graph, charge_node, _t)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment