diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 713b63737b2fd62e13e0de55617869aa31b2e2cb..02d74cc7e4697be5148d10e933837f1070343b2b 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -1,4 +1,5 @@ -from typing import Dict, List +"""Module contains the main algorithm.""" +from typing import Dict, List, Tuple, Set from math import inf import networkx as nx @@ -14,58 +15,33 @@ from evrouting.charge.factories import ( ) -def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, +def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, initial_soc: SoC, final_soc: SoC, capacity: SoC): """ Calculates shortest path using the CHarge algorithm. - :param G: - :param charging_stations: - :param s: - :param t: - :param initial_soc: - :param final_soc: - :param capacity: + :param G: Graph to work on + :param charging_stations: Set containing identifiers of all + charging stations + :param s: Start Node + :param t: End Node + :param initial_soc: SoC at s + :param final_soc: SoC at t + :param capacity: Battery capacity + :return: """ - # Add node that is only connected to the final node and takes no time - # to travel but consumes exactly the amount of energy that should be - # left at t (final_soc). The node becomes the new final node. - dummy_final_node: Node = len(G) - G.add_node(dummy_final_node) - G.add_edge(t, dummy_final_node, weight=0, c=final_soc) - t = dummy_final_node - - # Init factories - cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) - f_soc_factory = SoCFunctionFactory(cf_map) - soc_profile_factory = SoCProfileFactory(G, capacity) - - # Init maps to manage labels - l_set: Dict[int, List[Label]] = {v: [] for v in G} - l_uns: Dict[int, LabelPriorityQueue] = { - v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G - } - - # Add dummy charging station with charging function - # cf(t) = initial_soc (ie charging coefficient is zero). - dummy_node: Node = len(G.nodes) - G.add_node(dummy_node, c=0) - charging_stations.add(dummy_node) + t, factories, queues = _setup( + G, charging_stations, capacity, initial_soc, final_soc, s, t + ) - # Register dummy charging station as the last - # seen charging station before s. - l_uns[s].insert(Label( - t_trip=0, - soc_last_cs=initial_soc, - last_cs=dummy_node, - soc_profile_cs_v=soc_profile_factory(s) - )) + f_soc_factory: SoCFunctionFactory = factories['f_soc'] + soc_profile_factory: SoCProfileFactory = factories['soc_profile'] + cf_map: ChargingFunctionMap = factories['cf'] - # A priority queue defines which node to visit next. - # The key is the trip time. - prio_queue = PriorityQueue() - prio_queue.insert(s, priority=0, count=0) + l_set: Dict[int, List[Label]] = queues['settled labels'] + l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels'] + prio_queue: PriorityQueue = queues['priority queue'] while prio_queue: node_min: Node = prio_queue.peak_min() @@ -128,9 +104,79 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, # Update queue if entered label is the new minimum label # of the neighbour. try: - is_new_min_label: bool = label_neighbour == l_uns[n].peak_min() + is_new_min: bool = label_neighbour == l_uns[n].peak_min() except KeyError: continue - if is_new_min_label: + if is_new_min: prio_queue.insert(n, **keys(f_soc_factory(label_neighbour))) + + +def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, + initial_soc: SoC, final_soc: SoC, s: Node, t: Node + ) -> Tuple[Node, Dict, Dict]: + """ + Initialises the data structures and graph setup. + + :returns: Tupel(t, factories, queues): + :t: The new dummy final node taking care of the final SoC. + :factories: A dict containing factory functions for: + :```factories['f_soc']```: The SoC Functions + :```factories['cf']```: The Charging Functions + :```factories['soc_profile']```: The SoC Profiles + :queues: A dict containing initialized queues for the algorithm. + :```queues['settled labels']```: + :```queues['unsettled labels']```: + :```queues['priority queue'']```: + """ + # Add node that is only connected to the final node and takes no time + # to travel but consumes exactly the amount of energy that should be + # left at t (final_soc). The node becomes the new final node. + dummy_final_node: Node = len(G) + G.add_node(dummy_final_node) + G.add_edge(t, dummy_final_node, weight=0, c=final_soc) + t = dummy_final_node + + # Init factories + cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) + f_soc_factory = SoCFunctionFactory(cf_map) + soc_profile_factory = SoCProfileFactory(G, capacity) + + # Init maps to manage labels + l_set: Dict[int, List[Label]] = {v: [] for v in G} + l_uns: Dict[int, LabelPriorityQueue] = { + v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G + } + + # Add dummy charging station with charging function + # cf(t) = initial_soc (ie charging coefficient is zero). + dummy_node: Node = len(G.nodes) + G.add_node(dummy_node, c=0) + charging_stations.add(dummy_node) + + # Register dummy charging station as the last + # seen charging station before s. + l_uns[s].insert(Label( + t_trip=0, + soc_last_cs=initial_soc, + last_cs=dummy_node, + soc_profile_cs_v=soc_profile_factory(s) + )) + + # A priority queue defines which node to visit next. + # The key is the trip time. + prio_queue: PriorityQueue = PriorityQueue() + prio_queue.insert(s, priority=0, count=0) + + return (t, # New final Node + { # factories + 'f_soc': f_soc_factory, + 'cf': cf_map, + 'soc_profile': soc_profile_factory + }, + { # queues + 'settled labels': l_set, + 'unsettled labels': l_uns, + 'priority queue': prio_queue + } + )