Skip to content
Snippets Groups Projects
Commit 1b819739 authored by markn92's avatar markn92
Browse files

little refactor

parent 22a1894f
No related branches found
No related tags found
No related merge requests found
from typing import Tuple, TypeVar, Callable, List
import networkx as nx
from evrouting.T import Node, SoC
State = Tuple[Node, SoC]
N = TypeVar('N')
DistFunction = Callable[[nx.Graph, N, N, str], List[N]]
from typing import Set, Callable, List, Any, Tuple
from typing import Set, List, Tuple
import networkx as nx
from evrouting.T import Node, SoC, Result, EmptyResult
from evrouting.T import Node, SoC, Result, EmptyResult, Time
from evrouting.gasstation.T import State, DistFunction
from evrouting.gasstation.utils import dijkstra, fold_path
from evrouting.graph_tools import (
CONSUMPTION_KEY,
DISTANCE_KEY,
......@@ -10,20 +12,6 @@ from evrouting.graph_tools import (
charging_cofficient
)
Path = List[Node]
DistFunction = Callable[[nx.Graph, Node, Node, str], Path]
def dijkstra(G: nx.Graph, u: Any, v: Any, weight: str = 'weight') -> list:
try:
return nx.algorithms.shortest_path(G, u, v, weight=weight)
except nx.NetworkXNoPath:
return []
def fold_path(G: nx.Graph, path: Path, weight: str):
return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])])
def insert_start_node(s: Node,
graph_core: nx.Graph,
......@@ -36,8 +24,9 @@ def insert_start_node(s: Node,
) -> nx.DiGraph:
"""Insert s into extended graph an create states and edges as necessary."""
graph_extended.add_node((s, initial_soc))
v: Node
for v in gas_stations:
shortest_p = dist(graph_core, s, v, weight=CONSUMPTION_KEY)
shortest_p: List[Node] = dist(graph_core, s, v, weight=CONSUMPTION_KEY)
w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY)
if w > initial_soc:
continue
......@@ -77,8 +66,9 @@ def insert_final_node(t: Node,
) -> nx.DiGraph:
"""Insert terminal node into extended graph an create states and edges as necessary."""
graph_extended.add_node((t, final_soc))
u: Node
for u in gas_stations:
shortest_p = dist(graph_core, t, u, weight=CONSUMPTION_KEY)
shortest_p: List[Node] = dist(graph_core, t, u, weight=CONSUMPTION_KEY)
w = fold_path(graph_core, shortest_p, weight=CONSUMPTION_KEY)
if w + final_soc > capacity:
continue
......@@ -115,7 +105,7 @@ def contract_graph(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
H.add_node(cs, **G.nodes[cs])
# Iterate unvisited charging stations
for n_cs in [n for n in charging_stations if (n, cs) not in H.edges and n != cs]:
min_path: Path = dist(G, cs, n_cs)
min_path: List[Node] = dist(G, cs, n_cs)
consumption: SoC = fold_path(G, min_path, weight=CONSUMPTION_KEY)
if consumption <= capacity:
H.add_edge(
......@@ -166,16 +156,20 @@ def state_graph(G: nx.Graph, capacity: SoC) -> nx.DiGraph:
return H
def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph, path: List[Tuple[Node, Node]],
dist=dijkstra) -> Result:
trip_time = sum([extended_graph.edges[u, v]['weight'] for u, v in zip(path[:-1], path[1:])])
def compose_result(graph_core: nx.Graph, extended_graph: nx.DiGraph,
path: List[State], dist=dijkstra) -> Result:
trip_time: Time = 0
charge_path = []
u: Node
v: Node
g_u: SoC
g_v: SoC
for i in range(len(path) - 1):
u, g_u = path[i]
v, g_v = path[i + 1]
t = extended_graph.edges[(u, g_u), (v, g_v)]['weight']
charge_time_u = t - fold_path(
t: Time = extended_graph.edges[(u, g_u), (v, g_v)]['weight']
trip_time += t
charge_time_u: Time = t - fold_path(
graph_core,
dist(graph_core, u, v, weight=DISTANCE_KEY),
weight=DISTANCE_KEY
......@@ -192,12 +186,13 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
"""
Calculates shortest path using a generalized gas station algorithm.
:param G: Input Graph
:param s: Start Node identifier
:param t: End Node identifier
:param b_0: Start SoC
:param b_t: End SoC
:param U: Capacity
:param G:
:param charging_stations:
:param s:
:param t:
:param initial_soc:
:param final_soc:
:param capacity:
:return:
"""
# Check if t is reachable from s
......@@ -231,7 +226,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
final_soc=final_soc
)
path: List[Tuple[Node, Node]] = dijkstra(extended_graph, (s, initial_soc), (t, final_soc))
path: List[State] = dijkstra(extended_graph, (s, initial_soc), (t, final_soc))
return EmptyResult() if not path else compose_result(
graph_core=G,
......
from typing import List
import networkx as nx
from evrouting.gasstation.T import N
def dijkstra(G: nx.Graph, u: N, v: N, weight: str = 'weight') -> List[N]:
try:
return nx.algorithms.shortest_path(G, u, v, weight=weight)
except nx.NetworkXNoPath:
return []
def fold_path(G: nx.Graph, path: List[N], weight: str):
return sum([G.edges[u, v][weight] for u, v in zip(path[:-1], path[1:])])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment