Skip to content
Snippets Groups Projects
routing.py 2.15 KiB
Newer Older
markn92's avatar
markn92 committed
from typing import List
from math import inf

markn92's avatar
markn92 committed
import networkx as nx
markn92's avatar
markn92 committed
from evrouting.T import Node, SoC, Time, ChargingCoefficient
from evrouting.utils import PriorityQueue

from .T import SoCProfile, ChargingFunction, Label
markn92's avatar
markn92 committed


markn92's avatar
markn92 committed
def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, beta_s: SoC, beta_t: SoC, U: SoC):
markn92's avatar
markn92 committed
    """
    Calculates shortest path using the CHarge algorithm.

    :param G: Input Graph
    :param s: Start Node identifier
    :param t: End Node identifier
markn92's avatar
markn92 committed
    :param beta_s: Start SoC
    :param beta_t: End SoC
markn92's avatar
markn92 committed
    :param U: Capacity
    :return:
    """
markn92's avatar
markn92 committed
    q = PriorityQueue()
    l_set = {v: set() for v in G}
    l_uns = {v: PriorityQueue() for v in G}

    # Dummy vertex without incident edges that is (temporarily) added to G
    v_0: Node = Node(len(G.nodes))
    G.add_node(v_0)

    S.add(v_0)

    cf_v_0 = [(0, beta_s)]
    l_uns[s] = PriorityQueue()

    l = Label(0, beta_s, v_0, SoCProfile(G, U, s))
    l_uns[s].insert(item=l, priority=key(l))

    q.insert(s, 0)

    # run main loop
    while True:
        try:
            v = q.peak_min()
        except KeyError:
            # empty queue
            break

        l = l_uns[v].delete_min()
        l_set[v].add(l)

        if v == t:
            return ChargingFunction(G, l).get_minimum()

        # handle charging stations
        t_trip, beta_u, u, b_u_v = l
        if v in S and not v == u:
            # TODO !!!
            for t_charge in t_breaks(l):
                l_uns[v].insert(new_label(l), priority=)  # prio??

        # update priority queue
        if l_uns[v]:
            l_new = l_uns[v].peak_min()
            q.insert(v, key(l_new))
        else:
            q.delete_min()

        # scan outgoing arcs
        for x, y in G[v]:
            b_x_y = b_u_v + SoCProfile(G, U, x, y)
            if not b_x_y(beta_max_u) == -inf:
                l_new = (t_trip + G.edges[x, y]['weight'], beta_u, u, b_x_y)
                l_uns[y].insert(l_new)
                if l_new == l_uns[y].peak_min():
                    q.insert(y, key(l_new))


def key(l: Label) -> Time:
    return l.t_trip


def t_breaks(c_old: ChargingCoefficient, c_new: ChargingCoefficient) -> List[Time]:
markn92's avatar
markn92 committed
    pass