Skip to content
Snippets Groups Projects
routing.py 9.14 KiB
Newer Older
markn92's avatar
markn92 committed
"""
Implementation of the CHArge algorithm [0] with two further constraints:

    1. There are no negative path costs (ie no recurpation).
    2. All charging stations have linear charging functions.

[0] https://dl.acm.org/doi/10.1145/2820783.2820826

"""
markn92's avatar
markn92 committed
from typing import Dict, List, Tuple, Set
markn92's avatar
markn92 committed
from math import inf

markn92's avatar
markn92 committed
import networkx as nx
markn92's avatar
markn92 committed
from evrouting.T import Node, SoC, Time, Result, EmptyResult, ConsumptionFunction
markn92's avatar
markn92 committed
from evrouting.utils import PriorityQueue
markn92's avatar
markn92 committed
from evrouting.graph_tools import distance, consumption, DISTANCE_KEY, CONSUMPTION_KEY
markn92's avatar
markn92 committed
from evrouting.charge.T import SoCFunction, Label
markn92's avatar
markn92 committed
from evrouting.charge.utils import LabelPriorityQueue
markn92's avatar
markn92 committed
from evrouting.charge.factories import (
    ChargingFunctionMap,
markn92's avatar
markn92 committed
    SoCFunctionFactory,
    SoCProfileFactory
markn92's avatar
markn92 committed
)
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
def shortest_path(G: nx.DiGraph, charging_stations: Set[Node], s: Node, t: Node,
markn92's avatar
markn92 committed
                  initial_soc: SoC, final_soc: SoC, capacity: SoC, c=consumption) -> Result:
markn92's avatar
markn92 committed
    """
    Calculates shortest path using the CHarge algorithm.

markn92's avatar
markn92 committed
    :param G: Graph to work on
    :param charging_stations: Set containing identifiers of all
        charging stations
    :param s: Start Node
    :param t: End Node
    :param initial_soc: SoC at s
    :param final_soc: SoC at t
    :param capacity: Battery capacity

markn92's avatar
markn92 committed
    :return:
    """
    t, dummy_cs, factories, queues = _setup(
markn92's avatar
markn92 committed
        G=G,
        charging_stations=charging_stations,
        capacity=capacity,
        initial_soc=initial_soc,
        final_soc=final_soc,
        s=s,
        t=t,
        c=c
markn92's avatar
markn92 committed
    )
markn92's avatar
markn92 committed
    f_soc_factory: SoCFunctionFactory = factories['f_soc']
    soc_profile_factory: SoCProfileFactory = factories['soc_profile']
    cf_map: ChargingFunctionMap = factories['cf']
markn92's avatar
markn92 committed
    l_set: Dict[int, List[Label]] = queues['settled labels']
    l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels']
    prio_queue: PriorityQueue = queues['priority queue']
markn92's avatar
markn92 committed
    # Shortcut for key function
    keys = LabelPriorityQueue.keys

markn92's avatar
markn92 committed
    while prio_queue:
markn92's avatar
markn92 committed
        node_min: Node = prio_queue.peak_min()
markn92's avatar
markn92 committed
        label_node_min: Label = l_uns[node_min].delete_min()
markn92's avatar
markn92 committed
        l_set[node_min].append(label_node_min)
markn92's avatar
markn92 committed
        if node_min == t:
markn92's avatar
markn92 committed
            _cleanup(G, t, charging_stations, dummy_cs)
markn92's avatar
markn92 committed
            return _result(
markn92's avatar
markn92 committed
                label_node_min, f_soc_factory(label_node_min).minimum
            )
markn92's avatar
markn92 committed
        # Handle charging stations
markn92's avatar
markn92 committed
        if node_min in charging_stations and node_min != label_node_min.last_cs:
markn92's avatar
markn92 committed
            f_soc: SoCFunction = f_soc_factory(label_node_min)
            t_charge = f_soc.calc_optimal_t_charge(cf_map[node_min])
markn92's avatar
markn92 committed

            if t_charge is not None:
markn92's avatar
markn92 committed
                # Spawn new label at t_charge
markn92's avatar
markn92 committed
                l_uns[node_min].insert(
markn92's avatar
markn92 committed
                    Label(
markn92's avatar
markn92 committed
                        t_trip=label_node_min.t_trip + t_charge,
                        soc_last_cs=f_soc(label_node_min.t_trip + t_charge),
                        last_cs=node_min,
markn92's avatar
markn92 committed
                        soc_profile_cs_v=soc_profile_factory(node_min),
                        parent_node=node_min,
                        parent_label=label_node_min
markn92's avatar
markn92 committed
                    )
                )
markn92's avatar
markn92 committed
        # Update priority queue. This node might have gotten a new
markn92's avatar
markn92 committed
        # minimum label spawned is the previous step.
        try:
            prio_queue.insert(
markn92's avatar
markn92 committed
                item=node_min,
                **keys(f_soc_factory(l_uns[node_min].peak_min()))
            )
        except KeyError:
            # l_uns[v] empty
            prio_queue.delete_min()
markn92's avatar
markn92 committed

        # scan outgoing arcs
markn92's avatar
markn92 committed
        for n in G.neighbors(node_min):
markn92's avatar
markn92 committed
            # Create SoC Profile for getting from minimum_node to n
markn92's avatar
markn92 committed
            soc_profile = label_node_min.soc_profile_cs_v + \
                          soc_profile_factory(node_min, n)
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
            if soc_profile(capacity) != -inf:
markn92's avatar
markn92 committed
                if cf_map[label_node_min.last_cs].is_dummy \
                        and soc_profile.path_cost > label_node_min.soc_last_cs:
                    # Dummy charging stations cannot increase SoC.
                    # Therefore paths that consume more energy than the SoC
                    # when arriving at the (dummy) station are unfeasible.
                    continue

                label_neighbour: Label = Label(
markn92's avatar
markn92 committed
                    t_trip=label_node_min.t_trip + distance(G, node_min, n),
                    soc_last_cs=label_node_min.soc_last_cs,
                    last_cs=label_node_min.last_cs,
markn92's avatar
markn92 committed
                    soc_profile_cs_v=soc_profile,
                    parent_node=node_min,
                    parent_label=label_node_min
markn92's avatar
markn92 committed
                )
markn92's avatar
markn92 committed
                l_uns[n].insert(label_neighbour)
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
                # Update queue if entered label is the new minimum label
                # of the neighbour.
markn92's avatar
markn92 committed
                try:
markn92's avatar
markn92 committed
                    is_new_min: bool = label_neighbour == l_uns[n].peak_min()
markn92's avatar
markn92 committed
                except KeyError:
                    continue

markn92's avatar
markn92 committed
                if is_new_min:
markn92's avatar
markn92 committed
                    prio_queue.insert(n, **keys(f_soc_factory(label_neighbour)))
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed
    _cleanup(G, t, charging_stations, dummy_cs)
    return EmptyResult()


def _cleanup(G, t, charging_stations, dummy_cs):
markn92's avatar
markn92 committed
    G.remove_node(t)
markn92's avatar
markn92 committed
    G.remove_node(dummy_cs)
    charging_stations.remove(dummy_cs)
markn92's avatar
markn92 committed

markn92's avatar
markn92 committed

def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
markn92's avatar
markn92 committed
           initial_soc: SoC, final_soc: SoC, s: Node, t: Node,
           c: ConsumptionFunction
           ) -> Tuple[Node, Node, Dict, Dict]:
markn92's avatar
markn92 committed
    """
    Initialises the data structures and graph setup.

    :returns: Tupel(t, factories, queues):
        :t: The new dummy final node taking care of the final SoC.
        :factories: A dict containing factory functions for:
            :```factories['f_soc']```: The SoC Functions
            :```factories['cf']```: The Charging Functions
            :```factories['soc_profile']```: The SoC Profiles
        :queues: A dict containing initialized queues for the algorithm.
            :```queues['settled labels']```:
            :```queues['unsettled labels']```:
            :```queues['priority queue'']```:
    """
    # Add node that is only connected to the final node and takes no time
    # to travel but consumes exactly the amount of energy that should be
    # left at t (final_soc). The node becomes the new final node.
    dummy_final_node: Node = len(G)
    G.add_node(dummy_final_node)
markn92's avatar
markn92 committed
    G.add_edge(t, dummy_final_node, **{
        DISTANCE_KEY: 0,
        CONSUMPTION_KEY: final_soc
    })
markn92's avatar
markn92 committed
    t = dummy_final_node

    # Init factories
    cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
    f_soc_factory = SoCFunctionFactory(cf_map)
markn92's avatar
markn92 committed
    soc_profile_factory = SoCProfileFactory(G, capacity, c)
markn92's avatar
markn92 committed

    # Init maps to manage labels
    l_set: Dict[int, List[Label]] = {v: [] for v in G}
    l_uns: Dict[int, LabelPriorityQueue] = {
        v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G
    }

    # Add dummy charging station with charging function
    # cf(t) = initial_soc (ie charging coefficient is zero).
    dummy_node: Node = len(G.nodes)
    G.add_node(dummy_node, c=0)
    charging_stations.add(dummy_node)

    # Register dummy charging station as the last
    # seen charging station before s.
    l_uns[s].insert(Label(
        t_trip=0,
        soc_last_cs=initial_soc,
        last_cs=dummy_node,
markn92's avatar
markn92 committed
        soc_profile_cs_v=soc_profile_factory(s),
        parent_node=None,
        parent_label=None
markn92's avatar
markn92 committed
    ))

    # A priority queue defines which node to visit next.
    # The key is the trip time.
    prio_queue: PriorityQueue = PriorityQueue()
    prio_queue.insert(s, priority=0, count=0)

    return (t,  # New final Node
            dummy_node,
markn92's avatar
markn92 committed
            {  # factories
                'f_soc': f_soc_factory,
                'cf': cf_map,
                'soc_profile': soc_profile_factory
            },
            {  # queues
                'settled labels': l_set,
                'unsettled labels': l_uns,
                'priority queue': prio_queue
            }
            )
markn92's avatar
markn92 committed
def _result(label: Label, f_soc_min: Time) -> Result:
markn92's avatar
markn92 committed
    """
    Returns a dict with two fields, as described below.

    :param label: The final label of the algorithm
    :param f_soc_min: The min time of the SoC Function of the final label
    :param node: The final node.

    :return Time result['trip_time']: The overall trip time ```f_soc_min```
markn92's avatar
markn92 committed
    :return List[Tuple[Node, Time]] result['path']: List of Nodes and their
        according charging time along the path.
markn92's avatar
markn92 committed
    """
markn92's avatar
markn92 committed
    # Remember where charging time applies
    # First entry comes from the time necessary to charge at the last
    # charging stop to reach the goal.
markn92's avatar
markn92 committed
    t_charge_map = {label.last_cs: f_soc_min - label.t_trip}

    # Skip inserted extra node
    node = label.parent_node
    label = label.parent_label
markn92's avatar
markn92 committed

    path = []
markn92's avatar
markn92 committed
    while label is not None:
        if node == label.parent_node:
            # Label got spawned at fixing t_charge of the parent's label
            # last_cs. For the current label holds: label.last_cs == node
            t_charge_map[label.parent_label.last_cs] = label.t_trip - label.parent_label.t_trip
        else:
            path.append((node, t_charge_map.get(node, 0)))
        node = label.parent_node
        label = label.parent_label

markn92's avatar
markn92 committed
    return Result(trip_time=f_soc_min, charge_path=path[::-1])