From 7b60d0f05d5289298c8eddfde1cfaf750453357d Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Mon, 30 Mar 2020 12:48:36 +0200 Subject: [PATCH] state graph creation --- evrouting/gasstation/routing.py | 26 ++- tests/gasstation/test_gasstation_routing.py | 164 +------------- tests/gasstation/test_transformations.py | 238 ++++++++++++++++++++ 3 files changed, 260 insertions(+), 168 deletions(-) create mode 100644 tests/gasstation/test_transformations.py diff --git a/evrouting/gasstation/routing.py b/evrouting/gasstation/routing.py index 576fd95..2160d93 100644 --- a/evrouting/gasstation/routing.py +++ b/evrouting/gasstation/routing.py @@ -7,6 +7,7 @@ from evrouting.graph_tools import ( DISTANCE_KEY, CHARGING_COEFFICIENT_KEY, consumption, + distance, charging_cofficient ) @@ -87,17 +88,32 @@ def get_possible_arriving_soc(G: nx.Graph, u: Node, capacity: SoC) -> List[SoC]: for n in G.neighbors(u): arriving_soc = capacity - consumption(G, u, n) - if arriving_soc >= 0 and charging_cofficient(G, n) < c_u and \ + if arriving_soc > 0 and charging_cofficient(G, n) < c_u and \ arriving_soc not in possible_arriving_soc: possible_arriving_soc.add(arriving_soc) return list(possible_arriving_soc) -def state_graph(G: nx.Graph, capacity: SoC) -> nx.Graph: - node = None - get_possible_arriving_soc(G, node, capacity) - pass +def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph: + """Calculate Graph connecting (Node, Arrival SoC) states.""" + H: nx.DiGraph = nx.DiGraph() + + for u in G.nodes: + c_u = charging_cofficient(G, u) + for v in G.neighbors(u): + w = consumption(G, u, v) + if w <= capacity: + for g in get_possible_arriving_soc(G, u, capacity): + c_v = charging_cofficient(G, v) + if c_v <= c_u and g < w: + weight = (w - g) * c_u + distance(G, u, v) + H.add_edge((u, g), (v, 0), weight=weight) + elif c_v > c_u: + weight = (capacity - g) * c_u + distance(G, u, v) + H.add_edge((u, g), (v, capacity - w), weight=weight) + + return H def compose_result(G: nx.Graph, path: Path) -> dict: diff --git a/tests/gasstation/test_gasstation_routing.py b/tests/gasstation/test_gasstation_routing.py index 8cfa66c..da7e6cd 100644 --- a/tests/gasstation/test_gasstation_routing.py +++ b/tests/gasstation/test_gasstation_routing.py @@ -1,169 +1,7 @@ -import networkx as nx - -import pytest from evrouting.gasstation.routing import ( shortest_path, - contract_graph, - dijkstra, - fold_path, - get_possible_arriving_soc -) -from evrouting.graph_tools import ( - label, - CONSUMPTION_KEY, - DISTANCE_KEY, - CHARGING_COEFFICIENT_KEY ) -from tests.config import edge_case, get_graph, gasstation, init_config - - -class TestDjikstra: - @pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)]) - def test_djikstra(self, u, v, min_len): - conf: dict = init_config(gasstation) - shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY) - assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len - - def test_djikstra_via_node(self): - conf: dict = init_config(edge_case) - shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY) - - assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2 - - -class TestContraction: - def label_map(self, G: nx.Graph) -> dict: - return {label(G, n): n for n in G.nodes} - - def test_contraction(self): - conf: dict = init_config(gasstation) - G: nx.Graph = conf['G'] - H: nx.Graph = contract_graph(G, - conf['charging_stations'], - conf['capacity']) - - # Check available gas stations - assert set(H.nodes) == conf['charging_stations'] - - @pytest.mark.parametrize('u,v,weight,value', [ - ('s', 'a', CONSUMPTION_KEY, 1), - ('s', 'a', DISTANCE_KEY, 1), - ('s', 'f', '', None), # Not exist - ('s', 'b', '', None), # Not exist - ('s', 'd', '', None), # Not exist - ('s', 'c', CONSUMPTION_KEY, 2), - ('s', 'c', DISTANCE_KEY, 2), - ('s', 'e', '', None) - ]) - def test_contraction_edges(self, u, v, weight, value): - """ - Test edges and values of its weights. - :param u: Node - :param v: Node - :param weight: Weight key to check. - :param value: Value the edge weight should have. - If None, it tests if the egde does not exist. - """ - conf: dict = init_config(gasstation) - H: nx.Graph = contract_graph(conf['G'], - conf['charging_stations'], conf['capacity']) - - label_map = self.label_map(H) - try: - edge = (label_map[u], label_map[v]) - except KeyError: - # One of the edges is no an charging station - if value is None: - return True - raise - - if value is None: - assert edge not in H.edges - else: - assert H.edges[edge][weight] == value - - -class TestPossibleArrivingSoC: - U = 4 - w_lower = 3 - w_greater = 5 - - @pytest.fixture - def graph_w_below_U(self): - G = nx.Graph() - G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_lower, DISTANCE_KEY: 2 * self.w_lower}) - yield G - del G - - @pytest.fixture - def graph_w_gt_U(self): - G = nx.Graph() - G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_greater, DISTANCE_KEY: 2 * self.w_greater}) - yield G - del G - - @pytest.fixture - def graph_w_eq_U(self): - G = nx.Graph() - G.add_edge(0, 1, **{CONSUMPTION_KEY: self.U, DISTANCE_KEY: 2 * self.U}) - yield G - del G - - def test_unequal_charging_coeff_w_eq_U(self, graph_w_eq_U): - G = graph_w_eq_U - G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) - G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2}) - - gv_lower = get_possible_arriving_soc(G, 0, self.U) - assert gv_lower == [0] - - gv_higher = get_possible_arriving_soc(G, 1, self.U) - assert gv_higher == [0] - - def test_equal_charging_coeff_w_lt_U(self, graph_w_below_U): - G = graph_w_below_U - G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) - G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1}) - - gv_lower = get_possible_arriving_soc(G, 0, self.U) - assert gv_lower == [0] - - gv_higher = get_possible_arriving_soc(G, 1, self.U) - assert gv_higher == [0] - - def test_unequal_charging_coeff_w_lt_U(self, graph_w_below_U): - G = graph_w_below_U - G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) - G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2}) - - gv_lower = get_possible_arriving_soc(G, 0, self.U) - assert gv_lower == [0] - - gv_higher = get_possible_arriving_soc(G, 1, self.U) - assert gv_higher == [0, self.U - self.w_lower] - - def test_equal_charging_coeff_w_eq_U(self, graph_w_eq_U): - G = graph_w_eq_U - G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) - G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1}) - - gv_lower = get_possible_arriving_soc(G, 0, self.U) - assert gv_lower == [0] - - gv_higher = get_possible_arriving_soc(G, 1, self.U) - assert gv_higher == [0] - - @pytest.mark.parametrize('c_0,c_1', [(1, 2), (1, 1)]) - def test_equal_unreachable(self, graph_w_gt_U, c_0, c_1): - G: nx.Graph = graph_w_gt_U - G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0}) - G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1}) - - gv_lower = get_possible_arriving_soc(G, 0, self.U) - assert gv_lower == [0] - - gv_higher = get_possible_arriving_soc(G, 1, self.U) - assert gv_higher == [0] +from tests.config import edge_case, get_graph class TestRouting: diff --git a/tests/gasstation/test_transformations.py b/tests/gasstation/test_transformations.py new file mode 100644 index 0000000..62364ab --- /dev/null +++ b/tests/gasstation/test_transformations.py @@ -0,0 +1,238 @@ +import networkx as nx + +import pytest +from evrouting.gasstation.routing import ( + contract_graph, + dijkstra, + fold_path, + get_possible_arriving_soc, + state_graph +) +from evrouting.graph_tools import ( + label, + CONSUMPTION_KEY, + DISTANCE_KEY, + CHARGING_COEFFICIENT_KEY +) +from tests.config import edge_case, gasstation, init_config + + +class TestDjikstra: + @pytest.mark.parametrize("u,v,min_len", [(0, 1, 1), (0, 2, 3), (0, 4, 2), (0, 6, 3)]) + def test_djikstra(self, u, v, min_len): + conf: dict = init_config(gasstation) + shortest_path = dijkstra(conf['G'], u, v, CONSUMPTION_KEY) + assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == min_len + + def test_djikstra_via_node(self): + conf: dict = init_config(edge_case) + shortest_path = dijkstra(conf['G'], 0, 2, CONSUMPTION_KEY) + + assert fold_path(conf['G'], shortest_path, CONSUMPTION_KEY) == 2 + + +class TestContraction: + def label_map(self, G: nx.Graph) -> dict: + return {label(G, n): n for n in G.nodes} + + def test_contraction(self): + conf: dict = init_config(gasstation) + G: nx.Graph = conf['G'] + H: nx.Graph = contract_graph(G, + conf['charging_stations'], + conf['capacity']) + + # Check available gas stations + assert set(H.nodes) == conf['charging_stations'] + + @pytest.mark.parametrize('u,v,weight,value', [ + ('s', 'a', CONSUMPTION_KEY, 1), + ('s', 'a', DISTANCE_KEY, 1), + ('s', 'f', '', None), # Not exist + ('s', 'b', '', None), # Not exist + ('s', 'd', '', None), # Not exist + ('s', 'c', CONSUMPTION_KEY, 2), + ('s', 'c', DISTANCE_KEY, 2), + ('s', 'e', '', None) + ]) + def test_contraction_edges(self, u, v, weight, value): + """ + Test edges and values of its weights. + :param u: Node + :param v: Node + :param weight: Weight key to check. + :param value: Value the edge weight should have. + If None, it tests if the egde does not exist. + """ + conf: dict = init_config(gasstation) + H: nx.Graph = contract_graph(conf['G'], + conf['charging_stations'], conf['capacity']) + + label_map = self.label_map(H) + try: + edge = (label_map[u], label_map[v]) + except KeyError: + # One of the edges is no an charging station + if value is None: + return True + raise + + if value is None: + assert edge not in H.edges + else: + assert H.edges[edge][weight] == value + + +class MinimalExamples: + U = 4 + w_lower = 3 + w_greater = 5 + + @staticmethod + def d(w): + return 2 * w + + @pytest.fixture + def graph_w_below_U(self): + G = nx.Graph() + G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_lower, DISTANCE_KEY: self.d(self.w_lower)}) + yield G + del G + + @pytest.fixture + def graph_w_gt_U(self): + G = nx.Graph() + G.add_edge(0, 1, **{CONSUMPTION_KEY: self.w_greater, DISTANCE_KEY: self.d(self.w_greater)}) + yield G + del G + + @pytest.fixture + def graph_w_eq_U(self): + G = nx.Graph() + G.add_edge(0, 1, **{CONSUMPTION_KEY: self.U, DISTANCE_KEY: self.d(self.U)}) + yield G + del G + + +class TestPossibleArrivingSoC(MinimalExamples): + + def test_unequal_charging_coeff_w_eq_U(self, graph_w_eq_U): + G = graph_w_eq_U + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2}) + + gv_lower = get_possible_arriving_soc(G, 0, self.U) + assert gv_lower == [0] + + gv_higher = get_possible_arriving_soc(G, 1, self.U) + assert gv_higher == [0] + + def test_equal_charging_coeff_w_lt_U(self, graph_w_below_U): + G = graph_w_below_U + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1}) + + gv_lower = get_possible_arriving_soc(G, 0, self.U) + assert gv_lower == [0] + + gv_higher = get_possible_arriving_soc(G, 1, self.U) + assert gv_higher == [0] + + def test_unequal_charging_coeff_w_lt_U(self, graph_w_below_U): + G = graph_w_below_U + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 2}) + + gv_lower = get_possible_arriving_soc(G, 0, self.U) + assert gv_lower == [0] + + gv_higher = get_possible_arriving_soc(G, 1, self.U) + assert gv_higher == [0, self.U - self.w_lower] + + def test_equal_charging_coeff_w_eq_U(self, graph_w_eq_U): + G = graph_w_eq_U + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: 1}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: 1}) + + gv_lower = get_possible_arriving_soc(G, 0, self.U) + assert gv_lower == [0] + + gv_higher = get_possible_arriving_soc(G, 1, self.U) + assert gv_higher == [0] + + @pytest.mark.parametrize('c_0,c_1', [(1, 2), (1, 1)]) + def test_equal_unreachable(self, graph_w_gt_U, c_0, c_1): + G: nx.Graph = graph_w_gt_U + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1}) + + gv_lower = get_possible_arriving_soc(G, 0, self.U) + assert gv_lower == [0] + + gv_higher = get_possible_arriving_soc(G, 1, self.U) + assert gv_higher == [0] + + +class TestStateGraph(MinimalExamples): + def test_unequal_charging_coeff_w_eq_U(self, graph_w_eq_U): + G = graph_w_eq_U + c_0 = 1 + c_1 = 2 + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1}) + + H: nx.Graph = state_graph(G, self.U) + assert set(H.nodes) == {(0, 0), (1, 0)} + assert len(H.edges) == 2 + assert H.edges[(0, 0), (1, 0)]['weight'] == self.U * c_0 + self.d(self.U) + assert H.edges[(1, 0), (0, 0)]['weight'] == self.U * c_1 + self.d(self.U) + + def test_unequal_charging_coeff_w_lt_U(self, graph_w_below_U): + G = graph_w_below_U + c_0 = 1 + c_1 = 2 + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1}) + + H: nx.Graph = state_graph(G, self.U) + assert set(H.nodes) == {(0, 0), (1, 0), (1, 1)} + assert len(H.edges) == 3 + assert H.edges[(0, 0), (1, 1)]['weight'] == self.U * c_0 + self.d(self.w_lower) + assert H.edges[(1, 0), (0, 0)]['weight'] == self.w_lower * c_1 + self.d(self.w_lower) + assert H.edges[(1, 1), (0, 0)]['weight'] == (self.w_lower - 1) * c_1 + self.d(self.w_lower) + + def test_equal_charging_coeff_w_lt_U(self, graph_w_below_U): + G = graph_w_below_U + c_0 = 1 + c_1 = 1 + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1}) + + H: nx.Graph = state_graph(G, self.U) + assert set(H.nodes) == {(0, 0), (1, 0)} + assert len(H.edges) == 2 + assert H.edges[(0, 0), (1, 0)]['weight'] == self.w_lower * c_0 + self.d(self.w_lower) + assert H.edges[(1, 0), (0, 0)]['weight'] == self.w_lower * c_1 + self.d(self.w_lower) + + def test_equal_charging_coeff_w_eq_U(self, graph_w_eq_U): + G = graph_w_eq_U + c_0 = 1 + c_1 = 1 + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1}) + + H: nx.Graph = state_graph(G, self.U) + assert set(H.nodes) == {(0, 0), (1, 0)} + assert len(H.edges) == 2 + assert H.edges[(0, 0), (1, 0)]['weight'] == self.U * c_0 + self.d(self.U) + assert H.edges[(1, 0), (0, 0)]['weight'] == self.U * c_1 + self.d(self.U) + + @pytest.mark.parametrize('c_0,c_1', [(1, 2), (1, 1)]) + def test_equal_unreachable(self, graph_w_gt_U, c_0, c_1): + G: nx.Graph = graph_w_gt_U + G.add_node(0, **{CHARGING_COEFFICIENT_KEY: c_0}) + G.add_node(1, **{CHARGING_COEFFICIENT_KEY: c_1}) + + H: nx.Graph = state_graph(G, self.U) + assert len(H.nodes) == 0 + assert len(H.edges) == 0 -- GitLab