From 35ac8209ca39a20b610bcd0b1302b2c41f809766 Mon Sep 17 00:00:00 2001 From: "niehues.mark@gmail.com" <niehues.mark@gmail.com> Date: Fri, 20 Mar 2020 12:55:47 +0100 Subject: [PATCH] doku etc. --- evrouting/charge/T.py | 82 +++++++++++++++++++++--- evrouting/charge/routing.py | 15 ++++- evrouting/charge/utils.py | 42 ++++++++++-- tests/charge/test_soc_data_structures.py | 6 +- 4 files changed, 125 insertions(+), 20 deletions(-) diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index ff39183..6484fcc 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -1,4 +1,5 @@ -from typing import Callable, NamedTuple, Union +"""Data structures for the algorithm.""" +from typing import Callable, NamedTuple, Union, List from math import inf from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node @@ -162,6 +163,7 @@ class Label(NamedTuple): class Breakpoint(NamedTuple): + """Breakpoint describing a SoC Function.""" t: Time soc: SoC @@ -194,17 +196,40 @@ class SoCFunction: self.soc_profile_cs_v: SoCProfile = label.soc_profile_cs_v self.cf_cs: ChargingFunction = cf_cs - self.breakpoints = self.get_breakpoints() - def get_breakpoints(self): - breakpoints = [Breakpoint(self.minimum, 0)] - if not self.cf_cs.is_dummy: - breakpoints.append( + self.minimum = self._calc_minimum() + self.breakpoints = self._calc_breakpoints() + + def _calc_breakpoints(self) -> List[Breakpoint]: + """ + Since all charging functions are linear functions, every SoC Function + can be represented using not more than two breakpoints. + + If the last charging station is a dummy station (that does not change + the battery's SoC), the only breakpoint is at the minimum trip time + where the battery's SoC will be the SoC at the last charging station + decreased by the costs of the path. + + For regular charging stations holds, that the SoC at the minimum + feasible trip time to a node means, that the car will arrive with + zero capacity left. The maximum SoC at the arriving node, which does + not change after even longer trip times and thereby defines the other + breakpoint, is when the battery has been fully charged at the last + station. + + Todo: Think about alternative ways of calculation, to minimize function + calls. + """ + if self.cf_cs.is_dummy: + breakpoints = [Breakpoint(self.minimum, self(self.minimum))] + else: + breakpoints = [ + Breakpoint(self.minimum, 0), Breakpoint( self.minimum + self.cf_cs.inverse(self.soc_profile_cs_v.out), self.soc_profile_cs_v.out ) - ) + ] return breakpoints def __call__(self, t: Time) -> SoC: @@ -226,6 +251,46 @@ class SoCFunction: ) def calc_optimal_t_charge(self, cs: ChargingFunction) -> Union[Time, None]: + """ + Calculates the optimal time to charge at the last charging station + given a charging function ```cs``` of another (the current) charging + station. + + Given the fact, that all charging stations have linear charging + functions we have to seperate two cases: + + 1. The current charging stations performs better than the last one. + + In this case, it is optimal to charge at the last stop + only the amount necessary to reach the current charging station which + is the minimum of the SoC Function:: + + t_charge = t_min(f) + + 2. Because of battery constraints, the battery's maximum capacity + at this station from charging at the previous station is given by the + battery's maximum capacity decreased by the costs of the path to + the current station. + + Therefore (if the path cost is > 0), it is beneficial to charge + at the current charging station even if it charges slower than the + previous one. + + The optimal procedure then is to charge the battery up to the maximum + at the previous station and continue charging at the current + station when the SoC Function reaches its maximum. In breakpoint + representation the maximum is given by the last breakpoint of the + piecewise linear SoC Function. + + ..Note: + + The possibility not to charge is also kept in the algorithm + since the according label is not thrown away. + + + :param cs: Charging function of the current charging station. + :return: None if it is optimal not to charge else the charging time. + """ capacity: SoC = self.soc_profile_cs_v.capacity t_charge = None @@ -258,8 +323,7 @@ class SoCFunction: return True - @property - def minimum(self) -> Time: + def _calc_minimum(self) -> Time: """ :returns: Least feasible trip time. That is, the minimum time when the SoC functions yields a SoC value that is not -inf. diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 02d74cc..1dae50c 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -1,4 +1,12 @@ -"""Module contains the main algorithm.""" +""" +Implementation of the CHArge algorithm [0] with two further constraints: + + 1. There are no negative path costs (ie no recurpation). + 2. All charging stations have linear charging functions. + +[0] https://dl.acm.org/doi/10.1145/2820783.2820826 + +""" from typing import Dict, List, Tuple, Set from math import inf @@ -7,7 +15,7 @@ from evrouting.T import Node, SoC from evrouting.utils import PriorityQueue from evrouting.graph_tools import distance from evrouting.charge.T import SoCFunction, Label -from evrouting.charge.utils import LabelPriorityQueue, keys +from evrouting.charge.utils import LabelPriorityQueue from evrouting.charge.factories import ( ChargingFunctionMap, SoCFunctionFactory, @@ -43,6 +51,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels'] prio_queue: PriorityQueue = queues['priority queue'] + # Shortcut for key function + keys = LabelPriorityQueue.keys + while prio_queue: node_min: Node = prio_queue.peak_min() diff --git a/evrouting/charge/utils.py b/evrouting/charge/utils.py index 217ed4c..a8060d5 100644 --- a/evrouting/charge/utils.py +++ b/evrouting/charge/utils.py @@ -1,3 +1,4 @@ +"""Holding the queue structure for unsettled Labels.""" from typing import Any, Dict, List from evrouting.utils import PriorityQueue @@ -7,24 +8,52 @@ from evrouting.charge.factories import SoCFunctionFactory class LabelPriorityQueue(PriorityQueue): + """ + Implementation of a variant of priority queue to store the + ***unsettled*** labels of a vertex and efficiently extract + the minimum label in the algorithm. + + The priority of a label is the minimum feasible time of it's + according SoC Function. Tie breaker is the SoC at this time. + + It maintains the invariant: + + The queue is empty or the SoC Function of the minimum label + is not dominated by any SoC Function of a ***settled*** label. + + """ + def __init__(self, f_soc: SoCFunctionFactory, l_set: List[Label]): + """ + :param f_soc: SoC Function Factory to create SoC Functions of + inserted labels for testing the invariant. + :param l_set: Set of settled labels. + """ super().__init__() self.f_soc_factory: SoCFunctionFactory = f_soc self.l_set: List[Label] = l_set def insert(self, label: Label): """Breaking ties with lowest soc at t_min.""" - super().insert(item=label, **keys(self.f_soc_factory(label))) + super().insert(item=label, **self.keys(self.f_soc_factory(label))) + # If the minimum element has changed, check the invariant. if self.peak_min() == label: self.dominance_check() def delete_min(self) -> Any: + """Delete and check the invariant.""" min_label = super().delete_min() self.dominance_check() return min_label def dominance_check(self): + """ + Compare the SoC Function of the minimum label with all + SoC Functions of the already settled labels. If any settled label + dominates the minimum label, the minimum label is removed, since + it cannot lead to a better solution. + """ try: label: Label = self.peak_min() except KeyError: @@ -36,8 +65,9 @@ class LabelPriorityQueue(PriorityQueue): if any(self.f_soc_factory(label).dominates(soc) for label in self.l_set): self.remove_item(label) - -def keys(f_soc: SoCFunction) -> Dict: - t_min: Time = f_soc.minimum - soc_min: SoC = f_soc(t_min) - return {'priority': t_min, 'count': soc_min} + @staticmethod + def keys(f_soc: SoCFunction) -> Dict: + """Return the keys for insertion. See class description.""" + t_min: Time = f_soc.minimum + soc_min: SoC = f_soc(t_min) + return {'priority': t_min, 'count': soc_min} diff --git a/tests/charge/test_soc_data_structures.py b/tests/charge/test_soc_data_structures.py index 60c0485..e6dc4cf 100644 --- a/tests/charge/test_soc_data_structures.py +++ b/tests/charge/test_soc_data_structures.py @@ -51,7 +51,7 @@ class TestSoCFunction: # Needs to charge 2 Wh because path has costs # of 2. Charging takes additional 2 Wh / charging_coefficient # of time compared to the sole trip time. - assert soc_function.minimum == \ + assert soc_function._calc_minimum() == \ 2 / soc_function.cf_cs.c + soc_function.t_trip def test_minimum_with_and_soc_charge(self, soc_function): @@ -61,7 +61,7 @@ class TestSoCFunction: # Needs to charge 1 Wh because path has costs # of 2. Charging takes additional 1 Wh / charging_coefficient # of time compared to the sole trip time. - assert soc_function.minimum == \ + assert soc_function._calc_minimum() == \ 1 / soc_function.cf_cs.c + soc_function.t_trip def test_below_t_trip(self, soc_function): @@ -109,7 +109,7 @@ class TestSoCFunction: soc_function.cf_cs.c = 0 soc_function.soc_last_cs = 1 - assert soc_function.minimum == -inf + assert soc_function._calc_minimum() == -inf class TestChargingFunction: -- GitLab