From 5c664e3432ee449b6c366dcc297368533417d6c2 Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Wed, 18 Mar 2020 16:05:05 +0100 Subject: [PATCH] little cleanup --- evrouting/charge/T.py | 20 +++++ evrouting/charge/factories.py | 70 +++++++++++++-- evrouting/charge/routing.py | 155 ++++++++++++++++++---------------- evrouting/charge/utils.py | 29 ------- 4 files changed, 168 insertions(+), 106 deletions(-) diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index 292e6b3..e85d78b 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -134,6 +134,26 @@ class ChargingFunction: return cf_inverse + def __lt__(self, other) -> bool: + """Comparison for dominance check.""" + return self.c < other.c + + def __le__(self, other) -> bool: + """Comparison for dominance check.""" + return self.c <= other.c + + def __eq__(self, other) -> bool: + """Comparison for dominance check.""" + return self.c == other.c + + def __ge__(self, other): + """Comparison for dominance check.""" + return self.c >= other.c + + def __gt__(self, other): + """Comparison for dominance check.""" + return self.c > other.c + class Label(NamedTuple): """ diff --git a/evrouting/charge/factories.py b/evrouting/charge/factories.py index be8927e..5d60978 100644 --- a/evrouting/charge/factories.py +++ b/evrouting/charge/factories.py @@ -1,11 +1,12 @@ import networkx as nx +from typing import Dict -from .T import SoCProfile, ChargingFunction -from ..T import Node, SoC -from ..graph_tools import charging_cofficient, consumption +from .T import SoCProfile, SoCFunction, ChargingFunction, Label +from ..T import Node, SoC, Time +from ..graph_tools import charging_cofficient, consumption, distance -def charging_function( +def charging_function_factory( G: nx.Graph, n: Node, capacity: SoC, @@ -14,7 +15,7 @@ def charging_function( return ChargingFunction(charging_cofficient(G, n), capacity, initial_soc) -def soc_profile( +def soc_profile_factory( G: nx.Graph, capacity: SoC, u: Node, @@ -28,3 +29,62 @@ def soc_profile( """ path_cost = 0 if v is None else consumption(G, u, v) return SoCProfile(path_cost, capacity) + + +class ChargingFunctionMap: + """Maps Nodes to their charging functions.""" + + def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None): + self.map: Dict[Node, ChargingFunction] = {} + self.G: nx.Graph = G + self.capacity: SoC = capacity + self.initial_soc: SoC = initial_soc + + def __getitem__(self, node: Node) -> ChargingFunction: + """ + Try to get charging function from cache, + else create function and add to cache. + """ + try: + cf = self.map[node] + except KeyError: + cf = charging_function_factory( + G=self.G, + n=node, + capacity=self.capacity, + initial_soc=self.initial_soc + ) + self.map[node] = cf + + return cf + + +class LabelsFactory: + + def __init__(self, + G: nx.Graph, + capacity: SoC, + cf: ChargingFunctionMap, + initial_soc: SoC = None): + self.G: nx.Graph = G + self.capacity: SoC = capacity + self.cf: ChargingFunctionMap = cf + self.initial_soc: SoC = initial_soc + + def spawn_label(self, current_node: Node, current_label: Label): + # Only charge the minimum at the last charge station + # and continue charging at this station. + soc_function: SoCFunction = SoCFunction( + current_label, self.cf[current_label.last_cs] + ) + + t_trip_old = current_label.t_trip + t_charge: Time = soc_function.minimum - t_trip_old + + return Label( + t_trip=t_trip_old + t_charge, + soc_last_cs=soc_function(t_trip_old + t_charge), + last_cs=current_node, + soc_profile_cs_v=soc_profile_factory( + self.G, self.capacity, current_node) + ) diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 28eda64..eb1a24b 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -2,13 +2,15 @@ from typing import Dict from math import inf import networkx as nx -from evrouting.T import Node, SoC, Time +from evrouting.T import Node, SoC from evrouting.utils import PriorityQueue -from evrouting.charge.factories import soc_profile as soc_profile_factory +from evrouting.charge.factories import LabelsFactory, ChargingFunctionMap, soc_profile_factory from ..graph_tools import distance -from .T import SoCFunction, Label -from .utils import LabelPriorityQueue, ChargingFunctionMap +from .T import SoCFunction, SoCProfile, Label +from .utils import LabelPriorityQueue + +__all__ = ['shortest_path'] def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, @@ -25,34 +27,18 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, :return: """ cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) + label_factory = LabelsFactory(G, capacity, cf, initial_soc) - q = PriorityQueue() l_set: Dict[int, set] = {v: set() for v in G} - l_uns: Dict[int, LabelPriorityQueue] = { - v: LabelPriorityQueue() for v in G - } + l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue() for v in G} - # Dummy vertex without incident edges that is (temporarily) added to G - dummy_node: Node = len(G.nodes) - # Charging coefficient 0 indicates dummy node - G.add_node(dummy_node, c=0) - charging_stations.add(dummy_node) - - l: Label = Label( - t_trip=0, - soc_last_cs=initial_soc, - last_cs=dummy_node, - soc_profile_cs_v=soc_profile_factory(G, capacity, s) - ) - - l_uns[s].insert( - l, - cf[l.last_cs] - ) + # Init environment + entry_label = _create_entry_label(G, charging_stations, s, initial_soc, capacity) + l_uns[s].insert(entry_label, cf[entry_label.last_cs]) + q = PriorityQueue() q.insert(s, 0) - # run main loop while True: try: minimum_node: Node = q.peak_min() @@ -64,65 +50,35 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, l_set[minimum_node].add(label_minimum_node) if minimum_node == t: - return SoCFunction( - label_minimum_node, - cf[label_minimum_node.last_cs] - ).minimum + return SoCFunction(label_minimum_node, + cf[label_minimum_node.last_cs] + ).minimum # handle charging stations if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs: - cf_last_cs = cf[label_minimum_node.last_cs] - cf_minimum_node = cf[minimum_node] - - if cf_minimum_node.c > cf_last_cs.c: - # Only charge the minimum at the last charge station - # and continue charging at this station. - old_soc_function: SoCFunction = SoCFunction( - label_minimum_node, cf_last_cs - ) - t_trip_old = label_minimum_node.t_trip - t_charge: Time = old_soc_function.minimum - t_trip_old - - label_new = Label( - t_trip=t_trip_old + t_charge, - soc_last_cs=old_soc_function(t_trip_old + t_charge), - last_cs=minimum_node, - soc_profile_cs_v=soc_profile_factory( - G, capacity, minimum_node - ) - ) - l_uns[minimum_node].insert( - label_new, - cf_minimum_node - ) + if cf[minimum_node] > cf[label_minimum_node.last_cs]: + label_new = label_factory.spawn_label(minimum_node, label_minimum_node) + l_uns[minimum_node].insert(label_new, cf[minimum_node]) - # update priority queue - try: - label_minimum_node = l_uns[minimum_node].peak_min() - except KeyError: - # l_uns[v] empty - q.delete_min() - else: - q.insert(minimum_node, label_minimum_node.key) + # Update priority queue. This node might have gotten a new + # minimum label spawned is th previous step. + _update_priority_queue(q, l_uns, minimum_node) # scan outgoing arcs for n in G.neighbors(minimum_node): # Create SoC Profile for getting from minimum_node to n soc_profile = label_minimum_node.soc_profile_cs_v + \ soc_profile_factory(G, capacity, minimum_node, n) - if not soc_profile(capacity) == -inf: - # It is possible to get from minimum_node to n + + if _is_feasible_path(soc_profile, capacity): l_new = Label( - label_minimum_node.t_trip + distance(G, minimum_node, n), - label_minimum_node.soc_last_cs, - label_minimum_node.last_cs, - soc_profile + t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n), + soc_last_cs=label_minimum_node.soc_last_cs, + last_cs=label_minimum_node.last_cs, + soc_profile_cs_v=soc_profile ) try: - l_uns[n].insert( - l_new, - cf[l_new.last_cs] - ) + l_uns[n].insert(l_new, cf[l_new.last_cs]) except ValueError: # Infeasible because last_cs might be an # dummy charging station. Therefore, the path might @@ -136,3 +92,58 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, else: if l_new == l_uns[n].peak_min(): q.insert(n, l_new.key) + + +def _create_entry_label( + G: nx.Graph, + charging_stations: set, + s: Node, + initial_soc: SoC, + capacity: SoC) -> Label: + """ + Create dummy charging station with initial soc as constant charging + function. + + :param G: Graph + :param charging_stations: Set of charging stations in Graph G + :param s: Starting Node + :param initial_soc: Initial SoC at beginng of the route + :param capacity: The restricting battery capacity + :return: Label for the starting Node + """ + dummy_node: Node = len(G.nodes) + + # Charging coefficient 0 indicates dummy node + G.add_node(dummy_node, c=0) + charging_stations.add(dummy_node) + + # Register dummy charging station as the last + # seen charging station before s. + return Label( + t_trip=0, + soc_last_cs=initial_soc, + last_cs=dummy_node, + soc_profile_cs_v=soc_profile_factory(G, capacity, s) + ) + + +def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool: + """Check, if possible to traverse path at least with full battery.""" + return not soc_profile(capacity) == -inf + + +def _update_priority_queue( + q: PriorityQueue, + l_uns: Dict[int, LabelPriorityQueue], + node: Node): + """ + Update key of a node the priority queue according to + its minimum label. + """ + try: + minimum_label = l_uns[node].peak_min() + except KeyError: + # l_uns[v] empty + q.delete_min() + else: + q.insert(node, minimum_label.key) diff --git a/evrouting/charge/utils.py b/evrouting/charge/utils.py index 890e9b1..0ebd923 100644 --- a/evrouting/charge/utils.py +++ b/evrouting/charge/utils.py @@ -1,11 +1,8 @@ -from typing import Dict from math import inf -import networkx as nx from evrouting.utils import PriorityQueue from evrouting.T import SoC, Time, Node -from .factories import charging_function from .T import Label, SoCFunction, ChargingFunction @@ -32,29 +29,3 @@ class LabelPriorityQueue(PriorityQueue): ) -class ChargingFunctionMap: - """Maps Nodes to their charging functions.""" - - def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None): - self.map: Dict[Node, ChargingFunction] = {} - self.G: nx.Graph = G - self.capacity: SoC = capacity - self.initial_soc: SoC = initial_soc - - def __getitem__(self, node: Node) -> ChargingFunction: - """ - Try to get charging function from cache, - else create function and add to cache. - """ - try: - cf = self.map[node] - except KeyError: - cf = charging_function( - G=self.G, - n=node, - capacity=self.capacity, - initial_soc=self.initial_soc - ) - self.map[node] = cf - - return cf -- GitLab