Skip to content
Snippets Groups Projects
Commit 2d0e5f0e authored by markn92's avatar markn92
Browse files

Working np Algorithm

parent e0039f48
Branches
No related tags found
No related merge requests found
from typing import Tuple, Union, NewType
from math import inf
from dataclasses import dataclass
from typing import Tuple, Union, NewType, Dict, Any
Node = int
Edge = Tuple[Node, Node]
NodeData = Dict[str, Any]
EdgeData = Dict[str, Any]
Wh = NewType('Wh', Union[float, int])
SoC = NewType('SoC', Union[-inf, Wh])
SoC = NewType('SoC', Wh)
ChargingCoefficient = float
ChargingCoefficient = Union[float, int, None]
Time = Union[float, int]
from copy import copy
from collections import namedtuple
from typing import Callable, NamedTuple
from math import inf
import networkx as nx
from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node
from evrouting.graph_tools import charging_cofficient, consumption
Label = namedtuple('Label', ['t_trip', 'beta_u', 'u', 'SoCProfile_u_v'])
class SoCProfile:
"""
The SoC Profile maps an initial SoC to the SoC after
traversing a path.
class ChargingFunction:
SoC profile of a path is fully described with two parameters:
- cost: Cost of going from u to v.
- out: Maximal SoC after passing the path from u to v.
"""
def __init__(self, G: nx.Graph, l: Label):
self.t_trip: Time = l.t_trip
self.beta_u: SoC = l.beta_u
self.u: Node = l.u
self.b_u_v: SoCProfile = l.SoCProfile_u_v
self.c_u: ChargingCoefficient = charging_cofficient(G, l.u)
def __init__(self, path_cost: Wh, capacity: SoC):
"""
Calculates the maximum SoC after traversing the path, which is
(for non-negative path costs) U - cost.
def __call__(self, t) -> SoC:
if t < self.t_trip:
:param path_cost: Cost of the path. The SoC Profile of
a single vertex has no cost.
:param capacity: The battery's capacity.
"""
self.capacity: SoC = capacity
self.path_cost: Wh = path_cost
self.out: SoC = capacity - self.path_cost
def __call__(self, initial_soc: SoC) -> SoC:
"""
:param initial_soc: The initial SoC.
:returns: The SoC after traversing the path.
"""
if initial_soc < self.path_cost:
return -inf
final_soc = initial_soc - self.path_cost
return self.beta_u(self.beta_u + self.c_u * (t - self.t_trip))
return final_soc if final_soc < self.capacity else self.capacity
def __add__(self, other: 'SoCProfile') -> 'SoCProfile':
"""
Combines to SoC Profiles.
def get_minimum(self) -> Time:
"""TODO: Explain."""
cost_p = self.b_u_v.cost
return max(self.t_trip, (cost_p - self.beta_u) / self.c_u + self.t_trip)
The for the combined SoC Profile, given a SoC Profile for a path a
and a path b, holds:
cost = cost_a + cost_b
out = U - cost
class SoCProfile:
:return: Combined SoC Profile.
"""
Describe SoC profile with two parameters:
- cost: Cost of going from u to v.
- out: Maximal SoC after passing the path from u to v.
return SoCProfile(self.path_cost + other.path_cost, self.capacity)
class ChargingFunction:
"""
Charging functions map charging time to resulting SoCs. Since they are
monotonic, there exists also an inverse mapping SoC to charging time.
Dummy Charging Station
======================
Special case is a so called dummy charging station. This is the case
if the charging coefficient is 0. Then there is no actual charging and
the value of the charging station is always the same as the arrival SoC.
A dummy charging station is necessary for initialization of the problem
to trigger spawning of new labels.
"""
def __init__(self, c: ChargingCoefficient, capacity: SoC,
initial_soc: SoC = None):
"""
:param c: The stations charging coefficient. If c=0, the this becomes
a so called dummy charging station, that exists to trigger label
creation in the algorithm.
:param capacity: The battery's capacity, respectively the maximum of
the charging function.
:param initial_soc: The SoC at arriving at the charging station. This is
optional for dummy charging stations, that do not actually charge:
cf(b) = soc
"""
self.c: ChargingCoefficient = c
self.capacity: SoC = capacity
self.initial_soc: SoC = initial_soc
@property
def is_dummy(self) -> bool:
"""Tell if function is a dummy function."""
return self.c == 0
def __init__(self, G: nx.Graph, U: SoC, u: Node, v: Node = None):
if v is None:
self.cost: Wh = 0
self.out: Wh = U
def __call__(self, t: Time, initial_soc: SoC = 0) -> SoC:
"""
:param t: Charging time
:param initial_soc: Initial SoC when starting to charge
:return: SoC after charging.
"""
if self.is_dummy:
if self.initial_soc is None:
raise ValueError(
'Charging coefficient is 0 but no initial SoC given.'
)
soc = self.initial_soc
else:
self.cost: Wh = consumption(G, u, v)
self.out: Wh = U - self.cost
soc = initial_soc + self.c * t
return soc if soc < self.capacity else self.capacity
@property
def inverse(self) -> Callable[[SoC], Time]:
"""
:returns: Inverse charging function mapping SoC to charging
time (assuming initial SoC=0)
"""
if self.is_dummy:
raise ValueError('Dummy Carging function has no inverse.')
c = self.c
capacity = self.capacity
def __call__(self, beta) -> SoC:
if beta < self.cost:
def cf_inverse(soc: SoC) -> Time:
if soc > capacity:
# Return max charge time
return capacity / c
elif soc < 0:
# Return min charge time
return 0
return soc / c
return cf_inverse
class Label(NamedTuple):
"""
Label used for the Algorithm.
The (paleto optimal) label where u=t is the
contains the minimum trip time necessary to get to t.
The label of node v consists of a tuple:
(t_trip, soc_last_cs, last_cs, soc_profile_cs_v)
Where:
:t_trip: Trip time to node v without charging time at the last
charging station.
:soc_last_cs: Arrival SoC at the last charging station.
:last_cs: (Node ID of) The last charging station.
:soc_profile_cs_v: The SoC Profile describing the costs going from
the charging station to current node v.
"""
t_trip: Time
soc_last_cs: SoC
last_cs: Node
soc_profile_cs_v: SoCProfile
class SoCFunction:
"""
SoC Function of a node's label maps trip time plus an additional charging
time at the last charging station to a SoC at the node.
"""
def __init__(self, label: Label, cf_cs: ChargingFunction):
"""
:param label: Label containing to a node. It has the fields:
:t_trip: Trip time. Includes time to reach v from the last
charging station in the path but excludes charging at u.
:soc_last_cs: The SoC at reaching the last charging station
before any charging.
:last_cs: The last charging station in the path to current node.
:soc_profile_cs_v: The SoC Profile of the path cs -> v.
:cf_cs: Charging function of cs.
:param cf_cs: The charging function of the last charging station.
"""
# Unpack label
self.t_trip: Time = label.t_trip
self.last_cs: Node = label.last_cs
self.soc_last_cs: SoC = label.soc_last_cs
self.soc_profile_cs_v: SoCProfile = label.soc_profile_cs_v
self.cf_cs: ChargingFunction = cf_cs
def __call__(self, t: Time) -> SoC:
"""
Maps a new trip time to a SoC at the current node. The new trip time
then includes and fixes charging time at the last charging station.
:param t: t = trip time + charging at the last charging station
:return: SoC at current node at time t after spending t - t_trip at
the last charging station to charge the battery.
"""
if t < self.t_trip:
# Impossible to reach the current node for times < trip time
return -inf
return beta - self.cost
def __add__(self, other: 'SoCProfile') -> 'SoCProfile':
new = copy(self)
new.cost = self.cost + other.cost
new.out = self.out - other.cost
return self.soc_profile_cs_v(
self.cf_cs(t - self.t_trip, self.soc_last_cs)
)
@property
def minimum(self) -> Time:
"""
:returns: Least feasible trip time. That is, the minimum time when
the SoC functions yields a SoC value that is not -inf.
This is either the trip time, or if energy needs to be charged
at the previous charging station to traverse the path, trip time
plus charging time until the battery holds the minimum energie to
traverse the path to the current node (which is the cost of the
path). This time is:
t = t_trip + cf_cs^(-1)(cost_cs_v) - cf_cs^(-1)(soc_last_sc)
Thus, for t_min holds:
t_min = max(t_trip, t)
"""
cost_p = self.soc_profile_cs_v.path_cost
if cost_p > self.soc_last_cs:
if self.cf_cs.is_dummy:
# Not feasible. Dummy stations do not load.
return -inf
return self.t_trip + \
self.cf_cs.inverse(cost_p) - \
self.cf_cs.inverse(self.soc_last_cs)
return new
return self.t_trip
import networkx as nx
from .T import SoCProfile, ChargingFunction
from ..T import Node, SoC
from ..graph_tools import charging_cofficient, consumption
def charging_function(
G: nx.Graph,
n: Node,
capacity: SoC,
initial_soc: SoC = None) -> ChargingFunction:
"""Create charging function of node."""
return ChargingFunction(charging_cofficient(G, n), capacity, initial_soc)
def soc_profile(
G: nx.Graph,
capacity: SoC,
u: Node,
v: Node = None,
) -> SoCProfile:
"""
Return SoC Profile of the path from u to v.
If no v is provided, the path of u is definded as no cost path.
"""
path_cost = 0 if v is None else consumption(G, u, v)
return SoCProfile(path_cost, capacity)
from typing import List
from typing import Dict
from math import inf
import networkx as nx
from evrouting.T import Node, SoC, Time, ChargingCoefficient
from evrouting.T import Node, SoC, Time
from evrouting.utils import PriorityQueue
from evrouting.charge import factories as factories
from .T import SoCProfile, ChargingFunction, Label
from ..graph_tools import distance
from .T import SoCFunction, Label
from .utils import LabelPriorityQueue
def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, beta_s: SoC, beta_t: SoC, U: SoC):
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC):
"""
Calculates shortest path using the CHarge algorithm.
......@@ -21,64 +25,128 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, beta_s: SoC, beta_t: So
:return:
"""
q = PriorityQueue()
l_set = {v: set() for v in G}
l_uns = {v: PriorityQueue() for v in G}
l_set: Dict[int, set] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue() for v in G
}
# Dummy vertex without incident edges that is (temporarily) added to G
v_0: Node = Node(len(G.nodes))
G.add_node(v_0)
S.add(v_0)
cf_v_0 = [(0, beta_s)]
l_uns[s] = PriorityQueue()
l = Label(0, beta_s, v_0, SoCProfile(G, U, s))
l_uns[s].insert(item=l, priority=key(l))
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
l: Label = Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=factories.soc_profile(G, capacity, s)
)
l_uns[s].insert(
l,
factories.charging_function(G, l.last_cs, capacity, initial_soc)
)
q.insert(s, 0)
# run main loop
while True:
try:
v = q.peak_min()
minimum_node: Node = q.peak_min()
except KeyError:
# empty queue
break
l = l_uns[v].delete_min()
l_set[v].add(l)
label_minimum_node: Label = l_uns[minimum_node].delete_min()
l_set[minimum_node].add(label_minimum_node)
if v == t:
return ChargingFunction(G, l).get_minimum()
if minimum_node == t:
return SoCFunction(
label_minimum_node,
factories.charging_function(
G,
label_minimum_node.last_cs,
capacity,
initial_soc
)
).minimum
# handle charging stations
t_trip, beta_u, u, b_u_v = l
if v in S and not v == u:
# TODO !!!
for t_charge in t_breaks(l):
l_uns[v].insert(new_label(l), priority=) # prio??
if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
cf_last_cs = factories.charging_function(
G,
label_minimum_node.last_cs,
capacity,
initial_soc # Use here in case cs is a dummy station
)
cf_minimum_node = factories.charging_function(
G,
minimum_node,
capacity,
initial_soc # Use here in case cs is a dummy station
)
if cf_minimum_node.c > cf_last_cs.c:
# Only charge the minimum at the last charge station
# and continue charging at this station.
old_soc_function: SoCFunction = SoCFunction(
label_minimum_node, cf_last_cs
)
t_trip_old = label_minimum_node.t_trip
t_charge: Time = old_soc_function.minimum - t_trip_old
label_new = Label(
t_trip=t_trip_old + t_charge,
soc_last_cs=old_soc_function(t_trip_old + t_charge),
last_cs=minimum_node,
soc_profile_cs_v=factories.soc_profile(
G, capacity, minimum_node
)
)
l_uns[minimum_node].insert(
label_new,
cf_minimum_node
)
# update priority queue
if l_uns[v]:
l_new = l_uns[v].peak_min()
q.insert(v, key(l_new))
else:
try:
label_minimum_node = l_uns[minimum_node].peak_min()
except KeyError:
# l_uns[v] empty
q.delete_min()
else:
q.insert(minimum_node, key(label_minimum_node))
# scan outgoing arcs
for x, y in G[v]:
b_x_y = b_u_v + SoCProfile(G, U, x, y)
if not b_x_y(beta_max_u) == -inf:
l_new = (t_trip + G.edges[x, y]['weight'], beta_u, u, b_x_y)
l_uns[y].insert(l_new)
if l_new == l_uns[y].peak_min():
q.insert(y, key(l_new))
for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \
factories.soc_profile(G, capacity, minimum_node, n)
if not soc_profile(capacity) == -inf:
# It is possible to get from minimum_node to n
l_new = Label(
label_minimum_node.t_trip + distance(G, minimum_node, n),
label_minimum_node.soc_last_cs,
label_minimum_node.last_cs,
soc_profile
)
try:
l_uns[n].insert(
l_new,
factories.charging_function(
G,
l_new.last_cs,
capacity,
initial_soc
)
)
except ValueError:
pass
else:
if l_new == l_uns[n].peak_min():
q.insert(n, key(l_new))
def key(l: Label) -> Time:
return l.t_trip
def t_breaks(c_old: ChargingCoefficient, c_new: ChargingCoefficient) -> List[Time]:
pass
from math import inf
from evrouting.utils import PriorityQueue
from evrouting.T import SoC, Time
from .T import Label, SoCFunction, ChargingFunction
class LabelPriorityQueue(PriorityQueue):
def insert(self, label: Label, cf: ChargingFunction):
"""Breaking ties with lowest soc at t_min."""
soc_function = SoCFunction(
label,
cf
)
t_min: Time = soc_function.minimum
# Might happen because of dummy charge stations
if t_min == -inf:
raise ValueError('Infeasible label.')
soc_min: SoC = soc_function(t_min)
super().insert(
item=label,
priority=t_min,
count=soc_min
)
from typing import Dict, Tuple
from collections import namedtuple
import networkx as nx
from evrouting.T import Wh, ChargingCoefficient
from evrouting.T import Wh, ChargingCoefficient, Time, Node, NodeData, EdgeData
TemplateEdge = namedtuple('Edge', ['u', 'v', 'distance', 'consumption'])
TemplateNode = namedtuple('Node', ['label', 'charging_coeff'], defaults=(None, None))
NodeData = Dict
EdgeData = Dict
Node = int
Edge = Tuple[int, int]
TemplateNode = namedtuple(
'Node', ['label', 'charging_coeff'], defaults=(None, None)
)
def node_convert(n: TemplateNode) -> NodeData:
......@@ -26,5 +21,10 @@ def consumption(G: nx.Graph, u: Node, v: Node) -> Wh:
return G.edges[u, v]['c']
def distance(G: nx.Graph, u: Node, v: Node) -> Time:
return G.edges[u, v]['weight']
def charging_cofficient(G: nx.Graph, n: Node) -> ChargingCoefficient:
return G.nodes[n]['c']
......@@ -11,12 +11,17 @@ class PriorityQueue:
self.entry_finder = {} # mapping of tasks to entries
self.counter = itertools.count() # unique sequence count as tie break
def insert(self, item: Any, priority=0):
def insert(self, item: Any, priority: Any = 0, count: Any = None):
"""Add a new task or update the priority of an existing task"""
if item in self.entry_finder:
self.remove_item(item)
count = next(self.counter)
entry = [priority, count, item]
# Additional manual set tie break
if count is not None:
entry = [priority, count, next(self.counter), item]
else:
entry = [priority, next(self.counter), item]
self.entry_finder[item] = entry
heappush(self.pq, entry)
......@@ -28,7 +33,7 @@ class PriorityQueue:
def delete_min(self) -> Any:
"""Remove and return the lowest priority task. Raise KeyError if empty."""
while self.pq:
priority, count, item = heappop(self.pq)
item = heappop(self.pq)[-1]
if item is not self.REMOVED:
del self.entry_finder[item]
return item
......@@ -37,7 +42,7 @@ class PriorityQueue:
def peak_min(self) -> Any:
"""Return minimum item without removing it from the queue."""
while self.pq:
priority, count, item = self.pq[0]
item = self.pq[0][-1]
if item is not self.REMOVED:
return item
else:
......
import pytest
from evrouting.charge.T import SoCProfile, SoCFunction, ChargingFunction, Label
@pytest.fixture
def soc_profile():
cost = 1
capacity = 4
return cost, capacity, SoCProfile(cost, capacity)
@pytest.fixture
def soc_profile_2():
cost = 2
capacity = 4
return cost, capacity, SoCProfile(cost, capacity)
@pytest.fixture
def ch_function():
c = 2
capacity = 4
return c, capacity, ChargingFunction(c, capacity)
@pytest.fixture
def label(soc_profile_2):
_, _, profile = soc_profile_2
return Label(
t_trip=10,
soc_last_cs=2,
soc_profile_cs_v=profile,
last_cs=1
)
@pytest.fixture
def soc_function(label, ch_function):
_, _, cf = ch_function
return SoCFunction(label, cf)
from evrouting.charge import shortest_path
from ..config import (
edge_case,
edge_case_start_node_no_cs,
init_config
)
def test_shortest_path_charge_at_s():
"""Charging at s."""
path = shortest_path(**init_config(edge_case))
assert path == 3.5
def test_shortest_path_no_charge_s_path_t():
"""No charging at s but enough initial SoC to go to t directly."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4
path = shortest_path(**conf)
assert path == 1
def test_shortest_path_no_charge_s_path_a():
"""No charging at s but just enough SoC to go to t via a."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 2
path = shortest_path(**conf)
assert path == 2
from math import inf
import pytest
from evrouting.charge.T import SoCProfile, ChargingFunction
class TestSoCProfile:
def test_profile_limit(self, soc_profile):
cost, capacity, p = soc_profile
assert p(capacity) == capacity - cost
def test_profile_above_limit(self, soc_profile):
cost, capacity, p = soc_profile
assert p(capacity + 10) == capacity
def test_profile_under_limit(self, soc_profile):
cost, capacity, p = soc_profile
assert p(cost - 1) == -inf
def test_profile_lower_limit(self, soc_profile):
cost, capacity, p = soc_profile
assert p(cost) == 0
def test_profile_within_limit(self, soc_profile):
cost, capacity, p = soc_profile
assert p(1.5) == 1.5 - cost
def test_compound(self):
cost_a = 1
cost_b = 3
capacity = 2
p_a = SoCProfile(cost_a, capacity)
p_b = SoCProfile(cost_b, capacity)
p = p_a + p_b
assert p.path_cost == cost_a + cost_b
assert p.capacity == capacity
class TestSoCFunction:
def test_minimum(self, soc_function):
"""Not necessary to charge so minimum is trip time."""
assert soc_function.minimum == soc_function.t_trip
def test_minimum_with_charge(self, soc_function):
"""Empty battery at charge station."""
soc_function.soc_last_cs = 0
# Needs to charge 2 Wh because path has costs
# of 2. Charging takes additional 2 Wh / charging_coefficient
# of time compared to the sole trip time.
assert soc_function.minimum == \
2 / soc_function.cf_cs.c + soc_function.t_trip
def test_minimum_with_and_soc_charge(self, soc_function):
"""Empty battery at charge station."""
soc_function.soc_last_cs = 1
# Needs to charge 1 Wh because path has costs
# of 2. Charging takes additional 1 Wh / charging_coefficient
# of time compared to the sole trip time.
assert soc_function.minimum == \
1 / soc_function.cf_cs.c + soc_function.t_trip
def test_below_t_trip(self, soc_function):
"""For t < t_trip, the current vertex is not feasible."""
assert soc_function(9) == -inf
def test_t_trip(self, soc_function):
"""
Reaches the vertex without charging with the initial SoC 2 Wh
reduced by th cost 2 Wh = 0 Wh.
"""
assert soc_function(10) == 0
def test_above_t_trip(self, soc_function):
"""Adds 1 h of charging to the case above."""
assert soc_function(11) == 2
def test_below_t_trip_need_to_fill(self, soc_function):
"""For t < t_trip, the current vertex is not feasible."""
soc_function.soc_last_cs = 0
assert soc_function(10) == -inf
def test_t_trip_need_to_fill(self, soc_function):
"""Needs to charge to reach the current vertex."""
soc_function.soc_last_cs = 0
t_fill = 2 / soc_function.cf_cs.c
assert soc_function(10 + t_fill) == 0.
def test_above_t_trip_need_to_fill(self, soc_function):
"""Adds 1 h of charging."""
soc_function.soc_last_cs = 0
t_fill = 2 / soc_function.cf_cs.c
assert soc_function(11 + t_fill) == 2
def test_dummy_feasible(self, soc_function):
"""Cost < SoC at (dummy) Charging Station."""
# Make dummy
soc_function.cf_cs.c = 0
assert soc_function.minimum == 10
def test_dummy_not_feasible(self, soc_function):
"""Cost higher than soc at c."""
# Make dummy
soc_function.cf_cs.c = 0
soc_function.soc_last_cs = 1
assert soc_function.minimum == -inf
class TestChargingFunction:
def test_creation(self, ch_function):
c, capacity, cf = ch_function
assert not cf.is_dummy
def test_above_limit(self, ch_function):
"""Charge over capacity."""
c, capacity, cf = ch_function
# Above by time
assert cf(3, 0) == capacity
# Above by initial soc
assert cf(1, 5) == capacity
# Above by combination of time and initial soc
assert cf(1, 3) == capacity
def test_within_limit(self, ch_function):
c, capacity, cf = ch_function
assert cf(1) == 2
assert cf(1, initial_soc=1) == 3
def test_dummy_no_initial_soc_given(self):
c = 0
capacity = 4
cf = ChargingFunction(c, capacity)
assert cf.is_dummy
with pytest.raises(ValueError):
cf(3)
def test_dummy(self):
c = 0
capacity = 4
initial_soc = 3
cf = ChargingFunction(c, capacity, initial_soc)
assert cf(5) == initial_soc
assert cf(0) == initial_soc
assert cf(1) == initial_soc
def test_inverse(self, ch_function):
c, capacity, cf = ch_function
assert cf.inverse(1) == 0.5
assert cf.inverse(4) == 2
def test_inverse_above_limit(self, ch_function):
c, capacity, cf = ch_function
assert cf.inverse(-1) == 0
# Maximum time to charge is to load completely
assert cf.inverse(capacity + 1) == capacity / c
def test_inverse_dummy(self):
c = 0
capacity = 4
initial_soc = 3
cf = ChargingFunction(c, capacity, initial_soc)
with pytest.raises(ValueError):
cf.inverse(0)
import pytest
from evrouting.charge.utils import LabelPriorityQueue
from evrouting.charge.T import Label
@pytest.fixture
def q(label, ch_function):
_, _, cf = ch_function
q = LabelPriorityQueue()
q.insert(label, cf)
# create min
label = Label(
t_trip=8,
soc_last_cs=2,
soc_profile_cs_v=label.soc_profile_cs_v,
last_cs=1
)
q.insert(label, cf)
yield q
del q
class TestProrityQueue:
def test_empty(self, q: LabelPriorityQueue):
q.delete_min()
q.delete_min()
with pytest.raises(KeyError):
q.delete_min()
def test_peak(self, q: LabelPriorityQueue):
assert q.peak_min().t_trip == 8
def test_insert_same(self, q: LabelPriorityQueue, ch_function):
"""Should be no problem."""
_, _, cf = ch_function
label = q.peak_min()
q.remove_item(label)
q.insert(label, cf)
import networkx as nx
from copy import copy
from typing import Set
from evrouting.graph_tools import node_convert, edge_convert
from evrouting.graph_tools import TemplateEdge as Edge
from evrouting.graph_tools import TemplateNode as Node
from evrouting.T import Node, ChargingCoefficient
from evrouting.graph_tools import (
node_convert, edge_convert, TemplateEdge, TemplateNode, charging_cofficient
)
# List of configs
config_list = ['edge_case']
config_list = ['edge_case', 'edge_case_start_node_no_cs']
edge_case = {
'b_0': 0,
'b_t': 0,
'beta_s': 0,
'beta_t': 0,
'U': 4,
's': 0,
't': 2,
'nodes': [
Node('s', charging_coeff=1),
Node('a', charging_coeff=2),
Node('t'),
TemplateNode('s', charging_coeff=1),
TemplateNode('a', charging_coeff=2),
TemplateNode('t'),
],
'edges': [
Edge(0, 1, distance=1, consumption=1),
Edge(0, 2, distance=1, consumption=4),
Edge(1, 2, distance=1, consumption=1),
TemplateEdge(0, 1, distance=1, consumption=1),
TemplateEdge(0, 2, distance=1, consumption=4),
TemplateEdge(1, 2, distance=1, consumption=1),
]
}
edge_case_start_node_no_cs = {
'beta_s': 0,
'beta_t': 0,
'U': 4,
's': 0,
't': 2,
'nodes': [
TemplateNode('s'),
TemplateNode('a', charging_coeff=2),
TemplateNode('t'),
],
'edges': [
TemplateEdge(0, 1, distance=1, consumption=1),
TemplateEdge(0, 2, distance=1, consumption=4),
TemplateEdge(1, 2, distance=1, consumption=1),
]
}
def get_graph(config):
def get_graph(config: dict) -> nx.Graph:
G = nx.Graph()
for node_id, node in enumerate(config['nodes']):
......@@ -36,3 +57,34 @@ def get_graph(config):
G.add_edge(edge.u, edge.v, **edge_convert(edge))
return G
def get_charging_stations(config: dict) -> Set[Node]:
return {
idx for idx, n in enumerate(config['nodes'])
if n.charging_coeff is not None
}
def init_config(config: dict) -> dict:
G = nx.Graph()
S = set()
for node_id, node in enumerate(config['nodes']):
G.add_node(node_id, **node_convert(node))
c: ChargingCoefficient = charging_cofficient(G, node_id)
if c is not None:
S.add(node_id)
for edge in config['edges']:
G.add_edge(edge.u, edge.v, **edge_convert(edge))
return {
'G': G,
'charging_stations': S,
's': config['s'],
't': config['t'],
'initial_soc': config['beta_s'],
'final_soc': config['beta_t'],
'capacity': config['U']
}
from evrouting.charge import shortest_path
from .config import edge_case, get_graph
def test_shortest_path():
G = get_graph(edge_case)
path = shortest_path(
G,
s=edge_case['s'],
t=edge_case['t'],
b_0=edge_case['b_0'],
b_t=edge_case['b_t'],
U=edge_case['U']
)
assert path == [(0, 2), (1, 0), (2, 0)]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment