Skip to content
Snippets Groups Projects
Commit 02cdd26f authored by markn92's avatar markn92
Browse files

better dependecie injection

parent 2cef1292
No related branches found
No related tags found
No related merge requests found
......@@ -2,16 +2,13 @@ from typing import Set, List
import networkx as nx
from evrouting.T import Node, SoC, Result, EmptyResult, Time
from evrouting.gasstation.T import State
from evrouting.gasstation.T import State, DistFunction
from evrouting.graph_tools import (
CONSUMPTION_KEY,
DISTANCE_KEY,
consumption,
distance,
AccessFunctions
)
from evrouting.graph_tools import sum_weights as fold_path
def insert_start_node(s: Node,
graph_core: nx.Graph,
......@@ -20,27 +17,26 @@ def insert_start_node(s: Node,
graph_extended: nx.DiGraph,
capacity: SoC,
initial_soc: SoC,
c: float = 1.,
charging_coefficient=charging_cofficient
f: AccessFunctions = AccessFunctions()
) -> nx.DiGraph:
"""Insert s into extended graph an create states and edges as necessary."""
graph_extended.add_node((s, initial_soc))
v: Node
for v in gas_stations:
shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=DISTANCE_KEY)
d = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY)
w = c * d
shortest_p: List[Node] = nx.shortest_path(graph_core, s, v, weight=CONSUMPTION_KEY)
w = f.path_consumption(graph_core, shortest_p)
if w > initial_soc:
continue
c_v = charging_cofficient(graph_core, v)
d = f.path_distance(graph_core, shortest_p)
c_v = f.charging_coefficient(graph_core, v)
g = initial_soc - w
graph_extended.add_edge((s, initial_soc), (v, g), weight=d)
for u in graph_contracted.neighbors(v):
c_u = charging_cofficient(graph_contracted, u)
w_v_u = consumption(graph_contracted, u, v)
d_v_u = distance(graph_contracted, u, v)
c_u = f.charging_coefficient(graph_contracted, u)
w_v_u = f.consumption(graph_contracted, u, v)
d_v_u = f.distance(graph_contracted, u, v)
if c_v < c_u:
graph_extended.add_edge(
(v, g),
......@@ -63,19 +59,19 @@ def insert_final_node(t: Node,
graph_extended: nx.DiGraph,
capacity: SoC,
final_soc: SoC,
c: float = 1.
charging_cofficient=charging_cofficient
f: AccessFunctions = AccessFunctions()
) -> nx.DiGraph:
"""Insert terminal node into extended graph an create states and edges as necessary."""
graph_extended.add_node((t, final_soc))
u: Node
for u in gas_stations:
shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=DISTANCE_KEY)
d_u_t = fold_path(graph_core, shortest_p, weight=DISTANCE_KEY)
w = c * d_u_t
shortest_p: List[Node] = nx.shortest_path(graph_core, t, u, weight=CONSUMPTION_KEY)
w = f.path_consumption(graph_core, shortest_p)
if w + final_soc > capacity:
continue
c_u = charging_cofficient(graph_core, u)
d_u_t = f.path_distance(graph_core, shortest_p)
c_u = f.charging_coefficient(graph_core, u)
for g in [g for n, g in graph_extended.nodes if n == u]:
if g > w + final_soc:
continue
......@@ -89,7 +85,7 @@ def insert_final_node(t: Node,
def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
c: float = 1.) -> nx.Graph:
f: AccessFunctions = AccessFunctions()) -> nx.Graph:
"""
:param G: Original graph
:param charging_stations: Charging stations
......@@ -110,14 +106,14 @@ def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
H.add_node(cs, **G.nodes[cs])
# Iterate unvisited charging stations
for n_cs in all_cs[i + 1:]:
t_min = nx.algorithms.shortest_path_length(G, cs, n_cs, weight=DISTANCE_KEY)
w_cs_n: SoC = c * t_min
path = nx.algorithms.shortest_path(G, cs, n_cs, weight=DISTANCE_KEY)
w_cs_n: SoC = f.path_consumption(G, path)
if w_cs_n <= capacity:
H.add_edge(
cs, n_cs,
**{
CONSUMPTION_KEY: w_cs_n,
DISTANCE_KEY: t_min
DISTANCE_KEY: f.path_distance(G, path)
}
)
H.add_node(all_cs[-1], **G.nodes[all_cs[-1]])
......@@ -125,39 +121,39 @@ def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
return H
def get_possible_arriving_soc(G: nx.Graph, u: Node, capacity: SoC) -> List[SoC]:
def get_possible_arriving_soc(G: nx.Graph, u: Node, capacity: SoC, f: AccessFunctions = AccessFunctions()) -> List[SoC]:
"""
:returns: All possible SoC when arriving at node u, according to
the optimal fuelling strategy.
"""
possible_arriving_soc: Set[SoC] = {0}
c_u = charging_cofficient(G, u)
c_u = f.charging_coefficient(G, u)
for n in G.neighbors(u):
arriving_soc = capacity - consumption(G, u, n)
if arriving_soc > 0 and charging_cofficient(G, n) < c_u and \
arriving_soc = capacity - f.consumption(G, u, n)
if arriving_soc > 0 and f.charging_coefficient(G, n) < c_u and \
arriving_soc not in possible_arriving_soc:
possible_arriving_soc.add(arriving_soc)
return list(possible_arriving_soc)
def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph:
def state_graph(G: nx.Graph, capacity: SoC, f: AccessFunctions = AccessFunctions()) -> nx.DiGraph:
"""Calculate Graph connecting (Node, Arrival SoC) states."""
H: nx.DiGraph = nx.DiGraph()
for u in G.nodes:
c_u = charging_cofficient(G, u)
c_u = f.charging_coefficient(G, u)
for v in G.neighbors(u):
w = consumption(G, u, v)
w = f.consumption(G, u, v)
if w <= capacity:
for g in get_possible_arriving_soc(G, u, capacity):
c_v = charging_cofficient(G, v)
c_v = f.charging_coefficient(G, v)
if c_v <= c_u and g < w:
weight = (w - g) * c_u + distance(G, u, v)
weight = (w - g) * c_u + f.distance(G, u, v)
H.add_edge((u, g), (v, 0), weight=weight)
elif c_v > c_u:
weight = (capacity - g) * c_u + distance(G, u, v)
weight = (capacity - g) * c_u + f.distance(G, u, v)
H.add_edge((u, g), (v, capacity - w), weight=weight)
return H
......@@ -176,9 +172,10 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph,
v, g_v = path[i + 1]
t: Time = extended_graph.edges[(u, g_u), (v, g_v)]['weight']
trip_time += t
charge_time_u: Time = t - fold_path(
charge_time_u: Time = t - nx.shortest_path_length(
graph_core,
nx.shortest_path(graph_core, u, v, weight=DISTANCE_KEY),
u,
v,
weight=DISTANCE_KEY
)
charge_path.append((u, charge_time_u))
......@@ -188,17 +185,8 @@ def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph,
return Result(trip_time=trip_time, charge_path=charge_path)
def shortest_path(G: nx.Graph,
charging_stations: Set[Node],
s: Node,
t: Node,
initial_soc: SoC,
final_soc: SoC,
capacity: SoC,
c: float,
extended_graph=None,
contracted_graph=None
) -> Result:
def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC, f: AccessFunctions = AccessFunctions()) -> Result:
"""
Calculates shortest path using a generalized gas station algorithm.
......@@ -213,21 +201,19 @@ def shortest_path(G: nx.Graph,
"""
# Check if t is reachable from s
try:
_path = nx.shortest_path(G, s, t, weight=DISTANCE_KEY)
_path = nx.shortest_path(G, s, t, weight=CONSUMPTION_KEY)
except nx.NetworkXNoPath:
return EmptyResult()
_t = fold_path(G, _path, weight=DISTANCE_KEY)
_w = c * _t
_w = f.path_consumption(G, _path)
if _w <= initial_soc:
return Result(
trip_time=_t,
charge_path=[(n, 0) for n in _path]
trip_time=f.path_distance(G, _path),
charge_path=[(s, 0), (t, 0)]
)
if extended_graph is None or contracted_graph is None:
contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity, c=c)
extended_graph = state_graph(contracted_graph, capacity)
contracted_graph: nx.Graph = contract_graph(G, charging_stations, capacity)
extended_graph = state_graph(contracted_graph, capacity)
extended_graph = insert_start_node(
s=s,
......@@ -236,8 +222,7 @@ def shortest_path(G: nx.Graph,
gas_stations=charging_stations,
graph_extended=extended_graph,
capacity=capacity,
initial_soc=initial_soc,
c=c
initial_soc=initial_soc
)
extended_graph = insert_final_node(
......@@ -246,8 +231,7 @@ def shortest_path(G: nx.Graph,
gas_stations=charging_stations,
graph_extended=extended_graph,
capacity=capacity,
final_soc=final_soc,
c=c
final_soc=final_soc
)
try:
......
......@@ -82,7 +82,7 @@ class AccessFunctions:
self.charging_coefficient = charging_coefficient
def path_distance(self, G, path):
sum_weights(G, path, weight=DISTANCE_KEY)
return sum_weights(G, path, weight=DISTANCE_KEY)
def path_consumption(self, G, path):
sum_weights(G, path, weight=CONSUMPTION_KEY)
return sum_weights(G, path, weight=CONSUMPTION_KEY)
......@@ -35,14 +35,12 @@ class TestContraction:
@pytest.mark.parametrize('u,v,weight,value', [
('s', 'a', CONSUMPTION_KEY, 1),
('s', 'a', DISTANCE_KEY, 1),
('s', 'f', CONSUMPTION_KEY, 1), # Not exist
('s', 'f', DISTANCE_KEY, 1), # Not exist
('s', 'f', '', None), # Not exist
('s', 'b', '', None), # Not exist
('s', 'd', '', None), # Not exist
('s', 'c', CONSUMPTION_KEY, 2),
('s', 'c', DISTANCE_KEY, 2),
('s', 'e', CONSUMPTION_KEY, 2),
('s', 'e', DISTANCE_KEY, 2)
('s', 'e', '', None),
])
def test_contraction_edges(self, u, v, weight, value):
"""
......@@ -56,7 +54,7 @@ class TestContraction:
conf: dict = init_config(gasstation)
H: nx.Graph = contract_graph(conf['G'],
conf['charging_stations'],
conf['capacity'], c=1)
conf['capacity'])
label_map = self.label_map(H)
try:
......@@ -242,8 +240,7 @@ class Integration:
return contract_graph(
G=graph_config['G'],
charging_stations=graph_config['charging_stations'],
capacity=graph_config['capacity'],
c=.5
capacity=graph_config['capacity']
)
@pytest.fixture
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment