From 2d0e5f0eb100c37068fbbc45f6ba521996faaf04 Mon Sep 17 00:00:00 2001 From: markn92 <markn92@mi.fu-berlin.de> Date: Wed, 18 Mar 2020 11:13:10 +0000 Subject: [PATCH] Working np Algorithm --- evrouting/T.py | 11 +- evrouting/charge/T.py | 258 +++++++++++++++++++---- evrouting/charge/__init__.py | 2 +- evrouting/charge/factories.py | 30 +++ evrouting/charge/routing.py | 150 +++++++++---- evrouting/charge/utils.py | 29 +++ evrouting/graph_tools.py | 18 +- evrouting/utils.py | 15 +- tests/charge/__init__.py | 0 tests/charge/conftest.py | 43 ++++ tests/charge/test_charge_routing.py | 32 +++ tests/charge/test_soc_data_structures.py | 183 ++++++++++++++++ tests/charge/test_utils.py | 42 ++++ tests/config.py | 78 +++++-- tests/test_charge_routing.py | 18 -- 15 files changed, 780 insertions(+), 129 deletions(-) create mode 100644 evrouting/charge/factories.py create mode 100644 evrouting/charge/utils.py create mode 100644 tests/charge/__init__.py create mode 100644 tests/charge/conftest.py create mode 100644 tests/charge/test_charge_routing.py create mode 100644 tests/charge/test_soc_data_structures.py create mode 100644 tests/charge/test_utils.py delete mode 100644 tests/test_charge_routing.py diff --git a/evrouting/T.py b/evrouting/T.py index e95dbd2..6c19201 100644 --- a/evrouting/T.py +++ b/evrouting/T.py @@ -1,12 +1,15 @@ -from typing import Tuple, Union, NewType -from math import inf +from dataclasses import dataclass +from typing import Tuple, Union, NewType, Dict, Any Node = int Edge = Tuple[Node, Node] +NodeData = Dict[str, Any] +EdgeData = Dict[str, Any] + Wh = NewType('Wh', Union[float, int]) -SoC = NewType('SoC', Union[-inf, Wh]) +SoC = NewType('SoC', Wh) -ChargingCoefficient = float +ChargingCoefficient = Union[float, int, None] Time = Union[float, int] diff --git a/evrouting/charge/T.py b/evrouting/charge/T.py index 069c6cf..079bcfc 100644 --- a/evrouting/charge/T.py +++ b/evrouting/charge/T.py @@ -1,59 +1,241 @@ -from copy import copy -from collections import namedtuple +from typing import Callable, NamedTuple from math import inf -import networkx as nx - from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node -from evrouting.graph_tools import charging_cofficient, consumption -Label = namedtuple('Label', ['t_trip', 'beta_u', 'u', 'SoCProfile_u_v']) +class SoCProfile: + """ + The SoC Profile maps an initial SoC to the SoC after + traversing a path. -class ChargingFunction: + SoC profile of a path is fully described with two parameters: + - cost: Cost of going from u to v. + - out: Maximal SoC after passing the path from u to v. + """ - def __init__(self, G: nx.Graph, l: Label): - self.t_trip: Time = l.t_trip - self.beta_u: SoC = l.beta_u - self.u: Node = l.u - self.b_u_v: SoCProfile = l.SoCProfile_u_v - self.c_u: ChargingCoefficient = charging_cofficient(G, l.u) + def __init__(self, path_cost: Wh, capacity: SoC): + """ + Calculates the maximum SoC after traversing the path, which is + (for non-negative path costs) U - cost. - def __call__(self, t) -> SoC: - if t < self.t_trip: + :param path_cost: Cost of the path. The SoC Profile of + a single vertex has no cost. + :param capacity: The battery's capacity. + """ + self.capacity: SoC = capacity + self.path_cost: Wh = path_cost + self.out: SoC = capacity - self.path_cost + + def __call__(self, initial_soc: SoC) -> SoC: + """ + :param initial_soc: The initial SoC. + :returns: The SoC after traversing the path. + """ + if initial_soc < self.path_cost: return -inf + final_soc = initial_soc - self.path_cost + + return final_soc if final_soc < self.capacity else self.capacity - return self.beta_u(self.beta_u + self.c_u * (t - self.t_trip)) + def __add__(self, other: 'SoCProfile') -> 'SoCProfile': + """ + Combines to SoC Profiles. - def get_minimum(self) -> Time: - """TODO: Explain.""" - cost_p = self.b_u_v.cost - return max(self.t_trip, (cost_p - self.beta_u) / self.c_u + self.t_trip) + The for the combined SoC Profile, given a SoC Profile for a path a + and a path b, holds: + cost = cost_a + cost_b + out = U - cost -class SoCProfile: + :return: Combined SoC Profile. + """ + return SoCProfile(self.path_cost + other.path_cost, self.capacity) + + +class ChargingFunction: """ - Describe SoC profile with two parameters: - - cost: Cost of going from u to v. - - out: Maximal SoC after passing the path from u to v. + Charging functions map charging time to resulting SoCs. Since they are + monotonic, there exists also an inverse mapping SoC to charging time. + + Dummy Charging Station + ====================== + + Special case is a so called dummy charging station. This is the case + if the charging coefficient is 0. Then there is no actual charging and + the value of the charging station is always the same as the arrival SoC. + + A dummy charging station is necessary for initialization of the problem + to trigger spawning of new labels. """ - def __init__(self, G: nx.Graph, U: SoC, u: Node, v: Node = None): - if v is None: - self.cost: Wh = 0 - self.out: Wh = U + def __init__(self, c: ChargingCoefficient, capacity: SoC, + initial_soc: SoC = None): + """ + + :param c: The stations charging coefficient. If c=0, the this becomes + a so called dummy charging station, that exists to trigger label + creation in the algorithm. + :param capacity: The battery's capacity, respectively the maximum of + the charging function. + :param initial_soc: The SoC at arriving at the charging station. This is + optional for dummy charging stations, that do not actually charge: + + cf(b) = soc + + """ + self.c: ChargingCoefficient = c + self.capacity: SoC = capacity + self.initial_soc: SoC = initial_soc + + @property + def is_dummy(self) -> bool: + """Tell if function is a dummy function.""" + return self.c == 0 + + def __call__(self, t: Time, initial_soc: SoC = 0) -> SoC: + """ + :param t: Charging time + :param initial_soc: Initial SoC when starting to charge + :return: SoC after charging. + """ + if self.is_dummy: + if self.initial_soc is None: + raise ValueError( + 'Charging coefficient is 0 but no initial SoC given.' + ) + soc = self.initial_soc else: - self.cost: Wh = consumption(G, u, v) - self.out: Wh = U - self.cost + soc = initial_soc + self.c * t + + return soc if soc < self.capacity else self.capacity + + @property + def inverse(self) -> Callable[[SoC], Time]: + """ + :returns: Inverse charging function mapping SoC to charging + time (assuming initial SoC=0) + """ + if self.is_dummy: + raise ValueError('Dummy Carging function has no inverse.') + + c = self.c + capacity = self.capacity + + def cf_inverse(soc: SoC) -> Time: + if soc > capacity: + # Return max charge time + return capacity / c + elif soc < 0: + # Return min charge time + return 0 + + return soc / c + + return cf_inverse - def __call__(self, beta) -> SoC: - if beta < self.cost: + +class Label(NamedTuple): + """ + Label used for the Algorithm. + + The (paleto optimal) label where u=t is the + contains the minimum trip time necessary to get to t. + + The label of node v consists of a tuple: + + (t_trip, soc_last_cs, last_cs, soc_profile_cs_v) + + Where: + + :t_trip: Trip time to node v without charging time at the last + charging station. + :soc_last_cs: Arrival SoC at the last charging station. + :last_cs: (Node ID of) The last charging station. + :soc_profile_cs_v: The SoC Profile describing the costs going from + the charging station to current node v. + """ + t_trip: Time + soc_last_cs: SoC + last_cs: Node + soc_profile_cs_v: SoCProfile + + +class SoCFunction: + """ + SoC Function of a node's label maps trip time plus an additional charging + time at the last charging station to a SoC at the node. + + """ + + def __init__(self, label: Label, cf_cs: ChargingFunction): + """ + :param label: Label containing to a node. It has the fields: + + :t_trip: Trip time. Includes time to reach v from the last + charging station in the path but excludes charging at u. + :soc_last_cs: The SoC at reaching the last charging station + before any charging. + :last_cs: The last charging station in the path to current node. + :soc_profile_cs_v: The SoC Profile of the path cs -> v. + :cf_cs: Charging function of cs. + + :param cf_cs: The charging function of the last charging station. + """ + # Unpack label + self.t_trip: Time = label.t_trip + self.last_cs: Node = label.last_cs + self.soc_last_cs: SoC = label.soc_last_cs + self.soc_profile_cs_v: SoCProfile = label.soc_profile_cs_v + + self.cf_cs: ChargingFunction = cf_cs + + def __call__(self, t: Time) -> SoC: + """ + Maps a new trip time to a SoC at the current node. The new trip time + then includes and fixes charging time at the last charging station. + + :param t: t = trip time + charging at the last charging station + :return: SoC at current node at time t after spending t - t_trip at + the last charging station to charge the battery. + + """ + if t < self.t_trip: + # Impossible to reach the current node for times < trip time return -inf - return beta - self.cost - def __add__(self, other: 'SoCProfile') -> 'SoCProfile': - new = copy(self) - new.cost = self.cost + other.cost - new.out = self.out - other.cost + return self.soc_profile_cs_v( + self.cf_cs(t - self.t_trip, self.soc_last_cs) + ) + + @property + def minimum(self) -> Time: + """ + :returns: Least feasible trip time. That is, the minimum time when + the SoC functions yields a SoC value that is not -inf. + + This is either the trip time, or if energy needs to be charged + at the previous charging station to traverse the path, trip time + plus charging time until the battery holds the minimum energie to + traverse the path to the current node (which is the cost of the + path). This time is: + + t = t_trip + cf_cs^(-1)(cost_cs_v) - cf_cs^(-1)(soc_last_sc) + + Thus, for t_min holds: + + t_min = max(t_trip, t) + + """ + cost_p = self.soc_profile_cs_v.path_cost + + if cost_p > self.soc_last_cs: + if self.cf_cs.is_dummy: + # Not feasible. Dummy stations do not load. + return -inf + + return self.t_trip + \ + self.cf_cs.inverse(cost_p) - \ + self.cf_cs.inverse(self.soc_last_cs) - return new + return self.t_trip diff --git a/evrouting/charge/__init__.py b/evrouting/charge/__init__.py index 0e1bcfd..3133abe 100644 --- a/evrouting/charge/__init__.py +++ b/evrouting/charge/__init__.py @@ -1 +1 @@ -from .routing import shortest_path \ No newline at end of file +from .routing import shortest_path diff --git a/evrouting/charge/factories.py b/evrouting/charge/factories.py new file mode 100644 index 0000000..be8927e --- /dev/null +++ b/evrouting/charge/factories.py @@ -0,0 +1,30 @@ +import networkx as nx + +from .T import SoCProfile, ChargingFunction +from ..T import Node, SoC +from ..graph_tools import charging_cofficient, consumption + + +def charging_function( + G: nx.Graph, + n: Node, + capacity: SoC, + initial_soc: SoC = None) -> ChargingFunction: + """Create charging function of node.""" + return ChargingFunction(charging_cofficient(G, n), capacity, initial_soc) + + +def soc_profile( + G: nx.Graph, + capacity: SoC, + u: Node, + v: Node = None, +) -> SoCProfile: + """ + Return SoC Profile of the path from u to v. + + If no v is provided, the path of u is definded as no cost path. + + """ + path_cost = 0 if v is None else consumption(G, u, v) + return SoCProfile(path_cost, capacity) diff --git a/evrouting/charge/routing.py b/evrouting/charge/routing.py index c4809de..8935107 100644 --- a/evrouting/charge/routing.py +++ b/evrouting/charge/routing.py @@ -1,14 +1,18 @@ -from typing import List +from typing import Dict from math import inf import networkx as nx -from evrouting.T import Node, SoC, Time, ChargingCoefficient +from evrouting.T import Node, SoC, Time from evrouting.utils import PriorityQueue +from evrouting.charge import factories as factories -from .T import SoCProfile, ChargingFunction, Label +from ..graph_tools import distance +from .T import SoCFunction, Label +from .utils import LabelPriorityQueue -def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, beta_s: SoC, beta_t: SoC, U: SoC): +def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, + initial_soc: SoC, final_soc: SoC, capacity: SoC): """ Calculates shortest path using the CHarge algorithm. @@ -21,64 +25,128 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, beta_s: SoC, beta_t: So :return: """ q = PriorityQueue() - l_set = {v: set() for v in G} - l_uns = {v: PriorityQueue() for v in G} + l_set: Dict[int, set] = {v: set() for v in G} + l_uns: Dict[int, LabelPriorityQueue] = { + v: LabelPriorityQueue() for v in G + } # Dummy vertex without incident edges that is (temporarily) added to G - v_0: Node = Node(len(G.nodes)) - G.add_node(v_0) - - S.add(v_0) - - cf_v_0 = [(0, beta_s)] - l_uns[s] = PriorityQueue() - - l = Label(0, beta_s, v_0, SoCProfile(G, U, s)) - l_uns[s].insert(item=l, priority=key(l)) + dummy_node: Node = len(G.nodes) + # Charging coefficient 0 indicates dummy node + G.add_node(dummy_node, c=0) + charging_stations.add(dummy_node) + + l: Label = Label( + t_trip=0, + soc_last_cs=initial_soc, + last_cs=dummy_node, + soc_profile_cs_v=factories.soc_profile(G, capacity, s) + ) + + l_uns[s].insert( + l, + factories.charging_function(G, l.last_cs, capacity, initial_soc) + ) q.insert(s, 0) # run main loop while True: try: - v = q.peak_min() + minimum_node: Node = q.peak_min() except KeyError: # empty queue break - l = l_uns[v].delete_min() - l_set[v].add(l) + label_minimum_node: Label = l_uns[minimum_node].delete_min() + l_set[minimum_node].add(label_minimum_node) - if v == t: - return ChargingFunction(G, l).get_minimum() + if minimum_node == t: + return SoCFunction( + label_minimum_node, + factories.charging_function( + G, + label_minimum_node.last_cs, + capacity, + initial_soc + ) + ).minimum # handle charging stations - t_trip, beta_u, u, b_u_v = l - if v in S and not v == u: - # TODO !!! - for t_charge in t_breaks(l): - l_uns[v].insert(new_label(l), priority=) # prio?? + if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs: + cf_last_cs = factories.charging_function( + G, + label_minimum_node.last_cs, + capacity, + initial_soc # Use here in case cs is a dummy station + ) + cf_minimum_node = factories.charging_function( + G, + minimum_node, + capacity, + initial_soc # Use here in case cs is a dummy station + ) + + if cf_minimum_node.c > cf_last_cs.c: + # Only charge the minimum at the last charge station + # and continue charging at this station. + old_soc_function: SoCFunction = SoCFunction( + label_minimum_node, cf_last_cs + ) + t_trip_old = label_minimum_node.t_trip + t_charge: Time = old_soc_function.minimum - t_trip_old + + label_new = Label( + t_trip=t_trip_old + t_charge, + soc_last_cs=old_soc_function(t_trip_old + t_charge), + last_cs=minimum_node, + soc_profile_cs_v=factories.soc_profile( + G, capacity, minimum_node + ) + ) + l_uns[minimum_node].insert( + label_new, + cf_minimum_node + ) # update priority queue - if l_uns[v]: - l_new = l_uns[v].peak_min() - q.insert(v, key(l_new)) - else: + try: + label_minimum_node = l_uns[minimum_node].peak_min() + except KeyError: + # l_uns[v] empty q.delete_min() + else: + q.insert(minimum_node, key(label_minimum_node)) # scan outgoing arcs - for x, y in G[v]: - b_x_y = b_u_v + SoCProfile(G, U, x, y) - if not b_x_y(beta_max_u) == -inf: - l_new = (t_trip + G.edges[x, y]['weight'], beta_u, u, b_x_y) - l_uns[y].insert(l_new) - if l_new == l_uns[y].peak_min(): - q.insert(y, key(l_new)) + for n in G.neighbors(minimum_node): + # Create SoC Profile for getting from minimum_node to n + soc_profile = label_minimum_node.soc_profile_cs_v + \ + factories.soc_profile(G, capacity, minimum_node, n) + if not soc_profile(capacity) == -inf: + # It is possible to get from minimum_node to n + l_new = Label( + label_minimum_node.t_trip + distance(G, minimum_node, n), + label_minimum_node.soc_last_cs, + label_minimum_node.last_cs, + soc_profile + ) + try: + l_uns[n].insert( + l_new, + factories.charging_function( + G, + l_new.last_cs, + capacity, + initial_soc + ) + ) + except ValueError: + pass + else: + if l_new == l_uns[n].peak_min(): + q.insert(n, key(l_new)) def key(l: Label) -> Time: return l.t_trip - - -def t_breaks(c_old: ChargingCoefficient, c_new: ChargingCoefficient) -> List[Time]: - pass diff --git a/evrouting/charge/utils.py b/evrouting/charge/utils.py new file mode 100644 index 0000000..30ebdb2 --- /dev/null +++ b/evrouting/charge/utils.py @@ -0,0 +1,29 @@ +from math import inf + +from evrouting.utils import PriorityQueue +from evrouting.T import SoC, Time + +from .T import Label, SoCFunction, ChargingFunction + + +class LabelPriorityQueue(PriorityQueue): + def insert(self, label: Label, cf: ChargingFunction): + """Breaking ties with lowest soc at t_min.""" + soc_function = SoCFunction( + label, + cf + ) + + t_min: Time = soc_function.minimum + + # Might happen because of dummy charge stations + if t_min == -inf: + raise ValueError('Infeasible label.') + + soc_min: SoC = soc_function(t_min) + + super().insert( + item=label, + priority=t_min, + count=soc_min + ) diff --git a/evrouting/graph_tools.py b/evrouting/graph_tools.py index 2d92456..11338e3 100644 --- a/evrouting/graph_tools.py +++ b/evrouting/graph_tools.py @@ -1,17 +1,12 @@ -from typing import Dict, Tuple from collections import namedtuple import networkx as nx -from evrouting.T import Wh, ChargingCoefficient +from evrouting.T import Wh, ChargingCoefficient, Time, Node, NodeData, EdgeData TemplateEdge = namedtuple('Edge', ['u', 'v', 'distance', 'consumption']) -TemplateNode = namedtuple('Node', ['label', 'charging_coeff'], defaults=(None, None)) - -NodeData = Dict -EdgeData = Dict - -Node = int -Edge = Tuple[int, int] +TemplateNode = namedtuple( + 'Node', ['label', 'charging_coeff'], defaults=(None, None) +) def node_convert(n: TemplateNode) -> NodeData: @@ -26,5 +21,10 @@ def consumption(G: nx.Graph, u: Node, v: Node) -> Wh: return G.edges[u, v]['c'] +def distance(G: nx.Graph, u: Node, v: Node) -> Time: + return G.edges[u, v]['weight'] + + def charging_cofficient(G: nx.Graph, n: Node) -> ChargingCoefficient: return G.nodes[n]['c'] + diff --git a/evrouting/utils.py b/evrouting/utils.py index 8aa001f..1bd77e1 100644 --- a/evrouting/utils.py +++ b/evrouting/utils.py @@ -11,12 +11,17 @@ class PriorityQueue: self.entry_finder = {} # mapping of tasks to entries self.counter = itertools.count() # unique sequence count as tie break - def insert(self, item: Any, priority=0): + def insert(self, item: Any, priority: Any = 0, count: Any = None): """Add a new task or update the priority of an existing task""" if item in self.entry_finder: self.remove_item(item) - count = next(self.counter) - entry = [priority, count, item] + + # Additional manual set tie break + if count is not None: + entry = [priority, count, next(self.counter), item] + else: + entry = [priority, next(self.counter), item] + self.entry_finder[item] = entry heappush(self.pq, entry) @@ -28,7 +33,7 @@ class PriorityQueue: def delete_min(self) -> Any: """Remove and return the lowest priority task. Raise KeyError if empty.""" while self.pq: - priority, count, item = heappop(self.pq) + item = heappop(self.pq)[-1] if item is not self.REMOVED: del self.entry_finder[item] return item @@ -37,7 +42,7 @@ class PriorityQueue: def peak_min(self) -> Any: """Return minimum item without removing it from the queue.""" while self.pq: - priority, count, item = self.pq[0] + item = self.pq[0][-1] if item is not self.REMOVED: return item else: diff --git a/tests/charge/__init__.py b/tests/charge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/charge/conftest.py b/tests/charge/conftest.py new file mode 100644 index 0000000..7ad081b --- /dev/null +++ b/tests/charge/conftest.py @@ -0,0 +1,43 @@ +import pytest +from evrouting.charge.T import SoCProfile, SoCFunction, ChargingFunction, Label + + +@pytest.fixture +def soc_profile(): + cost = 1 + capacity = 4 + + return cost, capacity, SoCProfile(cost, capacity) + + +@pytest.fixture +def soc_profile_2(): + cost = 2 + capacity = 4 + + return cost, capacity, SoCProfile(cost, capacity) + + +@pytest.fixture +def ch_function(): + c = 2 + capacity = 4 + + return c, capacity, ChargingFunction(c, capacity) + + +@pytest.fixture +def label(soc_profile_2): + _, _, profile = soc_profile_2 + return Label( + t_trip=10, + soc_last_cs=2, + soc_profile_cs_v=profile, + last_cs=1 + ) + + +@pytest.fixture +def soc_function(label, ch_function): + _, _, cf = ch_function + return SoCFunction(label, cf) diff --git a/tests/charge/test_charge_routing.py b/tests/charge/test_charge_routing.py new file mode 100644 index 0000000..cd627ba --- /dev/null +++ b/tests/charge/test_charge_routing.py @@ -0,0 +1,32 @@ +from evrouting.charge import shortest_path + +from ..config import ( + edge_case, + edge_case_start_node_no_cs, + init_config +) + + +def test_shortest_path_charge_at_s(): + """Charging at s.""" + path = shortest_path(**init_config(edge_case)) + + assert path == 3.5 + + +def test_shortest_path_no_charge_s_path_t(): + """No charging at s but enough initial SoC to go to t directly.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 4 + path = shortest_path(**conf) + + assert path == 1 + + +def test_shortest_path_no_charge_s_path_a(): + """No charging at s but just enough SoC to go to t via a.""" + conf = init_config(edge_case_start_node_no_cs) + conf['initial_soc'] = 2 + path = shortest_path(**conf) + + assert path == 2 diff --git a/tests/charge/test_soc_data_structures.py b/tests/charge/test_soc_data_structures.py new file mode 100644 index 0000000..60c0485 --- /dev/null +++ b/tests/charge/test_soc_data_structures.py @@ -0,0 +1,183 @@ +from math import inf + +import pytest +from evrouting.charge.T import SoCProfile, ChargingFunction + + +class TestSoCProfile: + def test_profile_limit(self, soc_profile): + cost, capacity, p = soc_profile + assert p(capacity) == capacity - cost + + def test_profile_above_limit(self, soc_profile): + cost, capacity, p = soc_profile + assert p(capacity + 10) == capacity + + def test_profile_under_limit(self, soc_profile): + cost, capacity, p = soc_profile + assert p(cost - 1) == -inf + + def test_profile_lower_limit(self, soc_profile): + cost, capacity, p = soc_profile + assert p(cost) == 0 + + def test_profile_within_limit(self, soc_profile): + cost, capacity, p = soc_profile + assert p(1.5) == 1.5 - cost + + def test_compound(self): + cost_a = 1 + cost_b = 3 + capacity = 2 + + p_a = SoCProfile(cost_a, capacity) + p_b = SoCProfile(cost_b, capacity) + + p = p_a + p_b + + assert p.path_cost == cost_a + cost_b + assert p.capacity == capacity + + +class TestSoCFunction: + def test_minimum(self, soc_function): + """Not necessary to charge so minimum is trip time.""" + assert soc_function.minimum == soc_function.t_trip + + def test_minimum_with_charge(self, soc_function): + """Empty battery at charge station.""" + soc_function.soc_last_cs = 0 + + # Needs to charge 2 Wh because path has costs + # of 2. Charging takes additional 2 Wh / charging_coefficient + # of time compared to the sole trip time. + assert soc_function.minimum == \ + 2 / soc_function.cf_cs.c + soc_function.t_trip + + def test_minimum_with_and_soc_charge(self, soc_function): + """Empty battery at charge station.""" + soc_function.soc_last_cs = 1 + + # Needs to charge 1 Wh because path has costs + # of 2. Charging takes additional 1 Wh / charging_coefficient + # of time compared to the sole trip time. + assert soc_function.minimum == \ + 1 / soc_function.cf_cs.c + soc_function.t_trip + + def test_below_t_trip(self, soc_function): + """For t < t_trip, the current vertex is not feasible.""" + assert soc_function(9) == -inf + + def test_t_trip(self, soc_function): + """ + Reaches the vertex without charging with the initial SoC 2 Wh + reduced by th cost 2 Wh = 0 Wh. + """ + assert soc_function(10) == 0 + + def test_above_t_trip(self, soc_function): + """Adds 1 h of charging to the case above.""" + assert soc_function(11) == 2 + + def test_below_t_trip_need_to_fill(self, soc_function): + """For t < t_trip, the current vertex is not feasible.""" + soc_function.soc_last_cs = 0 + assert soc_function(10) == -inf + + def test_t_trip_need_to_fill(self, soc_function): + """Needs to charge to reach the current vertex.""" + soc_function.soc_last_cs = 0 + t_fill = 2 / soc_function.cf_cs.c + assert soc_function(10 + t_fill) == 0. + + def test_above_t_trip_need_to_fill(self, soc_function): + """Adds 1 h of charging.""" + soc_function.soc_last_cs = 0 + t_fill = 2 / soc_function.cf_cs.c + assert soc_function(11 + t_fill) == 2 + + def test_dummy_feasible(self, soc_function): + """Cost < SoC at (dummy) Charging Station.""" + # Make dummy + soc_function.cf_cs.c = 0 + + assert soc_function.minimum == 10 + + def test_dummy_not_feasible(self, soc_function): + """Cost higher than soc at c.""" + # Make dummy + soc_function.cf_cs.c = 0 + soc_function.soc_last_cs = 1 + + assert soc_function.minimum == -inf + + +class TestChargingFunction: + def test_creation(self, ch_function): + c, capacity, cf = ch_function + assert not cf.is_dummy + + def test_above_limit(self, ch_function): + """Charge over capacity.""" + c, capacity, cf = ch_function + + # Above by time + assert cf(3, 0) == capacity + + # Above by initial soc + assert cf(1, 5) == capacity + + # Above by combination of time and initial soc + assert cf(1, 3) == capacity + + def test_within_limit(self, ch_function): + c, capacity, cf = ch_function + + assert cf(1) == 2 + assert cf(1, initial_soc=1) == 3 + + def test_dummy_no_initial_soc_given(self): + c = 0 + capacity = 4 + + cf = ChargingFunction(c, capacity) + + assert cf.is_dummy + + with pytest.raises(ValueError): + cf(3) + + def test_dummy(self): + c = 0 + capacity = 4 + initial_soc = 3 + + cf = ChargingFunction(c, capacity, initial_soc) + + assert cf(5) == initial_soc + assert cf(0) == initial_soc + assert cf(1) == initial_soc + + def test_inverse(self, ch_function): + c, capacity, cf = ch_function + + assert cf.inverse(1) == 0.5 + assert cf.inverse(4) == 2 + + def test_inverse_above_limit(self, ch_function): + c, capacity, cf = ch_function + + assert cf.inverse(-1) == 0 + + # Maximum time to charge is to load completely + assert cf.inverse(capacity + 1) == capacity / c + + def test_inverse_dummy(self): + c = 0 + capacity = 4 + initial_soc = 3 + + cf = ChargingFunction(c, capacity, initial_soc) + + with pytest.raises(ValueError): + cf.inverse(0) diff --git a/tests/charge/test_utils.py b/tests/charge/test_utils.py new file mode 100644 index 0000000..3220019 --- /dev/null +++ b/tests/charge/test_utils.py @@ -0,0 +1,42 @@ +import pytest +from evrouting.charge.utils import LabelPriorityQueue +from evrouting.charge.T import Label + + +@pytest.fixture +def q(label, ch_function): + _, _, cf = ch_function + q = LabelPriorityQueue() + q.insert(label, cf) + + # create min + label = Label( + t_trip=8, + soc_last_cs=2, + soc_profile_cs_v=label.soc_profile_cs_v, + last_cs=1 + ) + + q.insert(label, cf) + + yield q + del q + + +class TestProrityQueue: + def test_empty(self, q: LabelPriorityQueue): + q.delete_min() + q.delete_min() + + with pytest.raises(KeyError): + q.delete_min() + + def test_peak(self, q: LabelPriorityQueue): + assert q.peak_min().t_trip == 8 + + def test_insert_same(self, q: LabelPriorityQueue, ch_function): + """Should be no problem.""" + _, _, cf = ch_function + label = q.peak_min() + q.remove_item(label) + q.insert(label, cf) diff --git a/tests/config.py b/tests/config.py index 7c0461a..ba91c6f 100644 --- a/tests/config.py +++ b/tests/config.py @@ -1,32 +1,53 @@ import networkx as nx +from copy import copy +from typing import Set -from evrouting.graph_tools import node_convert, edge_convert -from evrouting.graph_tools import TemplateEdge as Edge -from evrouting.graph_tools import TemplateNode as Node +from evrouting.T import Node, ChargingCoefficient +from evrouting.graph_tools import ( + node_convert, edge_convert, TemplateEdge, TemplateNode, charging_cofficient +) # List of configs -config_list = ['edge_case'] +config_list = ['edge_case', 'edge_case_start_node_no_cs'] edge_case = { - 'b_0': 0, - 'b_t': 0, + 'beta_s': 0, + 'beta_t': 0, 'U': 4, 's': 0, 't': 2, 'nodes': [ - Node('s', charging_coeff=1), - Node('a', charging_coeff=2), - Node('t'), + TemplateNode('s', charging_coeff=1), + TemplateNode('a', charging_coeff=2), + TemplateNode('t'), ], 'edges': [ - Edge(0, 1, distance=1, consumption=1), - Edge(0, 2, distance=1, consumption=4), - Edge(1, 2, distance=1, consumption=1), + TemplateEdge(0, 1, distance=1, consumption=1), + TemplateEdge(0, 2, distance=1, consumption=4), + TemplateEdge(1, 2, distance=1, consumption=1), + ] +} + +edge_case_start_node_no_cs = { + 'beta_s': 0, + 'beta_t': 0, + 'U': 4, + 's': 0, + 't': 2, + 'nodes': [ + TemplateNode('s'), + TemplateNode('a', charging_coeff=2), + TemplateNode('t'), + ], + 'edges': [ + TemplateEdge(0, 1, distance=1, consumption=1), + TemplateEdge(0, 2, distance=1, consumption=4), + TemplateEdge(1, 2, distance=1, consumption=1), ] } -def get_graph(config): +def get_graph(config: dict) -> nx.Graph: G = nx.Graph() for node_id, node in enumerate(config['nodes']): @@ -36,3 +57,34 @@ def get_graph(config): G.add_edge(edge.u, edge.v, **edge_convert(edge)) return G + + +def get_charging_stations(config: dict) -> Set[Node]: + return { + idx for idx, n in enumerate(config['nodes']) + if n.charging_coeff is not None + } + + +def init_config(config: dict) -> dict: + G = nx.Graph() + S = set() + + for node_id, node in enumerate(config['nodes']): + G.add_node(node_id, **node_convert(node)) + c: ChargingCoefficient = charging_cofficient(G, node_id) + if c is not None: + S.add(node_id) + + for edge in config['edges']: + G.add_edge(edge.u, edge.v, **edge_convert(edge)) + + return { + 'G': G, + 'charging_stations': S, + 's': config['s'], + 't': config['t'], + 'initial_soc': config['beta_s'], + 'final_soc': config['beta_t'], + 'capacity': config['U'] + } diff --git a/tests/test_charge_routing.py b/tests/test_charge_routing.py deleted file mode 100644 index b14a78e..0000000 --- a/tests/test_charge_routing.py +++ /dev/null @@ -1,18 +0,0 @@ -from evrouting.charge import shortest_path - -from .config import edge_case, get_graph - - -def test_shortest_path(): - G = get_graph(edge_case) - - path = shortest_path( - G, - s=edge_case['s'], - t=edge_case['t'], - b_0=edge_case['b_0'], - b_t=edge_case['b_t'], - U=edge_case['U'] - ) - - assert path == [(0, 2), (1, 0), (2, 0)] -- GitLab