Skip to content
Snippets Groups Projects
Commit 8d5ac7e5 authored by markn92's avatar markn92
Browse files

doc

parent a80c8c01
No related branches found
No related tags found
No related merge requests found
from typing import Dict, List """Module contains the main algorithm."""
from typing import Dict, List, Tuple, Set
from math import inf from math import inf
import networkx as nx import networkx as nx
...@@ -14,58 +15,33 @@ from evrouting.charge.factories import ( ...@@ -14,58 +15,33 @@ from evrouting.charge.factories import (
) )
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC): initial_soc: SoC, final_soc: SoC, capacity: SoC):
""" """
Calculates shortest path using the CHarge algorithm. Calculates shortest path using the CHarge algorithm.
:param G: :param G: Graph to work on
:param charging_stations: :param charging_stations: Set containing identifiers of all
:param s: charging stations
:param t: :param s: Start Node
:param initial_soc: :param t: End Node
:param final_soc: :param initial_soc: SoC at s
:param capacity: :param final_soc: SoC at t
:param capacity: Battery capacity
:return: :return:
""" """
# Add node that is only connected to the final node and takes no time t, factories, queues = _setup(
# to travel but consumes exactly the amount of energy that should be G, charging_stations, capacity, initial_soc, final_soc, s, t
# left at t (final_soc). The node becomes the new final node. )
dummy_final_node: Node = len(G)
G.add_node(dummy_final_node)
G.add_edge(t, dummy_final_node, weight=0, c=final_soc)
t = dummy_final_node
# Init factories
cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
f_soc_factory = SoCFunctionFactory(cf_map)
soc_profile_factory = SoCProfileFactory(G, capacity)
# Init maps to manage labels
l_set: Dict[int, List[Label]] = {v: [] for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G
}
# Add dummy charging station with charging function
# cf(t) = initial_soc (ie charging coefficient is zero).
dummy_node: Node = len(G.nodes)
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last f_soc_factory: SoCFunctionFactory = factories['f_soc']
# seen charging station before s. soc_profile_factory: SoCProfileFactory = factories['soc_profile']
l_uns[s].insert(Label( cf_map: ChargingFunctionMap = factories['cf']
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(s)
))
# A priority queue defines which node to visit next. l_set: Dict[int, List[Label]] = queues['settled labels']
# The key is the trip time. l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels']
prio_queue = PriorityQueue() prio_queue: PriorityQueue = queues['priority queue']
prio_queue.insert(s, priority=0, count=0)
while prio_queue: while prio_queue:
node_min: Node = prio_queue.peak_min() node_min: Node = prio_queue.peak_min()
...@@ -128,9 +104,79 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -128,9 +104,79 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
# Update queue if entered label is the new minimum label # Update queue if entered label is the new minimum label
# of the neighbour. # of the neighbour.
try: try:
is_new_min_label: bool = label_neighbour == l_uns[n].peak_min() is_new_min: bool = label_neighbour == l_uns[n].peak_min()
except KeyError: except KeyError:
continue continue
if is_new_min_label: if is_new_min:
prio_queue.insert(n, **keys(f_soc_factory(label_neighbour))) prio_queue.insert(n, **keys(f_soc_factory(label_neighbour)))
def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
initial_soc: SoC, final_soc: SoC, s: Node, t: Node
) -> Tuple[Node, Dict, Dict]:
"""
Initialises the data structures and graph setup.
:returns: Tupel(t, factories, queues):
:t: The new dummy final node taking care of the final SoC.
:factories: A dict containing factory functions for:
:```factories['f_soc']```: The SoC Functions
:```factories['cf']```: The Charging Functions
:```factories['soc_profile']```: The SoC Profiles
:queues: A dict containing initialized queues for the algorithm.
:```queues['settled labels']```:
:```queues['unsettled labels']```:
:```queues['priority queue'']```:
"""
# Add node that is only connected to the final node and takes no time
# to travel but consumes exactly the amount of energy that should be
# left at t (final_soc). The node becomes the new final node.
dummy_final_node: Node = len(G)
G.add_node(dummy_final_node)
G.add_edge(t, dummy_final_node, weight=0, c=final_soc)
t = dummy_final_node
# Init factories
cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
f_soc_factory = SoCFunctionFactory(cf_map)
soc_profile_factory = SoCProfileFactory(G, capacity)
# Init maps to manage labels
l_set: Dict[int, List[Label]] = {v: [] for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G
}
# Add dummy charging station with charging function
# cf(t) = initial_soc (ie charging coefficient is zero).
dummy_node: Node = len(G.nodes)
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
l_uns[s].insert(Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(s)
))
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue: PriorityQueue = PriorityQueue()
prio_queue.insert(s, priority=0, count=0)
return (t, # New final Node
{ # factories
'f_soc': f_soc_factory,
'cf': cf_map,
'soc_profile': soc_profile_factory
},
{ # queues
'settled labels': l_set,
'unsettled labels': l_uns,
'priority queue': prio_queue
}
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment