diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index dbf7c1d54662b4c42abf3503c3c041848a826d33..7ba45fbcbf9ad9c98a637570c9dc0a74e6f916bd 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -181,6 +181,11 @@ class Label(NamedTuple): soc_profile_cs_v: SoCProfile +class Breakpoint(NamedTuple): + t: Time + soc: SoC + + class SoCFunction: """ SoC Function of a node's label maps trip time plus an additional charging @@ -210,6 +215,18 @@ class SoCFunction: self.cf_cs: ChargingFunction = cf_cs + @property + def breakpoints(self): + breakpoints = [Breakpoint(self.minimum, 0)] + if not self.cf_cs.is_dummy: + breakpoints.append( + Breakpoint( + self.minimum + self.cf_cs.inverse(self.soc_profile_cs_v.out), + self.soc_profile_cs_v.out + ) + ) + return breakpoints + def __call__(self, t: Time) -> SoC: """ Maps a new trip time to a SoC at the current node. The new trip time @@ -236,7 +253,7 @@ class SoCFunction: This is either the trip time, or if energy needs to be charged at the previous charging station to traverse the path, trip time - plus charging time until the battery holds the minimum energie to + plus charging time until the battery holds the minimum energy to traverse the path to the current node (which is the cost of the path). This time is: diff --git a/evrouting/charge/factories.py b/evrouting/charge/factories.py index 3393a564dc254f00d9b7e19165d9d4e4dce625b2..3cdf2f8972b68a1f949e2094d43321cdb028519f 100644 --- a/evrouting/charge/factories.py +++ b/evrouting/charge/factories.py @@ -74,26 +74,21 @@ class LabelsFactory: def __init__(self, G: nx.Graph, capacity: SoC, - cf: ChargingFunctionMap, + f_soc: SoCFunctionMap, initial_soc: SoC = None): self.G: nx.Graph = G self.capacity: SoC = capacity - self.cf: ChargingFunctionMap = cf + self.f_soc: SoCFunctionMap = f_soc self.initial_soc: SoC = initial_soc - def spawn_label(self, current_node: Node, current_label: Label): + def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time): # Only charge the minimum at the last charge station # and continue charging at this station. - soc_function: SoCFunction = SoCFunction( - current_label, self.cf[current_label.last_cs] - ) - - t_trip_old = current_label.t_trip - t_charge: Time = soc_function.minimum - t_trip_old + soc_function: SoCFunction = self.f_soc[current_label] return Label( - t_trip=t_trip_old + t_charge, - soc_last_cs=soc_function(t_trip_old + t_charge), + t_trip=current_label.t_trip + t_charge, + soc_last_cs=soc_function(current_label.t_trip + t_charge), last_cs=current_node, soc_profile_cs_v=soc_profile_factory( self.G, self.capacity, current_node) diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index b0ad483bfcf54ecba2935bd3d7e84e2c3dfb131c..7a63bbff94d25b23367ded2d95b2dd742c19c990 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -1,8 +1,8 @@ -from typing import Dict +from typing import Dict, List from math import inf import networkx as nx -from evrouting.T import Node, SoC +from evrouting.T import Node, SoC, Time from evrouting.utils import PriorityQueue from evrouting.charge.factories import ( LabelsFactory, @@ -12,7 +12,7 @@ from evrouting.charge.factories import ( ) from ..graph_tools import distance -from .T import SoCProfile, Label +from .T import SoCProfile, SoCFunction, Label from .utils import LabelPriorityQueue __all__ = ['shortest_path'] @@ -36,7 +36,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) f_soc = SoCFunctionMap(cf) - label_factory = LabelsFactory(G, capacity, cf, initial_soc) + label_factory = LabelsFactory(G, capacity, f_soc, initial_soc) # Init maps to manage labels l_set: Dict[int, set] = {v: set() for v in G} @@ -68,9 +68,10 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, # handle charging stations if minimum_node in charging_stations and \ not minimum_node == label_minimum_node.last_cs: - if cf[minimum_node] > cf[label_minimum_node.last_cs]: + for t_charge in _calc_optimal_t_charge(cf, label_minimum_node, minimum_node, capacity): label_new = label_factory.spawn_label(minimum_node, - label_minimum_node) + label_minimum_node, + t_charge) l_uns[minimum_node].insert(label_new) # Update priority queue. This node might have gotten a new @@ -108,6 +109,22 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, prio_queue.insert(n, priority=key, count=count) +def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]: + f_soc_breakpoints = SoCFunction(label_v, cf[label_v.last_cs]).breakpoints + t_charge = [] + + if cf[v] > cf[label_v.last_cs]: + # Faster charging station -> charge as soon as possible + t_charge.append(f_soc_breakpoints[0].t - label_v.t_trip) + elif f_soc_breakpoints[-1].soc < capacity: + # Slower charging station might still be dominating + # because the soc cannot be more than the full capacity + # decreased by the trip costs. This will be refilled at this station. + t_charge.append(f_soc_breakpoints[-1].t - label_v.t_trip) + + return t_charge + + def _key(label, f_soc): soc_function = f_soc[label] diff --git a/tests/charge/test_charge_routing.py b/tests/charge/test_charge_routing.py index 901335e3481c576a1a78612c7f8947a88c6de6d5..1f3a8f3df5674f1f77f52b2d9a013a3919ff31d9 100644 --- a/tests/charge/test_charge_routing.py +++ b/tests/charge/test_charge_routing.py @@ -55,7 +55,7 @@ class TestWithFinalSoC: conf['final_soc'] = 3 path = shortest_path(**conf) - assert path == 4 + assert path == 5 def test_shortest_path_no_charge_s_path_t(self): """No charging at s but initial soc."""