From 258a96eb750a370fcc58396e8cc1f07f718f06d5 Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Thu, 19 Mar 2020 17:57:28 +0100 Subject: [PATCH] git rid of stupid replacement functions --- evrouting/charge/T.py | 22 ++++++- evrouting/charge/routing.py | 120 ++++++++++-------------------------- evrouting/charge/utils.py | 11 ++-- 3 files changed, 55 insertions(+), 98 deletions(-) diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index 0fffe01..2e813cb 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -1,4 +1,4 @@ -from typing import Callable, NamedTuple +from typing import Callable, NamedTuple, Union from math import inf from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node @@ -202,9 +202,9 @@ class SoCFunction: self.soc_profile_cs_v: SoCProfile = label.soc_profile_cs_v self.cf_cs: ChargingFunction = cf_cs + self.breakpoints = self.get_breakpoints() - @property - def breakpoints(self): + def get_breakpoints(self): breakpoints = [Breakpoint(self.minimum, 0)] if not self.cf_cs.is_dummy: breakpoints.append( @@ -233,6 +233,22 @@ class SoCFunction: self.cf_cs(t - self.t_trip, self.soc_last_cs) ) + def calc_optimal_t_charge(self, cs: ChargingFunction) -> Union[Time, None]: + capacity: SoC = self.soc_profile_cs_v.capacity + + t_charge = None + + if cs > self.cf_cs: + # Faster charging station -> charge as soon as possible + t_charge = self.breakpoints[0].t - self.t_trip + elif self.breakpoints[-1].soc < capacity: + # Slower charging station might still be dominating + # because the soc cannot be more than the full capacity + # decreased by the trip costs. This will be refilled at this station. + t_charge = self.breakpoints[-1].t - self.t_trip + + return t_charge + def __lt__(self, other: 'SoCFunction') -> bool: """Comparison for dominance check.""" for t_i, soc_i in self.breakpoints: diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 58ee2be..d1322db 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -31,7 +31,13 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, :param capacity: :return: """ - t = _apply_final_constraints(G, t, final_soc) + # Add node that is only connected to the final node and takes no time + # to travel but consumes exactly the amount of energy that should be + # left at t (final_soc). The node becomes the new final node. + dummy_final_node: Node = len(G) + G.add_node(dummy_final_node) + G.add_edge(t, dummy_final_node, weight=0, c=final_soc) + t = dummy_final_node # Init factories cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) @@ -42,10 +48,20 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, l_set: Dict[int, Set[Label]] = {v: set() for v in G} l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G} - # Init environment - entry_label = _create_entry_label(G, charging_stations, - s, initial_soc, soc_profile_factory) - l_uns[s].insert(entry_label) + # Add dummy charging station with charging function + # cf(t) = initial_soc (ie charging coefficient is zero). + dummy_node: Node = len(G.nodes) + G.add_node(dummy_node, c=0) + charging_stations.add(dummy_node) + + # Register dummy charging station as the last + # seen charging station before s. + l_uns[s].insert(Label( + t_trip=0, + soc_last_cs=initial_soc, + last_cs=dummy_node, + soc_profile_cs_v=soc_profile_factory(s) + )) # A priority queue defines which node to visit next. # The key is the trip time. @@ -65,10 +81,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, if minimum_node in charging_stations and \ not minimum_node == label_minimum_node.last_cs: f_soc: SoCFunction = f_soc_factory(label_minimum_node) - t_charge = _calc_optimal_t_charge( - current_cs=cf_map[minimum_node], - f_soc=f_soc, - capacity=capacity) + t_charge = f_soc.calc_optimal_t_charge(cf_map[minimum_node]) if t_charge is not None: # Spawn new label at t_charge @@ -83,7 +96,14 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, # Update priority queue. This node might have gotten a new # minimum label spawned is th previous step. - _update_priority_queue(f_soc_factory, prio_queue, l_uns, minimum_node) + try: + prio_queue.insert( + item=minimum_node, + **keys(f_soc_factory(l_uns[minimum_node].peak_min())) + ) + except KeyError: + # l_uns[v] empty + prio_queue.delete_min() # scan outgoing arcs for n in G.neighbors(minimum_node): @@ -117,82 +137,4 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, continue if is_new_min_label: - prio_queue.insert(n, **keys(f_soc_factory, l_new)) - - -def _calc_optimal_t_charge(current_cs: ChargingFunction, - f_soc: SoCFunction, - capacity: SoC) -> Union[Time, None]: - f_soc_breakpoints = f_soc.breakpoints - t_charge = None - - if current_cs > f_soc.cf_cs: - # Faster charging station -> charge as soon as possible - t_charge = f_soc_breakpoints[0].t - f_soc.t_trip - elif f_soc_breakpoints[-1].soc < capacity: - # Slower charging station might still be dominating - # because the soc cannot be more than the full capacity - # decreased by the trip costs. This will be refilled at this station. - t_charge = f_soc_breakpoints[-1].t - f_soc.t_trip - - return t_charge - - -def _create_entry_label( - G: nx.Graph, - charging_stations: set, - s: Node, - initial_soc: SoC, - soc_profile_factory: SoCProfileFactory -) -> Label: - """ - Create dummy charging station with initial soc as constant charging - function. - - :param G: Graph - :param charging_stations: Set of charging stations in Graph G - :param s: Starting Node - :param initial_soc: Initial SoC at beginng of the route - :param capacity: The restricting battery capacity - :return: Label for the starting Node - """ - dummy_node: Node = len(G.nodes) - - # Charging coefficient 0 indicates dummy node - G.add_node(dummy_node, c=0) - charging_stations.add(dummy_node) - - # Register dummy charging station as the last - # seen charging station before s. - return Label( - t_trip=0, - soc_last_cs=initial_soc, - last_cs=dummy_node, - soc_profile_cs_v=soc_profile_factory(s) - ) - - -def _update_priority_queue( - f_soc: SoCFunctionFactory, - prio_queue: PriorityQueue, - l_uns: Dict[int, LabelPriorityQueue], - node: Node): - """ - Update key of a node the priority queue according to - its minimum label. - """ - try: - minimum_label: Label = l_uns[node].peak_min() - except KeyError: - # l_uns[v] empty - prio_queue.delete_min() - else: - prio_queue.insert(node, **keys(f_soc, minimum_label)) - - -def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node: - temp_final_node = len(G) - G.add_node(temp_final_node) - G.add_edge(t, temp_final_node, weight=0, c=final_soc) - - return temp_final_node + prio_queue.insert(n, **keys(f_soc_factory(l_new))) diff --git a/evrouting/charge/utils.py b/evrouting/charge/utils.py index 23b3b61..6a58d69 100644 --- a/evrouting/charge/utils.py +++ b/evrouting/charge/utils.py @@ -4,7 +4,7 @@ from math import inf from evrouting.utils import PriorityQueue from evrouting.T import SoC, Time -from .T import Label +from .T import Label, SoCFunction from .factories import SoCFunctionFactory @@ -16,7 +16,7 @@ class LabelPriorityQueue(PriorityQueue): def insert(self, label: Label): """Breaking ties with lowest soc at t_min.""" - super().insert(item=label, **keys(self.f_soc_factory, label)) + super().insert(item=label, **keys(self.f_soc_factory(label))) if self.peak_min() == label: self.dominance_check() @@ -40,14 +40,13 @@ class LabelPriorityQueue(PriorityQueue): return -def keys(f_soc_factory: SoCFunctionFactory, label: Label) -> Dict: - soc_function = f_soc_factory(label) - t_min: Time = soc_function.minimum +def keys(f_soc: SoCFunction) -> Dict: + t_min: Time = f_soc.minimum # Might happen because of dummy charge stations if t_min == -inf: raise ValueError('Infeasible label.') - soc_min: SoC = soc_function(t_min) + soc_min: SoC = f_soc(t_min) return {'priority': t_min, 'count': soc_min} -- GitLab