Skip to content
Snippets Groups Projects
Commit d5391c9f authored by markn92's avatar markn92
Browse files

contraction

parent 03281d4c
No related branches found
No related tags found
No related merge requests found
from typing import Set, Union, Callable, List
from typing import Set, Callable, List
import networkx as nx
from evrouting.T import Node, SoC
from evrouting.graph_tools import CONSUMPTION_KEY, DISTANCE_KEY
DistFunction = Callable[[nx.Graph, Node, Node], Union[int, float]]
Path = List[Node]
DistFunction = Callable[[nx.Graph, Node, Node], Path]
def shortest_path(G: nx.Graph, s, t, b_0: float, b_t: float, U: float):
......@@ -20,14 +23,40 @@ def shortest_path(G: nx.Graph, s, t, b_0: float, b_t: float, U: float):
pass
def dijkstra(G: nx.Graph, u: Node, v: Node,
weight: str = 'weight') -> Union[int, float]:
length, _ = nx.algorithms.shortest_paths.single_source_dijkstra(
G, u, v, weight=weight
)
return length
def dijkstra(G: nx.Graph, u: Node, v: Node, weight: str = 'weight') -> Path:
return nx.algorithms.shortest_path(G, u, v, weight=weight)
def contract_graph(G: nx.Graph, S: Set[Node], U: SoC,
f_dist: DistFunction = dijkstra) -> nx.Graph:
pass
def fold_path(G: nx.Graph, path: Path, weight: str):
return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])])
def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
dist: DistFunction = dijkstra) -> nx.Graph:
"""
:param G: Original graph
:param charging_stations: Charging stations
:param capacity: Maximum battery capacity
:param dist: Minimum distance function, necessary if G is not a fully
connected Graph to calculate consumptions between charging stations.
:returns: Graph only consisting of Charging Stations whose neighbours must
be within the capacity U. If so, their edge has consumption and
distance of the minimum path.
"""
H: nx.Graph = nx.Graph()
for cs in list(charging_stations):
H.add_node(cs, **G.nodes[cs])
# Iterate unvisited charging stations
for n_cs in [n for n in charging_stations if (n, cs) not in H.edges]:
min_path: Path = dist(G, cs, n_cs)
consumption: SoC = fold_path(G, min_path, weight=CONSUMPTION_KEY)
if consumption <= capacity:
H.add_edge(
cs, n_cs,
**{
CONSUMPTION_KEY: consumption,
DISTANCE_KEY:fold_path(G, min_path, weight=DISTANCE_KEY)
}
)
return H
import networkx as nx
import pytest
from evrouting.gasstation.routing import contract_graph, dijkstra
from evrouting.graph_tools import label, CONSUMPTION_KEY
from evrouting.gasstation.routing import contract_graph, dijkstra, fold_path
from evrouting.graph_tools import label, CONSUMPTION_KEY, DISTANCE_KEY
from tests.config import edge_case, get_graph, gasstation, init_config
......@@ -10,23 +10,66 @@ class TestDjikstra:
@pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)])
def test_djikstra(self, u, v, min_len):
conf: dict = init_config(gasstation)
assert dijkstra(conf['G'], u, v, CONSUMPTION_KEY) == min_len
shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY)
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len
def test_djikstra_via_node(self):
conf: dict = init_config(edge_case)
shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY)
assert dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY) == 2
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2
class TestContraction:
def label_map(self, G: nx.Graph) -> dict:
return {label(G, n): n for n in G.nodes}
def test_contraction(self):
conf: dict = init_config(gasstation)
G: nx.Graph = conf['G']
H: nx.Graph = contract_graph(G, conf['charging_stations'], conf['U'])
H: nx.Graph = contract_graph(G,
conf['charging_stations'],
conf['capacity'])
# Check available gas stations
nodes = set(label(H, n) for n in H.nodes)
assert nodes == {'s', 'a', 'c'}
assert set(H.nodes) == conf['charging_stations']
@pytest.mark.parametrize('u,v,weight,value', [
('s', 'a', CONSUMPTION_KEY, 1),
('s', 'a', DISTANCE_KEY, 1),
('s', 'f', '', None), # Not exist
('s', 'b', '', None), # Not exist
('s', 'd', '', None), # Not exist
('s', 'c', CONSUMPTION_KEY, 2),
('s', 'c', DISTANCE_KEY, 2),
('s', 'e', '', None)
])
def test_contraction_edges(self, u, v, weight, value):
"""
Test edges and values of its weights.
:param u: Node
:param v: Node
:param weight: Weight key to check.
:param value: Value the edge weight should have.
If None, it tests if the egde does not exist.
"""
conf: dict = init_config(gasstation)
H: nx.Graph = contract_graph(conf['G'],
conf['charging_stations'], conf['capacity'])
label_map = self.label_map(H)
try:
edge = (label_map[u], label_map[v])
except KeyError:
# One of the edges is no an charging station
if value is None:
return True
raise
if value is None:
assert edge not in H.edges
else:
assert H.edges[edge][weight] == value
class TestRouting:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment