Skip to content
Snippets Groups Projects
routing.py 5.25 KiB
Newer Older
markn92's avatar
markn92 committed
from typing import Dict
markn92's avatar
markn92 committed
from math import inf

markn92's avatar
markn92 committed
import networkx as nx
markn92's avatar
markn92 committed
from evrouting.T import Node, SoC
markn92's avatar
markn92 committed
from evrouting.utils import PriorityQueue
markn92's avatar
markn92 committed
from evrouting.charge.factories import LabelsFactory, ChargingFunctionMap, soc_profile_factory
markn92's avatar
markn92 committed
from ..graph_tools import distance
markn92's avatar
markn92 committed
from .T import SoCFunction, SoCProfile, Label
from .utils import LabelPriorityQueue

__all__ = ['shortest_path']
markn92's avatar
markn92 committed


markn92's avatar
markn92 committed
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
                  initial_soc: SoC, final_soc: SoC, capacity: SoC):
markn92's avatar
markn92 committed
    """
    Calculates shortest path using the CHarge algorithm.

    :param G: Input Graph
    :param s: Start Node identifier
    :param t: End Node identifier
markn92's avatar
markn92 committed
    :param beta_s: Start SoC
    :param beta_t: End SoC
markn92's avatar
markn92 committed
    :param U: Capacity
    :return:
    """
markn92's avatar
markn92 committed
    cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
markn92's avatar
markn92 committed
    label_factory = LabelsFactory(G, capacity, cf, initial_soc)
markn92's avatar
markn92 committed
    # Init maps to manage labels
markn92's avatar
markn92 committed
    l_set: Dict[int, set] = {v: set() for v in G}
markn92's avatar
markn92 committed
    l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue() for v in G}
markn92's avatar
markn92 committed
    # Init environment
    entry_label = _create_entry_label(G, charging_stations, s, initial_soc, capacity)
    l_uns[s].insert(entry_label, cf[entry_label.last_cs])
markn92's avatar
markn92 committed
    # A priority queue defines which node to visit next.
    # The key is the trip time.
    prio_queue = PriorityQueue()
    prio_queue.insert(s, 0)
markn92's avatar
markn92 committed

    while True:
        try:
markn92's avatar
markn92 committed
            minimum_node: Node = prio_queue.peak_min()
markn92's avatar
markn92 committed
        except KeyError:
            # empty queue
            break

markn92's avatar
markn92 committed
        label_minimum_node: Label = l_uns[minimum_node].delete_min()
        l_set[minimum_node].add(label_minimum_node)
markn92's avatar
markn92 committed
        if minimum_node == t:
markn92's avatar
markn92 committed
            return SoCFunction(label_minimum_node,
                               cf[label_minimum_node.last_cs]
                               ).minimum
markn92's avatar
markn92 committed

        # handle charging stations
markn92's avatar
markn92 committed
        if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
markn92's avatar
markn92 committed
            if cf[minimum_node] > cf[label_minimum_node.last_cs]:
                label_new = label_factory.spawn_label(minimum_node, label_minimum_node)
                l_uns[minimum_node].insert(label_new, cf[minimum_node])
markn92's avatar
markn92 committed
        # Update priority queue. This node might have gotten a new
        # minimum label spawned is th previous step.
markn92's avatar
markn92 committed
        _update_priority_queue(prio_queue, l_uns, minimum_node)
markn92's avatar
markn92 committed

        # scan outgoing arcs
markn92's avatar
markn92 committed
        for n in G.neighbors(minimum_node):
            # Create SoC Profile for getting from minimum_node to n
            soc_profile = label_minimum_node.soc_profile_cs_v + \
markn92's avatar
markn92 committed
                          soc_profile_factory(G, capacity, minimum_node, n)
markn92's avatar
markn92 committed

            if _is_feasible_path(soc_profile, capacity):
markn92's avatar
markn92 committed
                l_new = Label(
markn92's avatar
markn92 committed
                    t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n),
                    soc_last_cs=label_minimum_node.soc_last_cs,
                    last_cs=label_minimum_node.last_cs,
                    soc_profile_cs_v=soc_profile
markn92's avatar
markn92 committed
                )
                try:
markn92's avatar
markn92 committed
                    l_uns[n].insert(l_new, cf[l_new.last_cs])
markn92's avatar
markn92 committed
                except ValueError:
markn92's avatar
markn92 committed
                    # Infeasible because last_cs might be an
                    # dummy charging station. Therefore, the path might
                    # be infeasible even though one could reach it with a full
                    # battery, because charging is not possible at dummy
                    # stations.
                    #
                    # That means, the SoC and thereby the range is restricted
                    # to the SoC at the last cs (soc_last_cs).
markn92's avatar
markn92 committed
                    pass
                else:
                    if l_new == l_uns[n].peak_min():
markn92's avatar
markn92 committed
                        prio_queue.insert(n, l_new.key)
markn92's avatar
markn92 committed


def _create_entry_label(
        G: nx.Graph,
        charging_stations: set,
        s: Node,
        initial_soc: SoC,
        capacity: SoC) -> Label:
    """
    Create dummy charging station with initial soc as constant charging
    function.

    :param G: Graph
    :param charging_stations: Set of charging stations in Graph G
    :param s: Starting Node
    :param initial_soc: Initial SoC at beginng of the route
    :param capacity: The restricting battery capacity
    :return: Label for the starting Node
    """
    dummy_node: Node = len(G.nodes)

    # Charging coefficient 0 indicates dummy node
    G.add_node(dummy_node, c=0)
    charging_stations.add(dummy_node)

    # Register dummy charging station as the last
    # seen charging station before s.
    return Label(
        t_trip=0,
        soc_last_cs=initial_soc,
        last_cs=dummy_node,
        soc_profile_cs_v=soc_profile_factory(G, capacity, s)
    )


def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
    """Check, if possible to traverse path at least with full battery."""
    return not soc_profile(capacity) == -inf


def _update_priority_queue(
markn92's avatar
markn92 committed
        prio_queue: PriorityQueue,
markn92's avatar
markn92 committed
        l_uns: Dict[int, LabelPriorityQueue],
        node: Node):
    """
    Update key of a node the priority queue according to
    its minimum label.
    """
    try:
        minimum_label = l_uns[node].peak_min()
    except KeyError:
        # l_uns[v] empty
markn92's avatar
markn92 committed
        prio_queue.delete_min()
markn92's avatar
markn92 committed
    else:
markn92's avatar
markn92 committed
        prio_queue.insert(node, minimum_label.key)