From 1b819739f33bb152f5643b46cc6ff2f379110a92 Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Tue, 31 Mar 2020 13:12:08 +0200 Subject: [PATCH] little refactor --- evrouting/gasstation/T.py | 10 ++++++ evrouting/gasstation/routing.py | 59 +++++++++++++++------------------ evrouting/gasstation/utils.py | 14 ++++++++ 3 files changed, 51 insertions(+), 32 deletions(-) create mode 100644 evrouting/gasstation/T.py create mode 100644 evrouting/gasstation/utils.py diff --git a/evrouting/gasstation/T.py b/evrouting/gasstation/T.py new file mode 100644 index 0000000..bcf52ed --- /dev/null +++ b/evrouting/gasstation/T.py @@ -0,0 +1,10 @@ +from typing import Tuple, TypeVar, Callable, List + +import networkx as nx +from evrouting.T import Node, SoC + +State = Tuple[Node, SoC] + +N = TypeVar('N') + +DistFunction = Callable[[nx.Graph, N, N, str], List[N]] diff --git a/evrouting/gasstation/routing.py b/evrouting/gasstation/routing.py index 7f26a2a..e55f2bf 100644 --- a/evrouting/gasstation/routing.py +++ b/evrouting/gasstation/routing.py @@ -1,7 +1,9 @@ -from typing import Set, Callable, List, Any, Tuple +from typing import Set, List, Tuple import networkx as nx -from evrouting.T import Node, SoC, Result, EmptyResult +from evrouting.T import Node, SoC, Result, EmptyResult, Time +from evrouting.gasstation.T import State, DistFunction +from evrouting.gasstation.utils import dijkstra, fold_path from evrouting.graph_tools import ( CONSUMPTION_KEY, DISTANCE_KEY, @@ -10,20 +12,6 @@ from evrouting.graph_tools import ( charging_cofficient ) -Path = List[Node] -DistFunction = Callable[[nx.Graph, Node, Node, str], Path] - - -def dijkstra(G: nx.Graph, u: Any, v: Any, weight: str = 'weight') -> list: - try: - return nx.algorithms.shortest_path(G, u, v, weight=weight) - except nx.NetworkXNoPath: - return [] - - -def fold_path(G: nx.Graph, path: Path, weight: str): - return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])]) - def insert_start_node(s: Node, graph_core: nx.Graph, @@ -36,8 +24,9 @@ def insert_start_node(s: Node, ) -> nx.DiGraph: """Insert s into extended graph an create states and edges as necessary.""" graph_extended.add_node((s, initial_soc)) + v: Node for v in gas_stations: - shortest_p = dist(graph_core, s, v, weight=CONSUMPTION_KEY) + shortest_p: List[Node] = dist(graph_core, s, v, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) if w > initial_soc: continue @@ -77,8 +66,9 @@ def insert_final_node(t: Node, ) -> nx.DiGraph: """Insert terminal node into extended graph an create states and edges as necessary.""" graph_extended.add_node((t, final_soc)) + u: Node for u in gas_stations: - shortest_p = dist(graph_core, t, u, weight=CONSUMPTION_KEY) + shortest_p: List[Node] = dist(graph_core, t, u, weight=CONSUMPTION_KEY) w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY) if w + final_soc > capacity: continue @@ -115,7 +105,7 @@ def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, H.add_node(cs, **G.nodes[cs]) # Iterate unvisited charging stations for n_cs in [n for n in charging_stations if (n, cs) not in H.edges and n != cs]: - min_path: Path = dist(G, cs, n_cs) + min_path: List[Node] = dist(G, cs, n_cs) consumption: SoC = fold_path(G, min_path, weight=CONSUMPTION_KEY) if consumption <= capacity: H.add_edge( @@ -166,16 +156,20 @@ def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph: return H -def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, path: List[Tuple[Node, Node]], - dist=dijkstra) -> Result: - trip_time = sum([extended_graph.edges[u, v]['weight'] for u, v in zip(path[:-1], path[1:])]) - +def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, + path: List[State], dist=dijkstra) -> Result: + trip_time: Time = 0 charge_path = [] + u: Node + v: Node + g_u: SoC + g_v: SoC for i in range(len(path) - 1): u, g_u = path[i] v, g_v = path[i + 1] - t = extended_graph.edges[(u, g_u), (v, g_v)]['weight'] - charge_time_u = t - fold_path( + t: Time = extended_graph.edges[(u, g_u), (v, g_v)]['weight'] + trip_time += t + charge_time_u: Time = t - fold_path( graph_core, dist(graph_core, u, v, weight=DISTANCE_KEY), weight=DISTANCE_KEY @@ -192,12 +186,13 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, """ Calculates shortest path using a generalized gas station algorithm. - :param G: Input Graph - :param s: Start Node identifier - :param t: End Node identifier - :param b_0: Start SoC - :param b_t: End SoC - :param U: Capacity + :param G: + :param charging_stations: + :param s: + :param t: + :param initial_soc: + :param final_soc: + :param capacity: :return: """ # Check if t is reachable from s @@ -231,7 +226,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, final_soc=final_soc ) - path: List[Tuple[Node, Node]] = dijkstra(extended_graph, (s, initial_soc), (t, final_soc)) + path: List[State] = dijkstra(extended_graph, (s, initial_soc), (t, final_soc)) return EmptyResult() if not path else compose_result( graph_core=G, diff --git a/evrouting/gasstation/utils.py b/evrouting/gasstation/utils.py new file mode 100644 index 0000000..854d41e --- /dev/null +++ b/evrouting/gasstation/utils.py @@ -0,0 +1,14 @@ +from typing import List +import networkx as nx +from evrouting.gasstation.T import N + + +def dijkstra(G: nx.Graph, u: N, v: N, weight: str = 'weight') -> List[N]: + try: + return nx.algorithms.shortest_path(G, u, v, weight=weight) + except nx.NetworkXNoPath: + return [] + + +def fold_path(G: nx.Graph, path: List[N], weight: str): + return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])]) -- GitLab