Skip to content
Snippets Groups Projects
routing.py 4.71 KiB
Newer Older
markn92's avatar
markn92 committed
from typing import Dict
markn92's avatar
markn92 committed
from math import inf

markn92's avatar
markn92 committed
import networkx as nx
markn92's avatar
markn92 committed
from evrouting.T import Node, SoC, Time
markn92's avatar
markn92 committed
from evrouting.utils import PriorityQueue
markn92's avatar
markn92 committed
from evrouting.charge.factories import soc_profile as soc_profile_factory
markn92's avatar
markn92 committed
from ..graph_tools import distance
from .T import SoCFunction, Label
markn92's avatar
markn92 committed
from .utils import LabelPriorityQueue, ChargingFunctionMap
markn92's avatar
markn92 committed


markn92's avatar
markn92 committed
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
                  initial_soc: SoC, final_soc: SoC, capacity: SoC):
markn92's avatar
markn92 committed
    """
    Calculates shortest path using the CHarge algorithm.

    :param G: Input Graph
    :param s: Start Node identifier
    :param t: End Node identifier
markn92's avatar
markn92 committed
    :param beta_s: Start SoC
    :param beta_t: End SoC
markn92's avatar
markn92 committed
    :param U: Capacity
    :return:
    """
markn92's avatar
markn92 committed
    cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)

markn92's avatar
markn92 committed
    q = PriorityQueue()
markn92's avatar
markn92 committed
    l_set: Dict[int, set] = {v: set() for v in G}
    l_uns: Dict[int, LabelPriorityQueue] = {
        v: LabelPriorityQueue() for v in G
    }
markn92's avatar
markn92 committed

    # Dummy vertex without incident edges that is (temporarily) added to G
markn92's avatar
markn92 committed
    dummy_node: Node = len(G.nodes)
    # Charging coefficient 0 indicates dummy node
    G.add_node(dummy_node, c=0)
    charging_stations.add(dummy_node)

    l: Label = Label(
        t_trip=0,
        soc_last_cs=initial_soc,
        last_cs=dummy_node,
markn92's avatar
markn92 committed
        soc_profile_cs_v=soc_profile_factory(G, capacity, s)
markn92's avatar
markn92 committed
    )

    l_uns[s].insert(
        l,
markn92's avatar
markn92 committed
        cf[l.last_cs]
markn92's avatar
markn92 committed
    )
markn92's avatar
markn92 committed

    q.insert(s, 0)

    # run main loop
    while True:
        try:
markn92's avatar
markn92 committed
            minimum_node: Node = q.peak_min()
markn92's avatar
markn92 committed
        except KeyError:
            # empty queue
            break

markn92's avatar
markn92 committed
        label_minimum_node: Label = l_uns[minimum_node].delete_min()
        l_set[minimum_node].add(label_minimum_node)
markn92's avatar
markn92 committed
        if minimum_node == t:
            return SoCFunction(
                label_minimum_node,
markn92's avatar
markn92 committed
                cf[label_minimum_node.last_cs]
markn92's avatar
markn92 committed
            ).minimum
markn92's avatar
markn92 committed

        # handle charging stations
markn92's avatar
markn92 committed
        if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
markn92's avatar
markn92 committed
            cf_last_cs = cf[label_minimum_node.last_cs]
            cf_minimum_node = cf[minimum_node]
markn92's avatar
markn92 committed

            if cf_minimum_node.c > cf_last_cs.c:
                # Only charge the minimum at the last charge station
                # and continue charging at this station.
                old_soc_function: SoCFunction = SoCFunction(
                    label_minimum_node, cf_last_cs
                )
                t_trip_old = label_minimum_node.t_trip
                t_charge: Time = old_soc_function.minimum - t_trip_old

                label_new = Label(
                    t_trip=t_trip_old + t_charge,
                    soc_last_cs=old_soc_function(t_trip_old + t_charge),
                    last_cs=minimum_node,
markn92's avatar
markn92 committed
                    soc_profile_cs_v=soc_profile_factory(
markn92's avatar
markn92 committed
                        G, capacity, minimum_node
                    )
                )
                l_uns[minimum_node].insert(
                    label_new,
                    cf_minimum_node
                )
markn92's avatar
markn92 committed

        # update priority queue
markn92's avatar
markn92 committed
        try:
            label_minimum_node = l_uns[minimum_node].peak_min()
        except KeyError:
            # l_uns[v] empty
markn92's avatar
markn92 committed
            q.delete_min()
markn92's avatar
markn92 committed
        else:
markn92's avatar
markn92 committed
            q.insert(minimum_node, label_minimum_node.key)
markn92's avatar
markn92 committed

        # scan outgoing arcs
markn92's avatar
markn92 committed
        for n in G.neighbors(minimum_node):
            # Create SoC Profile for getting from minimum_node to n
            soc_profile = label_minimum_node.soc_profile_cs_v + \
markn92's avatar
markn92 committed
                          soc_profile_factory(G, capacity, minimum_node, n)
markn92's avatar
markn92 committed
            if not soc_profile(capacity) == -inf:
                # It is possible to get from minimum_node to n
                l_new = Label(
                    label_minimum_node.t_trip + distance(G, minimum_node, n),
                    label_minimum_node.soc_last_cs,
                    label_minimum_node.last_cs,
                    soc_profile
                )
                try:
                    l_uns[n].insert(
                        l_new,
markn92's avatar
markn92 committed
                        cf[l_new.last_cs]
markn92's avatar
markn92 committed
                    )
                except ValueError:
markn92's avatar
markn92 committed
                    # Infeasible because last_cs might be an
                    # dummy charging station. Therefore, the path might
                    # be infeasible even though one could reach it with a full
                    # battery, because charging is not possible at dummy
                    # stations.
                    #
                    # That means, the SoC and thereby the range is restricted
                    # to the SoC at the last cs (soc_last_cs).
markn92's avatar
markn92 committed
                    pass
                else:
                    if l_new == l_uns[n].peak_min():
markn92's avatar
markn92 committed
                        q.insert(n, l_new.key)