"""Module contains the main algorithm.""" from typing import Dict, List, Tuple, Set from math import inf import networkx as nx from evrouting.T import Node, SoC from evrouting.utils import PriorityQueue from evrouting.graph_tools import distance from evrouting.charge.T import SoCFunction, Label from evrouting.charge.utils import LabelPriorityQueue, keys from evrouting.charge.factories import ( ChargingFunctionMap, SoCFunctionFactory, SoCProfileFactory ) def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, initial_soc: SoC, final_soc: SoC, capacity: SoC): """ Calculates shortest path using the CHarge algorithm. :param G: Graph to work on :param charging_stations: Set containing identifiers of all charging stations :param s: Start Node :param t: End Node :param initial_soc: SoC at s :param final_soc: SoC at t :param capacity: Battery capacity :return: """ t, factories, queues = _setup( G, charging_stations, capacity, initial_soc, final_soc, s, t ) f_soc_factory: SoCFunctionFactory = factories['f_soc'] soc_profile_factory: SoCProfileFactory = factories['soc_profile'] cf_map: ChargingFunctionMap = factories['cf'] l_set: Dict[int, List[Label]] = queues['settled labels'] l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels'] prio_queue: PriorityQueue = queues['priority queue'] while prio_queue: node_min: Node = prio_queue.peak_min() label_node_min: Label = l_uns[node_min].delete_min() l_set[node_min].append(label_node_min) if node_min == t: return f_soc_factory(label_node_min).minimum # Handle charging stations if node_min in charging_stations and node_min != label_node_min.last_cs: f_soc: SoCFunction = f_soc_factory(label_node_min) t_charge = f_soc.calc_optimal_t_charge(cf_map[node_min]) if t_charge is not None: # Spawn new label at t_charge l_uns[node_min].insert( Label( t_trip=label_node_min.t_trip + t_charge, soc_last_cs=f_soc(label_node_min.t_trip + t_charge), last_cs=node_min, soc_profile_cs_v=soc_profile_factory(node_min) ) ) # Update priority queue. This node might have gotten a new # minimum label spawned is th previous step. try: prio_queue.insert( item=node_min, **keys(f_soc_factory(l_uns[node_min].peak_min())) ) except KeyError: # l_uns[v] empty prio_queue.delete_min() # scan outgoing arcs for n in G.neighbors(node_min): # Create SoC Profile for getting from minimum_node to n soc_profile = label_node_min.soc_profile_cs_v + \ soc_profile_factory(node_min, n) if soc_profile(capacity) != -inf: if cf_map[label_node_min.last_cs].is_dummy \ and soc_profile.path_cost > label_node_min.soc_last_cs: # Dummy charging stations cannot increase SoC. # Therefore paths that consume more energy than the SoC # when arriving at the (dummy) station are unfeasible. continue label_neighbour: Label = Label( t_trip=label_node_min.t_trip + distance(G, node_min, n), soc_last_cs=label_node_min.soc_last_cs, last_cs=label_node_min.last_cs, soc_profile_cs_v=soc_profile ) l_uns[n].insert(label_neighbour) # Update queue if entered label is the new minimum label # of the neighbour. try: is_new_min: bool = label_neighbour == l_uns[n].peak_min() except KeyError: continue if is_new_min: prio_queue.insert(n, **keys(f_soc_factory(label_neighbour))) def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, initial_soc: SoC, final_soc: SoC, s: Node, t: Node ) -> Tuple[Node, Dict, Dict]: """ Initialises the data structures and graph setup. :returns: Tupel(t, factories, queues): :t: The new dummy final node taking care of the final SoC. :factories: A dict containing factory functions for: :```factories['f_soc']```: The SoC Functions :```factories['cf']```: The Charging Functions :```factories['soc_profile']```: The SoC Profiles :queues: A dict containing initialized queues for the algorithm. :```queues['settled labels']```: :```queues['unsettled labels']```: :```queues['priority queue'']```: """ # Add node that is only connected to the final node and takes no time # to travel but consumes exactly the amount of energy that should be # left at t (final_soc). The node becomes the new final node. dummy_final_node: Node = len(G) G.add_node(dummy_final_node) G.add_edge(t, dummy_final_node, weight=0, c=final_soc) t = dummy_final_node # Init factories cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) f_soc_factory = SoCFunctionFactory(cf_map) soc_profile_factory = SoCProfileFactory(G, capacity) # Init maps to manage labels l_set: Dict[int, List[Label]] = {v: [] for v in G} l_uns: Dict[int, LabelPriorityQueue] = { v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G } # Add dummy charging station with charging function # cf(t) = initial_soc (ie charging coefficient is zero). dummy_node: Node = len(G.nodes) G.add_node(dummy_node, c=0) charging_stations.add(dummy_node) # Register dummy charging station as the last # seen charging station before s. l_uns[s].insert(Label( t_trip=0, soc_last_cs=initial_soc, last_cs=dummy_node, soc_profile_cs_v=soc_profile_factory(s) )) # A priority queue defines which node to visit next. # The key is the trip time. prio_queue: PriorityQueue = PriorityQueue() prio_queue.insert(s, priority=0, count=0) return (t, # New final Node { # factories 'f_soc': f_soc_factory, 'cf': cf_map, 'soc_profile': soc_profile_factory }, { # queues 'settled labels': l_set, 'unsettled labels': l_uns, 'priority queue': prio_queue } )