Skip to content
Snippets Groups Projects
Commit 35ac8209 authored by markn92's avatar markn92
Browse files

doku etc.

parent 93324eb0
No related branches found
No related tags found
No related merge requests found
from typing import Callable, NamedTuple, Union """Data structures for the algorithm."""
from typing import Callable, NamedTuple, Union, List
from math import inf from math import inf
from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node
...@@ -162,6 +163,7 @@ class Label(NamedTuple): ...@@ -162,6 +163,7 @@ class Label(NamedTuple):
class Breakpoint(NamedTuple): class Breakpoint(NamedTuple):
"""Breakpoint describing a SoC Function."""
t: Time t: Time
soc: SoC soc: SoC
...@@ -194,17 +196,40 @@ class SoCFunction: ...@@ -194,17 +196,40 @@ class SoCFunction:
self.soc_profile_cs_v: SoCProfile = label.soc_profile_cs_v self.soc_profile_cs_v: SoCProfile = label.soc_profile_cs_v
self.cf_cs: ChargingFunction = cf_cs self.cf_cs: ChargingFunction = cf_cs
self.breakpoints = self.get_breakpoints()
def get_breakpoints(self): self.minimum = self._calc_minimum()
breakpoints = [Breakpoint(self.minimum, 0)] self.breakpoints = self._calc_breakpoints()
if not self.cf_cs.is_dummy:
breakpoints.append( def _calc_breakpoints(self) -> List[Breakpoint]:
"""
Since all charging functions are linear functions, every SoC Function
can be represented using not more than two breakpoints.
If the last charging station is a dummy station (that does not change
the battery's SoC), the only breakpoint is at the minimum trip time
where the battery's SoC will be the SoC at the last charging station
decreased by the costs of the path.
For regular charging stations holds, that the SoC at the minimum
feasible trip time to a node means, that the car will arrive with
zero capacity left. The maximum SoC at the arriving node, which does
not change after even longer trip times and thereby defines the other
breakpoint, is when the battery has been fully charged at the last
station.
Todo: Think about alternative ways of calculation, to minimize function
calls.
"""
if self.cf_cs.is_dummy:
breakpoints = [Breakpoint(self.minimum, self(self.minimum))]
else:
breakpoints = [
Breakpoint(self.minimum, 0),
Breakpoint( Breakpoint(
self.minimum + self.cf_cs.inverse(self.soc_profile_cs_v.out), self.minimum + self.cf_cs.inverse(self.soc_profile_cs_v.out),
self.soc_profile_cs_v.out self.soc_profile_cs_v.out
) )
) ]
return breakpoints return breakpoints
def __call__(self, t: Time) -> SoC: def __call__(self, t: Time) -> SoC:
...@@ -226,6 +251,46 @@ class SoCFunction: ...@@ -226,6 +251,46 @@ class SoCFunction:
) )
def calc_optimal_t_charge(self, cs: ChargingFunction) -> Union[Time, None]: def calc_optimal_t_charge(self, cs: ChargingFunction) -> Union[Time, None]:
"""
Calculates the optimal time to charge at the last charging station
given a charging function ```cs``` of another (the current) charging
station.
Given the fact, that all charging stations have linear charging
functions we have to seperate two cases:
1. The current charging stations performs better than the last one.
In this case, it is optimal to charge at the last stop
only the amount necessary to reach the current charging station which
is the minimum of the SoC Function::
t_charge = t_min(f)
2. Because of battery constraints, the battery's maximum capacity
at this station from charging at the previous station is given by the
battery's maximum capacity decreased by the costs of the path to
the current station.
Therefore (if the path cost is > 0), it is beneficial to charge
at the current charging station even if it charges slower than the
previous one.
The optimal procedure then is to charge the battery up to the maximum
at the previous station and continue charging at the current
station when the SoC Function reaches its maximum. In breakpoint
representation the maximum is given by the last breakpoint of the
piecewise linear SoC Function.
..Note:
The possibility not to charge is also kept in the algorithm
since the according label is not thrown away.
:param cs: Charging function of the current charging station.
:return: None if it is optimal not to charge else the charging time.
"""
capacity: SoC = self.soc_profile_cs_v.capacity capacity: SoC = self.soc_profile_cs_v.capacity
t_charge = None t_charge = None
...@@ -258,8 +323,7 @@ class SoCFunction: ...@@ -258,8 +323,7 @@ class SoCFunction:
return True return True
@property def _calc_minimum(self) -> Time:
def minimum(self) -> Time:
""" """
:returns: Least feasible trip time. That is, the minimum time when :returns: Least feasible trip time. That is, the minimum time when
the SoC functions yields a SoC value that is not -inf. the SoC functions yields a SoC value that is not -inf.
......
"""Module contains the main algorithm.""" """
Implementation of the CHArge algorithm [0] with two further constraints:
1. There are no negative path costs (ie no recurpation).
2. All charging stations have linear charging functions.
[0] https://dl.acm.org/doi/10.1145/2820783.2820826
"""
from typing import Dict, List, Tuple, Set from typing import Dict, List, Tuple, Set
from math import inf from math import inf
...@@ -7,7 +15,7 @@ from evrouting.T import Node, SoC ...@@ -7,7 +15,7 @@ from evrouting.T import Node, SoC
from evrouting.utils import PriorityQueue from evrouting.utils import PriorityQueue
from evrouting.graph_tools import distance from evrouting.graph_tools import distance
from evrouting.charge.T import SoCFunction, Label from evrouting.charge.T import SoCFunction, Label
from evrouting.charge.utils import LabelPriorityQueue, keys from evrouting.charge.utils import LabelPriorityQueue
from evrouting.charge.factories import ( from evrouting.charge.factories import (
ChargingFunctionMap, ChargingFunctionMap,
SoCFunctionFactory, SoCFunctionFactory,
...@@ -43,6 +51,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -43,6 +51,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels'] l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels']
prio_queue: PriorityQueue = queues['priority queue'] prio_queue: PriorityQueue = queues['priority queue']
# Shortcut for key function
keys = LabelPriorityQueue.keys
while prio_queue: while prio_queue:
node_min: Node = prio_queue.peak_min() node_min: Node = prio_queue.peak_min()
......
"""Holding the queue structure for unsettled Labels."""
from typing import Any, Dict, List from typing import Any, Dict, List
from evrouting.utils import PriorityQueue from evrouting.utils import PriorityQueue
...@@ -7,24 +8,52 @@ from evrouting.charge.factories import SoCFunctionFactory ...@@ -7,24 +8,52 @@ from evrouting.charge.factories import SoCFunctionFactory
class LabelPriorityQueue(PriorityQueue): class LabelPriorityQueue(PriorityQueue):
"""
Implementation of a variant of priority queue to store the
***unsettled*** labels of a vertex and efficiently extract
the minimum label in the algorithm.
The priority of a label is the minimum feasible time of it's
according SoC Function. Tie breaker is the SoC at this time.
It maintains the invariant:
The queue is empty or the SoC Function of the minimum label
is not dominated by any SoC Function of a ***settled*** label.
"""
def __init__(self, f_soc: SoCFunctionFactory, l_set: List[Label]): def __init__(self, f_soc: SoCFunctionFactory, l_set: List[Label]):
"""
:param f_soc: SoC Function Factory to create SoC Functions of
inserted labels for testing the invariant.
:param l_set: Set of settled labels.
"""
super().__init__() super().__init__()
self.f_soc_factory: SoCFunctionFactory = f_soc self.f_soc_factory: SoCFunctionFactory = f_soc
self.l_set: List[Label] = l_set self.l_set: List[Label] = l_set
def insert(self, label: Label): def insert(self, label: Label):
"""Breaking ties with lowest soc at t_min.""" """Breaking ties with lowest soc at t_min."""
super().insert(item=label, **keys(self.f_soc_factory(label))) super().insert(item=label, **self.keys(self.f_soc_factory(label)))
# If the minimum element has changed, check the invariant.
if self.peak_min() == label: if self.peak_min() == label:
self.dominance_check() self.dominance_check()
def delete_min(self) -> Any: def delete_min(self) -> Any:
"""Delete and check the invariant."""
min_label = super().delete_min() min_label = super().delete_min()
self.dominance_check() self.dominance_check()
return min_label return min_label
def dominance_check(self): def dominance_check(self):
"""
Compare the SoC Function of the minimum label with all
SoC Functions of the already settled labels. If any settled label
dominates the minimum label, the minimum label is removed, since
it cannot lead to a better solution.
"""
try: try:
label: Label = self.peak_min() label: Label = self.peak_min()
except KeyError: except KeyError:
...@@ -36,8 +65,9 @@ class LabelPriorityQueue(PriorityQueue): ...@@ -36,8 +65,9 @@ class LabelPriorityQueue(PriorityQueue):
if any(self.f_soc_factory(label).dominates(soc) for label in self.l_set): if any(self.f_soc_factory(label).dominates(soc) for label in self.l_set):
self.remove_item(label) self.remove_item(label)
@staticmethod
def keys(f_soc: SoCFunction) -> Dict: def keys(f_soc: SoCFunction) -> Dict:
t_min: Time = f_soc.minimum """Return the keys for insertion. See class description."""
soc_min: SoC = f_soc(t_min) t_min: Time = f_soc.minimum
return {'priority': t_min, 'count': soc_min} soc_min: SoC = f_soc(t_min)
return {'priority': t_min, 'count': soc_min}
...@@ -51,7 +51,7 @@ class TestSoCFunction: ...@@ -51,7 +51,7 @@ class TestSoCFunction:
# Needs to charge 2 Wh because path has costs # Needs to charge 2 Wh because path has costs
# of 2. Charging takes additional 2 Wh / charging_coefficient # of 2. Charging takes additional 2 Wh / charging_coefficient
# of time compared to the sole trip time. # of time compared to the sole trip time.
assert soc_function.minimum == \ assert soc_function._calc_minimum() == \
2 / soc_function.cf_cs.c + soc_function.t_trip 2 / soc_function.cf_cs.c + soc_function.t_trip
def test_minimum_with_and_soc_charge(self, soc_function): def test_minimum_with_and_soc_charge(self, soc_function):
...@@ -61,7 +61,7 @@ class TestSoCFunction: ...@@ -61,7 +61,7 @@ class TestSoCFunction:
# Needs to charge 1 Wh because path has costs # Needs to charge 1 Wh because path has costs
# of 2. Charging takes additional 1 Wh / charging_coefficient # of 2. Charging takes additional 1 Wh / charging_coefficient
# of time compared to the sole trip time. # of time compared to the sole trip time.
assert soc_function.minimum == \ assert soc_function._calc_minimum() == \
1 / soc_function.cf_cs.c + soc_function.t_trip 1 / soc_function.cf_cs.c + soc_function.t_trip
def test_below_t_trip(self, soc_function): def test_below_t_trip(self, soc_function):
...@@ -109,7 +109,7 @@ class TestSoCFunction: ...@@ -109,7 +109,7 @@ class TestSoCFunction:
soc_function.cf_cs.c = 0 soc_function.cf_cs.c = 0
soc_function.soc_last_cs = 1 soc_function.soc_last_cs = 1
assert soc_function.minimum == -inf assert soc_function._calc_minimum() == -inf
class TestChargingFunction: class TestChargingFunction:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment