diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index 292e6b3d8e43511bed7925812598e5c74718b5df..7ba45fbcbf9ad9c98a637570c9dc0a74e6f916bd 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -134,6 +134,26 @@ class ChargingFunction: return cf_inverse + def __lt__(self, other) -> bool: + """Comparison for dominance check.""" + return self.c < other.c + + def __le__(self, other) -> bool: + """Comparison for dominance check.""" + return self.c <= other.c + + def __eq__(self, other) -> bool: + """Comparison for dominance check.""" + return self.c == other.c + + def __ge__(self, other): + """Comparison for dominance check.""" + return self.c >= other.c + + def __gt__(self, other): + """Comparison for dominance check.""" + return self.c > other.c + class Label(NamedTuple): """ @@ -160,10 +180,10 @@ class Label(NamedTuple): last_cs: Node soc_profile_cs_v: SoCProfile - @property - def key(self): - """Key for sorting.""" - return self.t_trip + +class Breakpoint(NamedTuple): + t: Time + soc: SoC class SoCFunction: @@ -195,6 +215,18 @@ class SoCFunction: self.cf_cs: ChargingFunction = cf_cs + @property + def breakpoints(self): + breakpoints = [Breakpoint(self.minimum, 0)] + if not self.cf_cs.is_dummy: + breakpoints.append( + Breakpoint( + self.minimum + self.cf_cs.inverse(self.soc_profile_cs_v.out), + self.soc_profile_cs_v.out + ) + ) + return breakpoints + def __call__(self, t: Time) -> SoC: """ Maps a new trip time to a SoC at the current node. The new trip time @@ -221,7 +253,7 @@ class SoCFunction: This is either the trip time, or if energy needs to be charged at the previous charging station to traverse the path, trip time - plus charging time until the battery holds the minimum energie to + plus charging time until the battery holds the minimum energy to traverse the path to the current node (which is the cost of the path). This time is: diff --git a/evrouting/charge/factories.py b/evrouting/charge/factories.py index be8927eaad9ad551d9c4afce915fc0bd8dd2343e..3cdf2f8972b68a1f949e2094d43321cdb028519f 100644 --- a/evrouting/charge/factories.py +++ b/evrouting/charge/factories.py @@ -1,11 +1,12 @@ import networkx as nx +from typing import Dict -from .T import SoCProfile, ChargingFunction -from ..T import Node, SoC +from .T import SoCProfile, SoCFunction, ChargingFunction, Label +from ..T import Node, SoC, Time from ..graph_tools import charging_cofficient, consumption -def charging_function( +def charging_function_factory( G: nx.Graph, n: Node, capacity: SoC, @@ -14,7 +15,7 @@ def charging_function( return ChargingFunction(charging_cofficient(G, n), capacity, initial_soc) -def soc_profile( +def soc_profile_factory( G: nx.Graph, capacity: SoC, u: Node, @@ -28,3 +29,67 @@ def soc_profile( """ path_cost = 0 if v is None else consumption(G, u, v) return SoCProfile(path_cost, capacity) + + +class ChargingFunctionMap: + """Maps Nodes to their charging functions.""" + + def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None): + self.map: Dict[Node, ChargingFunction] = {} + self.G: nx.Graph = G + self.capacity: SoC = capacity + self.initial_soc: SoC = initial_soc + + def __getitem__(self, node: Node) -> ChargingFunction: + """ + Try to get charging function from cache, + else create function and add to cache. + """ + try: + cf = self.map[node] + except KeyError: + cf = charging_function_factory( + G=self.G, + n=node, + capacity=self.capacity, + initial_soc=self.initial_soc + ) + self.map[node] = cf + + return cf + + +class SoCFunctionMap: + """Maps Nodes to their charging functions.""" + + def __init__(self, cf: ChargingFunctionMap): + self.cf: ChargingFunctionMap = cf + + def __getitem__(self, label: Label) -> SoCFunction: + return SoCFunction(label, self.cf[label.last_cs]) + + +class LabelsFactory: + + def __init__(self, + G: nx.Graph, + capacity: SoC, + f_soc: SoCFunctionMap, + initial_soc: SoC = None): + self.G: nx.Graph = G + self.capacity: SoC = capacity + self.f_soc: SoCFunctionMap = f_soc + self.initial_soc: SoC = initial_soc + + def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time): + # Only charge the minimum at the last charge station + # and continue charging at this station. + soc_function: SoCFunction = self.f_soc[current_label] + + return Label( + t_trip=current_label.t_trip + t_charge, + soc_last_cs=soc_function(current_label.t_trip + t_charge), + last_cs=current_node, + soc_profile_cs_v=soc_profile_factory( + self.G, self.capacity, current_node) + ) diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index 28eda64d2d3e0b91ed339fd471d9139d093a0f26..7a63bbff94d25b23367ded2d95b2dd742c19c990 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -1,14 +1,21 @@ -from typing import Dict +from typing import Dict, List from math import inf import networkx as nx from evrouting.T import Node, SoC, Time from evrouting.utils import PriorityQueue -from evrouting.charge.factories import soc_profile as soc_profile_factory +from evrouting.charge.factories import ( + LabelsFactory, + ChargingFunctionMap, + SoCFunctionMap, + soc_profile_factory +) from ..graph_tools import distance -from .T import SoCFunction, Label -from .utils import LabelPriorityQueue, ChargingFunctionMap +from .T import SoCProfile, SoCFunction, Label +from .utils import LabelPriorityQueue + +__all__ = ['shortest_path'] def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, @@ -16,46 +23,38 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, """ Calculates shortest path using the CHarge algorithm. - :param G: Input Graph - :param s: Start Node identifier - :param t: End Node identifier - :param beta_s: Start SoC - :param beta_t: End SoC - :param U: Capacity + :param G: + :param charging_stations: + :param s: + :param t: + :param initial_soc: + :param final_soc: + :param capacity: :return: """ + t = _apply_final_constraints(G, t, final_soc) + cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) + f_soc = SoCFunctionMap(cf) + label_factory = LabelsFactory(G, capacity, f_soc, initial_soc) - q = PriorityQueue() + # Init maps to manage labels l_set: Dict[int, set] = {v: set() for v in G} - l_uns: Dict[int, LabelPriorityQueue] = { - v: LabelPriorityQueue() for v in G - } + l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf) for v in G} - # Dummy vertex without incident edges that is (temporarily) added to G - dummy_node: Node = len(G.nodes) - # Charging coefficient 0 indicates dummy node - G.add_node(dummy_node, c=0) - charging_stations.add(dummy_node) + # Init environment + entry_label = _create_entry_label(G, charging_stations, + s, initial_soc, capacity) + l_uns[s].insert(entry_label) - l: Label = Label( - t_trip=0, - soc_last_cs=initial_soc, - last_cs=dummy_node, - soc_profile_cs_v=soc_profile_factory(G, capacity, s) - ) - - l_uns[s].insert( - l, - cf[l.last_cs] - ) + # A priority queue defines which node to visit next. + # The key is the trip time. + prio_queue = PriorityQueue() + prio_queue.insert(s, priority=0, count=0) - q.insert(s, 0) - - # run main loop while True: try: - minimum_node: Node = q.peak_min() + minimum_node: Node = prio_queue.peak_min() except KeyError: # empty queue break @@ -64,65 +63,36 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, l_set[minimum_node].add(label_minimum_node) if minimum_node == t: - return SoCFunction( - label_minimum_node, - cf[label_minimum_node.last_cs] - ).minimum + return f_soc[label_minimum_node].minimum # handle charging stations - if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs: - cf_last_cs = cf[label_minimum_node.last_cs] - cf_minimum_node = cf[minimum_node] - - if cf_minimum_node.c > cf_last_cs.c: - # Only charge the minimum at the last charge station - # and continue charging at this station. - old_soc_function: SoCFunction = SoCFunction( - label_minimum_node, cf_last_cs - ) - t_trip_old = label_minimum_node.t_trip - t_charge: Time = old_soc_function.minimum - t_trip_old - - label_new = Label( - t_trip=t_trip_old + t_charge, - soc_last_cs=old_soc_function(t_trip_old + t_charge), - last_cs=minimum_node, - soc_profile_cs_v=soc_profile_factory( - G, capacity, minimum_node - ) - ) - l_uns[minimum_node].insert( - label_new, - cf_minimum_node - ) - - # update priority queue - try: - label_minimum_node = l_uns[minimum_node].peak_min() - except KeyError: - # l_uns[v] empty - q.delete_min() - else: - q.insert(minimum_node, label_minimum_node.key) + if minimum_node in charging_stations and \ + not minimum_node == label_minimum_node.last_cs: + for t_charge in _calc_optimal_t_charge(cf, label_minimum_node, minimum_node, capacity): + label_new = label_factory.spawn_label(minimum_node, + label_minimum_node, + t_charge) + l_uns[minimum_node].insert(label_new) + + # Update priority queue. This node might have gotten a new + # minimum label spawned is th previous step. + _update_priority_queue(f_soc, prio_queue, l_uns, minimum_node) # scan outgoing arcs for n in G.neighbors(minimum_node): # Create SoC Profile for getting from minimum_node to n soc_profile = label_minimum_node.soc_profile_cs_v + \ soc_profile_factory(G, capacity, minimum_node, n) - if not soc_profile(capacity) == -inf: - # It is possible to get from minimum_node to n + + if _is_feasible_path(soc_profile, capacity): l_new = Label( - label_minimum_node.t_trip + distance(G, minimum_node, n), - label_minimum_node.soc_last_cs, - label_minimum_node.last_cs, - soc_profile + t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n), + soc_last_cs=label_minimum_node.soc_last_cs, + last_cs=label_minimum_node.last_cs, + soc_profile_cs_v=soc_profile ) try: - l_uns[n].insert( - l_new, - cf[l_new.last_cs] - ) + l_uns[n].insert(l_new) except ValueError: # Infeasible because last_cs might be an # dummy charging station. Therefore, the path might @@ -135,4 +105,95 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, pass else: if l_new == l_uns[n].peak_min(): - q.insert(n, l_new.key) + key, count = _key(l_new, f_soc) + prio_queue.insert(n, priority=key, count=count) + + +def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]: + f_soc_breakpoints = SoCFunction(label_v, cf[label_v.last_cs]).breakpoints + t_charge = [] + + if cf[v] > cf[label_v.last_cs]: + # Faster charging station -> charge as soon as possible + t_charge.append(f_soc_breakpoints[0].t - label_v.t_trip) + elif f_soc_breakpoints[-1].soc < capacity: + # Slower charging station might still be dominating + # because the soc cannot be more than the full capacity + # decreased by the trip costs. This will be refilled at this station. + t_charge.append(f_soc_breakpoints[-1].t - label_v.t_trip) + + return t_charge + + +def _key(label, f_soc): + soc_function = f_soc[label] + + t_min = soc_function.minimum + soc_min = soc_function(t_min) + + return t_min, soc_min + + +def _create_entry_label( + G: nx.Graph, + charging_stations: set, + s: Node, + initial_soc: SoC, + capacity: SoC) -> Label: + """ + Create dummy charging station with initial soc as constant charging + function. + + :param G: Graph + :param charging_stations: Set of charging stations in Graph G + :param s: Starting Node + :param initial_soc: Initial SoC at beginng of the route + :param capacity: The restricting battery capacity + :return: Label for the starting Node + """ + dummy_node: Node = len(G.nodes) + + # Charging coefficient 0 indicates dummy node + G.add_node(dummy_node, c=0) + charging_stations.add(dummy_node) + + # Register dummy charging station as the last + # seen charging station before s. + return Label( + t_trip=0, + soc_last_cs=initial_soc, + last_cs=dummy_node, + soc_profile_cs_v=soc_profile_factory(G, capacity, s) + ) + + +def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool: + """Check, if possible to traverse path at least with full battery.""" + return not soc_profile(capacity) == -inf + + +def _update_priority_queue( + f_soc: SoCFunctionMap, + prio_queue: PriorityQueue, + l_uns: Dict[int, LabelPriorityQueue], + node: Node): + """ + Update key of a node the priority queue according to + its minimum label. + """ + try: + minimum_label: Label = l_uns[node].peak_min() + except KeyError: + # l_uns[v] empty + prio_queue.delete_min() + else: + key, count = _key(minimum_label, f_soc) + prio_queue.insert(node, priority=key, count=count) + + +def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node: + temp_final_node = len(G) + G.add_node(temp_final_node) + G.add_edge(t, temp_final_node, weight=0, c=final_soc) + + return temp_final_node diff --git a/evrouting/charge/utils.py b/evrouting/charge/utils.py index 890e9b14105b1436f67780817e5670844fc5acab..376dd8337178a2836b2ed08302f7d3c09877c12f 100644 --- a/evrouting/charge/utils.py +++ b/evrouting/charge/utils.py @@ -1,20 +1,22 @@ -from typing import Dict from math import inf -import networkx as nx from evrouting.utils import PriorityQueue -from evrouting.T import SoC, Time, Node +from evrouting.T import SoC, Time -from .factories import charging_function -from .T import Label, SoCFunction, ChargingFunction +from .T import Label, SoCFunction +from .factories import ChargingFunctionMap class LabelPriorityQueue(PriorityQueue): - def insert(self, label: Label, cf: ChargingFunction): + def __init__(self, cf: ChargingFunctionMap): + super().__init__() + self.cf: ChargingFunctionMap = cf + + def insert(self, label: Label): """Breaking ties with lowest soc at t_min.""" soc_function = SoCFunction( label, - cf + self.cf[label.last_cs] ) t_min: Time = soc_function.minimum @@ -30,31 +32,3 @@ class LabelPriorityQueue(PriorityQueue): priority=t_min, count=soc_min ) - - -class ChargingFunctionMap: - """Maps Nodes to their charging functions.""" - - def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None): - self.map: Dict[Node, ChargingFunction] = {} - self.G: nx.Graph = G - self.capacity: SoC = capacity - self.initial_soc: SoC = initial_soc - - def __getitem__(self, node: Node) -> ChargingFunction: - """ - Try to get charging function from cache, - else create function and add to cache. - """ - try: - cf = self.map[node] - except KeyError: - cf = charging_function( - G=self.G, - n=node, - capacity=self.capacity, - initial_soc=self.initial_soc - ) - self.map[node] = cf - - return cf diff --git a/tests/charge/test_charge_routing.py b/tests/charge/test_charge_routing.py index 6d6a5a28166937fb2e985f86df6c635ebbbdd8de..1f3a8f3df5674f1f77f52b2d9a013a3919ff31d9 100644 --- a/tests/charge/test_charge_routing.py +++ b/tests/charge/test_charge_routing.py @@ -8,33 +8,60 @@ from ..config import ( ) -def test_shortest_path_charge_at_s_and_a(): - """Charging at s.""" - path = shortest_path(**init_config(edge_case)) +class TestRoutes: - assert path == 3.5 + def test_shortest_path_charge_at_s_and_a(self): + """Charging at s.""" + path = shortest_path(**init_config(edge_case)) + assert path == 3.5 -def test_shortest_path_charge_at_s_only(): - """Charging at s.""" - path = shortest_path(**init_config(edge_case_a_slow)) + def test_shortest_path_charge_at_s_only(self): + """Charging at s.""" + path = shortest_path(**init_config(edge_case_a_slow)) - assert path == 3 + assert path == 3 + def test_shortest_path_no_charge_s_path_t(self): + """No charging at s but enough initial SoC to go to t directly.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 4 + path = shortest_path(**conf) -def test_shortest_path_no_charge_s_path_t(): - """No charging at s but enough initial SoC to go to t directly.""" - conf = init_config(edge_case_start_node_no_cs) - conf['initial_soc'] = 4 - path = shortest_path(**conf) + assert path == 1 - assert path == 1 + def test_shortest_path_no_charge_s_path_a(self): + """No charging at s but just enough SoC to go to t via a.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 2 + path = shortest_path(**conf) + assert path == 2 -def test_shortest_path_no_charge_s_path_a(): - """No charging at s but just enough SoC to go to t via a.""" - conf = init_config(edge_case_start_node_no_cs) - conf['initial_soc'] = 2 - path = shortest_path(**conf) - assert path == 2 +class TestWithFinalSoC: + + def test_shortest_path_charge_at_s_and_a(self): + """Charging at s.""" + conf = init_config(edge_case) + conf['final_soc'] = 3 + path = shortest_path(**conf) + + assert path == 5 + + def test_shortest_path_charge_at_s_only(self): + """Charging at s and a to reach final_soc.""" + conf = init_config(edge_case_a_slow) + conf['final_soc'] = 3 + path = shortest_path(**conf) + + assert path == 5 + + def test_shortest_path_no_charge_s_path_t(self): + """No charging at s but initial soc.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 4 + conf['final_soc'] = 3 + path = shortest_path(**conf) + + assert path == 2.5 diff --git a/tests/charge/test_utils.py b/tests/charge/test_utils.py index 3220019d3d0332f5241592e5cf6b938480108098..d5bb2a20b0a595a1bd0777f8defaa254b33aa755 100644 --- a/tests/charge/test_utils.py +++ b/tests/charge/test_utils.py @@ -6,8 +6,9 @@ from evrouting.charge.T import Label @pytest.fixture def q(label, ch_function): _, _, cf = ch_function - q = LabelPriorityQueue() - q.insert(label, cf) + dummy_cf = {label.last_cs: cf} + q = LabelPriorityQueue(dummy_cf) + q.insert(label) # create min label = Label( @@ -17,7 +18,8 @@ def q(label, ch_function): last_cs=1 ) - q.insert(label, cf) + dummy_cf[label.last_cs] = cf + q.insert(label) yield q del q @@ -39,4 +41,4 @@ class TestProrityQueue: _, _, cf = ch_function label = q.peak_min() q.remove_item(label) - q.insert(label, cf) + q.insert(label)