Skip to content
Snippets Groups Projects
Commit 7b60d0f0 authored by markn92's avatar markn92
Browse files

state graph creation

parent 12d20b1f
No related branches found
No related tags found
No related merge requests found
...@@ -7,6 +7,7 @@ from evrouting.graph_tools import ( ...@@ -7,6 +7,7 @@ from evrouting.graph_tools import (
DISTANCE_KEY, DISTANCE_KEY,
CHARGING_COEFFICIENT_KEY, CHARGING_COEFFICIENT_KEY,
consumption, consumption,
distance,
charging_cofficient charging_cofficient
) )
...@@ -87,17 +88,32 @@ def get_possible_arriving_soc(G: nx.Graph, u: Node, capacity: SoC) -> List[SoC]: ...@@ -87,17 +88,32 @@ def get_possible_arriving_soc(G: nx.Graph, u: Node, capacity: SoC) -> List[SoC]:
for n in G.neighbors(u): for n in G.neighbors(u):
arriving_soc = capacity - consumption(G, u, n) arriving_soc = capacity - consumption(G, u, n)
if arriving_soc >= 0 and charging_cofficient(G, n) < c_u and \ if arriving_soc > 0 and charging_cofficient(G, n) < c_u and \
arriving_soc not in possible_arriving_soc: arriving_soc not in possible_arriving_soc:
possible_arriving_soc.add(arriving_soc) possible_arriving_soc.add(arriving_soc)
return list(possible_arriving_soc) return list(possible_arriving_soc)
def state_graph(G: nx.Graph, capacity: SoC) -> nx.Graph: def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph:
node = None """Calculate Graph connecting (Node, Arrival SoC) states."""
get_possible_arriving_soc(G, node, capacity) H: nx.DiGraph = nx.DiGraph()
pass
for u in G.nodes:
c_u = charging_cofficient(G, u)
for v in G.neighbors(u):
w = consumption(G, u, v)
if w <= capacity:
for g in get_possible_arriving_soc(G, u, capacity):
c_v = charging_cofficient(G, v)
if c_v <= c_u and g < w:
weight = (w - g) * c_u + distance(G, u, v)
H.add_edge((u, g), (v, 0), weight=weight)
elif c_v > c_u:
weight = (capacity - g) * c_u + distance(G, u, v)
H.add_edge((u, g), (v, capacity - w), weight=weight)
return H
def compose_result(G: nx.Graph, path: Path) -> dict: def compose_result(G: nx.Graph, path: Path) -> dict:
......
import networkx as nx
import pytest
from evrouting.gasstation.routing import ( from evrouting.gasstation.routing import (
shortest_path, shortest_path,
contract_graph,
dijkstra,
fold_path,
get_possible_arriving_soc
)
from evrouting.graph_tools import (
label,
CONSUMPTION_KEY,
DISTANCE_KEY,
CHARGING_COEFFICIENT_KEY
) )
from tests.config import edge_case, get_graph, gasstation, init_config from tests.config import edge_case, get_graph
class TestDjikstra:
@pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)])
def test_djikstra(self, u, v, min_len):
conf: dict = init_config(gasstation)
shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY)
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len
def test_djikstra_via_node(self):
conf: dict = init_config(edge_case)
shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY)
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2
class TestContraction:
def label_map(self, G: nx.Graph) -> dict:
return {label(G, n): n for n in G.nodes}
def test_contraction(self):
conf: dict = init_config(gasstation)
G: nx.Graph = conf['G']
H: nx.Graph = contract_graph(G,
conf['charging_stations'],
conf['capacity'])
# Check available gas stations
assert set(H.nodes) == conf['charging_stations']
@pytest.mark.parametrize('u,v,weight,value', [
('s', 'a', CONSUMPTION_KEY, 1),
('s', 'a', DISTANCE_KEY, 1),
('s', 'f', '', None), # Not exist
('s', 'b', '', None), # Not exist
('s', 'd', '', None), # Not exist
('s', 'c', CONSUMPTION_KEY, 2),
('s', 'c', DISTANCE_KEY, 2),
('s', 'e', '', None)
])
def test_contraction_edges(self, u, v, weight, value):
"""
Test edges and values of its weights.
:param u: Node
:param v: Node
:param weight: Weight key to check.
:param value: Value the edge weight should have.
If None, it tests if the egde does not exist.
"""
conf: dict = init_config(gasstation)
H: nx.Graph = contract_graph(conf['G'],
conf['charging_stations'], conf['capacity'])
label_map = self.label_map(H)
try:
edge = (label_map[u], label_map[v])
except KeyError:
# One of the edges is no an charging station
if value is None:
return True
raise
if value is None:
assert edge not in H.edges
else:
assert H.edges[edge][weight] == value
class TestPossibleArrivingSoC:
U = 4
w_lower = 3
w_greater = 5
@pytest.fixture
def graph_w_below_U(self):
G = nx.Graph()
G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_lower, DISTANCE_KEY: 2 * self.w_lower})
yield G
del G
@pytest.fixture
def graph_w_gt_U(self):
G = nx.Graph()
G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_greater, DISTANCE_KEY: 2 * self.w_greater})
yield G
del G
@pytest.fixture
def graph_w_eq_U(self):
G = nx.Graph()
G.add_edge(0, 1, **{CONSUMPTION_KEY: self.U, DISTANCE_KEY: 2 * self.U})
yield G
del G
def test_unequal_charging_coeff_w_eq_U(self, graph_w_eq_U):
G = graph_w_eq_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
def test_equal_charging_coeff_w_lt_U(self, graph_w_below_U):
G = graph_w_below_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
def test_unequal_charging_coeff_w_lt_U(self, graph_w_below_U):
G = graph_w_below_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0, self.U - self.w_lower]
def test_equal_charging_coeff_w_eq_U(self, graph_w_eq_U):
G = graph_w_eq_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
@pytest.mark.parametrize('c_0,c_1', [(1, 2), (1, 1)])
def test_equal_unreachable(self, graph_w_gt_U, c_0, c_1):
G: nx.Graph = graph_w_gt_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
class TestRouting: class TestRouting:
......
import networkx as nx
import pytest
from evrouting.gasstation.routing import (
contract_graph,
dijkstra,
fold_path,
get_possible_arriving_soc,
state_graph
)
from evrouting.graph_tools import (
label,
CONSUMPTION_KEY,
DISTANCE_KEY,
CHARGING_COEFFICIENT_KEY
)
from tests.config import edge_case, gasstation, init_config
class TestDjikstra:
@pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)])
def test_djikstra(self, u, v, min_len):
conf: dict = init_config(gasstation)
shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY)
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len
def test_djikstra_via_node(self):
conf: dict = init_config(edge_case)
shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY)
assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2
class TestContraction:
def label_map(self, G: nx.Graph) -> dict:
return {label(G, n): n for n in G.nodes}
def test_contraction(self):
conf: dict = init_config(gasstation)
G: nx.Graph = conf['G']
H: nx.Graph = contract_graph(G,
conf['charging_stations'],
conf['capacity'])
# Check available gas stations
assert set(H.nodes) == conf['charging_stations']
@pytest.mark.parametrize('u,v,weight,value', [
('s', 'a', CONSUMPTION_KEY, 1),
('s', 'a', DISTANCE_KEY, 1),
('s', 'f', '', None), # Not exist
('s', 'b', '', None), # Not exist
('s', 'd', '', None), # Not exist
('s', 'c', CONSUMPTION_KEY, 2),
('s', 'c', DISTANCE_KEY, 2),
('s', 'e', '', None)
])
def test_contraction_edges(self, u, v, weight, value):
"""
Test edges and values of its weights.
:param u: Node
:param v: Node
:param weight: Weight key to check.
:param value: Value the edge weight should have.
If None, it tests if the egde does not exist.
"""
conf: dict = init_config(gasstation)
H: nx.Graph = contract_graph(conf['G'],
conf['charging_stations'], conf['capacity'])
label_map = self.label_map(H)
try:
edge = (label_map[u], label_map[v])
except KeyError:
# One of the edges is no an charging station
if value is None:
return True
raise
if value is None:
assert edge not in H.edges
else:
assert H.edges[edge][weight] == value
class MinimalExamples:
U = 4
w_lower = 3
w_greater = 5
@staticmethod
def d(w):
return 2 * w
@pytest.fixture
def graph_w_below_U(self):
G = nx.Graph()
G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_lower, DISTANCE_KEY: self.d(self.w_lower)})
yield G
del G
@pytest.fixture
def graph_w_gt_U(self):
G = nx.Graph()
G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_greater, DISTANCE_KEY: self.d(self.w_greater)})
yield G
del G
@pytest.fixture
def graph_w_eq_U(self):
G = nx.Graph()
G.add_edge(0, 1, **{CONSUMPTION_KEY: self.U, DISTANCE_KEY: self.d(self.U)})
yield G
del G
class TestPossibleArrivingSoC(MinimalExamples):
def test_unequal_charging_coeff_w_eq_U(self, graph_w_eq_U):
G = graph_w_eq_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
def test_equal_charging_coeff_w_lt_U(self, graph_w_below_U):
G = graph_w_below_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
def test_unequal_charging_coeff_w_lt_U(self, graph_w_below_U):
G = graph_w_below_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0, self.U - self.w_lower]
def test_equal_charging_coeff_w_eq_U(self, graph_w_eq_U):
G = graph_w_eq_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
@pytest.mark.parametrize('c_0,c_1', [(1, 2), (1, 1)])
def test_equal_unreachable(self, graph_w_gt_U, c_0, c_1):
G: nx.Graph = graph_w_gt_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1})
gv_lower = get_possible_arriving_soc(G, 0, self.U)
assert gv_lower == [0]
gv_higher = get_possible_arriving_soc(G, 1, self.U)
assert gv_higher == [0]
class TestStateGraph(MinimalExamples):
def test_unequal_charging_coeff_w_eq_U(self, graph_w_eq_U):
G = graph_w_eq_U
c_0 = 1
c_1 = 2
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1})
H: nx.Graph = state_graph(G, self.U)
assert set(H.nodes) == {(0, 0), (1, 0)}
assert len(H.edges) == 2
assert H.edges[(0, 0), (1, 0)]['weight'] == self.U * c_0 + self.d(self.U)
assert H.edges[(1, 0), (0, 0)]['weight'] == self.U * c_1 + self.d(self.U)
def test_unequal_charging_coeff_w_lt_U(self, graph_w_below_U):
G = graph_w_below_U
c_0 = 1
c_1 = 2
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1})
H: nx.Graph = state_graph(G, self.U)
assert set(H.nodes) == {(0, 0), (1, 0), (1, 1)}
assert len(H.edges) == 3
assert H.edges[(0, 0), (1, 1)]['weight'] == self.U * c_0 + self.d(self.w_lower)
assert H.edges[(1, 0), (0, 0)]['weight'] == self.w_lower * c_1 + self.d(self.w_lower)
assert H.edges[(1, 1), (0, 0)]['weight'] == (self.w_lower - 1) * c_1 + self.d(self.w_lower)
def test_equal_charging_coeff_w_lt_U(self, graph_w_below_U):
G = graph_w_below_U
c_0 = 1
c_1 = 1
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1})
H: nx.Graph = state_graph(G, self.U)
assert set(H.nodes) == {(0, 0), (1, 0)}
assert len(H.edges) == 2
assert H.edges[(0, 0), (1, 0)]['weight'] == self.w_lower * c_0 + self.d(self.w_lower)
assert H.edges[(1, 0), (0, 0)]['weight'] == self.w_lower * c_1 + self.d(self.w_lower)
def test_equal_charging_coeff_w_eq_U(self, graph_w_eq_U):
G = graph_w_eq_U
c_0 = 1
c_1 = 1
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1})
H: nx.Graph = state_graph(G, self.U)
assert set(H.nodes) == {(0, 0), (1, 0)}
assert len(H.edges) == 2
assert H.edges[(0, 0), (1, 0)]['weight'] == self.U * c_0 + self.d(self.U)
assert H.edges[(1, 0), (0, 0)]['weight'] == self.U * c_1 + self.d(self.U)
@pytest.mark.parametrize('c_0,c_1', [(1, 2), (1, 1)])
def test_equal_unreachable(self, graph_w_gt_U, c_0, c_1):
G: nx.Graph = graph_w_gt_U
G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0})
G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1})
H: nx.Graph = state_graph(G, self.U)
assert len(H.nodes) == 0
assert len(H.edges) == 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment