Skip to content
Snippets Groups Projects
Commit 311236f5 authored by markn92's avatar markn92
Browse files

Merge branch 'dev' into 'master'

Dev

See merge request !2
parents 7a1758d0 667d2ce9
No related branches found
No related tags found
1 merge request!2Dev
......@@ -134,6 +134,26 @@ class ChargingFunction:
return cf_inverse
def __lt__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c < other.c
def __le__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c <= other.c
def __eq__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c == other.c
def __ge__(self, other):
"""Comparison for dominance check."""
return self.c >= other.c
def __gt__(self, other):
"""Comparison for dominance check."""
return self.c > other.c
class Label(NamedTuple):
"""
......@@ -160,10 +180,10 @@ class Label(NamedTuple):
last_cs: Node
soc_profile_cs_v: SoCProfile
@property
def key(self):
"""Key for sorting."""
return self.t_trip
class Breakpoint(NamedTuple):
t: Time
soc: SoC
class SoCFunction:
......@@ -195,6 +215,18 @@ class SoCFunction:
self.cf_cs: ChargingFunction = cf_cs
@property
def breakpoints(self):
breakpoints = [Breakpoint(self.minimum, 0)]
if not self.cf_cs.is_dummy:
breakpoints.append(
Breakpoint(
self.minimum + self.cf_cs.inverse(self.soc_profile_cs_v.out),
self.soc_profile_cs_v.out
)
)
return breakpoints
def __call__(self, t: Time) -> SoC:
"""
Maps a new trip time to a SoC at the current node. The new trip time
......@@ -221,7 +253,7 @@ class SoCFunction:
This is either the trip time, or if energy needs to be charged
at the previous charging station to traverse the path, trip time
plus charging time until the battery holds the minimum energie to
plus charging time until the battery holds the minimum energy to
traverse the path to the current node (which is the cost of the
path). This time is:
......
import networkx as nx
from typing import Dict
from .T import SoCProfile, ChargingFunction
from ..T import Node, SoC
from .T import SoCProfile, SoCFunction, ChargingFunction, Label
from ..T import Node, SoC, Time
from ..graph_tools import charging_cofficient, consumption
def charging_function(
def charging_function_factory(
G: nx.Graph,
n: Node,
capacity: SoC,
......@@ -14,7 +15,7 @@ def charging_function(
return ChargingFunction(charging_cofficient(G, n), capacity, initial_soc)
def soc_profile(
def soc_profile_factory(
G: nx.Graph,
capacity: SoC,
u: Node,
......@@ -28,3 +29,67 @@ def soc_profile(
"""
path_cost = 0 if v is None else consumption(G, u, v)
return SoCProfile(path_cost, capacity)
class ChargingFunctionMap:
"""Maps Nodes to their charging functions."""
def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None):
self.map: Dict[Node, ChargingFunction] = {}
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.initial_soc: SoC = initial_soc
def __getitem__(self, node: Node) -> ChargingFunction:
"""
Try to get charging function from cache,
else create function and add to cache.
"""
try:
cf = self.map[node]
except KeyError:
cf = charging_function_factory(
G=self.G,
n=node,
capacity=self.capacity,
initial_soc=self.initial_soc
)
self.map[node] = cf
return cf
class SoCFunctionMap:
"""Maps Nodes to their charging functions."""
def __init__(self, cf: ChargingFunctionMap):
self.cf: ChargingFunctionMap = cf
def __getitem__(self, label: Label) -> SoCFunction:
return SoCFunction(label, self.cf[label.last_cs])
class LabelsFactory:
def __init__(self,
G: nx.Graph,
capacity: SoC,
f_soc: SoCFunctionMap,
initial_soc: SoC = None):
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.f_soc: SoCFunctionMap = f_soc
self.initial_soc: SoC = initial_soc
def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time):
# Only charge the minimum at the last charge station
# and continue charging at this station.
soc_function: SoCFunction = self.f_soc[current_label]
return Label(
t_trip=current_label.t_trip + t_charge,
soc_last_cs=soc_function(current_label.t_trip + t_charge),
last_cs=current_node,
soc_profile_cs_v=soc_profile_factory(
self.G, self.capacity, current_node)
)
from typing import Dict
from typing import Dict, List
from math import inf
import networkx as nx
from evrouting.T import Node, SoC, Time
from evrouting.utils import PriorityQueue
from evrouting.charge.factories import soc_profile as soc_profile_factory
from evrouting.charge.factories import (
LabelsFactory,
ChargingFunctionMap,
SoCFunctionMap,
soc_profile_factory
)
from ..graph_tools import distance
from .T import SoCFunction, Label
from .utils import LabelPriorityQueue, ChargingFunctionMap
from .T import SoCProfile, SoCFunction, Label
from .utils import LabelPriorityQueue
__all__ = ['shortest_path']
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
......@@ -16,46 +23,38 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
"""
Calculates shortest path using the CHarge algorithm.
:param G: Input Graph
:param s: Start Node identifier
:param t: End Node identifier
:param beta_s: Start SoC
:param beta_t: End SoC
:param U: Capacity
:param G:
:param charging_stations:
:param s:
:param t:
:param initial_soc:
:param final_soc:
:param capacity:
:return:
"""
t = _apply_final_constraints(G, t, final_soc)
cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
f_soc = SoCFunctionMap(cf)
label_factory = LabelsFactory(G, capacity, f_soc, initial_soc)
q = PriorityQueue()
# Init maps to manage labels
l_set: Dict[int, set] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue() for v in G
}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf) for v in G}
# Dummy vertex without incident edges that is (temporarily) added to G
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Init environment
entry_label = _create_entry_label(G, charging_stations,
s, initial_soc, capacity)
l_uns[s].insert(entry_label)
l: Label = Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(G, capacity, s)
)
l_uns[s].insert(
l,
cf[l.last_cs]
)
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue = PriorityQueue()
prio_queue.insert(s, priority=0, count=0)
q.insert(s, 0)
# run main loop
while True:
try:
minimum_node: Node = q.peak_min()
minimum_node: Node = prio_queue.peak_min()
except KeyError:
# empty queue
break
......@@ -64,65 +63,36 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
l_set[minimum_node].add(label_minimum_node)
if minimum_node == t:
return SoCFunction(
label_minimum_node,
cf[label_minimum_node.last_cs]
).minimum
return f_soc[label_minimum_node].minimum
# handle charging stations
if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
cf_last_cs = cf[label_minimum_node.last_cs]
cf_minimum_node = cf[minimum_node]
if cf_minimum_node.c > cf_last_cs.c:
# Only charge the minimum at the last charge station
# and continue charging at this station.
old_soc_function: SoCFunction = SoCFunction(
label_minimum_node, cf_last_cs
)
t_trip_old = label_minimum_node.t_trip
t_charge: Time = old_soc_function.minimum - t_trip_old
label_new = Label(
t_trip=t_trip_old + t_charge,
soc_last_cs=old_soc_function(t_trip_old + t_charge),
last_cs=minimum_node,
soc_profile_cs_v=soc_profile_factory(
G, capacity, minimum_node
)
)
l_uns[minimum_node].insert(
label_new,
cf_minimum_node
)
# update priority queue
try:
label_minimum_node = l_uns[minimum_node].peak_min()
except KeyError:
# l_uns[v] empty
q.delete_min()
else:
q.insert(minimum_node, label_minimum_node.key)
if minimum_node in charging_stations and \
not minimum_node == label_minimum_node.last_cs:
for t_charge in _calc_optimal_t_charge(cf, label_minimum_node, minimum_node, capacity):
label_new = label_factory.spawn_label(minimum_node,
label_minimum_node,
t_charge)
l_uns[minimum_node].insert(label_new)
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
_update_priority_queue(f_soc, prio_queue, l_uns, minimum_node)
# scan outgoing arcs
for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \
soc_profile_factory(G, capacity, minimum_node, n)
if not soc_profile(capacity) == -inf:
# It is possible to get from minimum_node to n
if _is_feasible_path(soc_profile, capacity):
l_new = Label(
label_minimum_node.t_trip + distance(G, minimum_node, n),
label_minimum_node.soc_last_cs,
label_minimum_node.last_cs,
soc_profile
t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n),
soc_last_cs=label_minimum_node.soc_last_cs,
last_cs=label_minimum_node.last_cs,
soc_profile_cs_v=soc_profile
)
try:
l_uns[n].insert(
l_new,
cf[l_new.last_cs]
)
l_uns[n].insert(l_new)
except ValueError:
# Infeasible because last_cs might be an
# dummy charging station. Therefore, the path might
......@@ -135,4 +105,95 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
pass
else:
if l_new == l_uns[n].peak_min():
q.insert(n, l_new.key)
key, count = _key(l_new, f_soc)
prio_queue.insert(n, priority=key, count=count)
def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]:
f_soc_breakpoints = SoCFunction(label_v, cf[label_v.last_cs]).breakpoints
t_charge = []
if cf[v] > cf[label_v.last_cs]:
# Faster charging station -> charge as soon as possible
t_charge.append(f_soc_breakpoints[0].t - label_v.t_trip)
elif f_soc_breakpoints[-1].soc < capacity:
# Slower charging station might still be dominating
# because the soc cannot be more than the full capacity
# decreased by the trip costs. This will be refilled at this station.
t_charge.append(f_soc_breakpoints[-1].t - label_v.t_trip)
return t_charge
def _key(label, f_soc):
soc_function = f_soc[label]
t_min = soc_function.minimum
soc_min = soc_function(t_min)
return t_min, soc_min
def _create_entry_label(
G: nx.Graph,
charging_stations: set,
s: Node,
initial_soc: SoC,
capacity: SoC) -> Label:
"""
Create dummy charging station with initial soc as constant charging
function.
:param G: Graph
:param charging_stations: Set of charging stations in Graph G
:param s: Starting Node
:param initial_soc: Initial SoC at beginng of the route
:param capacity: The restricting battery capacity
:return: Label for the starting Node
"""
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
return Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(G, capacity, s)
)
def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
"""Check, if possible to traverse path at least with full battery."""
return not soc_profile(capacity) == -inf
def _update_priority_queue(
f_soc: SoCFunctionMap,
prio_queue: PriorityQueue,
l_uns: Dict[int, LabelPriorityQueue],
node: Node):
"""
Update key of a node the priority queue according to
its minimum label.
"""
try:
minimum_label: Label = l_uns[node].peak_min()
except KeyError:
# l_uns[v] empty
prio_queue.delete_min()
else:
key, count = _key(minimum_label, f_soc)
prio_queue.insert(node, priority=key, count=count)
def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node:
temp_final_node = len(G)
G.add_node(temp_final_node)
G.add_edge(t, temp_final_node, weight=0, c=final_soc)
return temp_final_node
from typing import Dict
from math import inf
import networkx as nx
from evrouting.utils import PriorityQueue
from evrouting.T import SoC, Time, Node
from evrouting.T import SoC, Time
from .factories import charging_function
from .T import Label, SoCFunction, ChargingFunction
from .T import Label, SoCFunction
from .factories import ChargingFunctionMap
class LabelPriorityQueue(PriorityQueue):
def insert(self, label: Label, cf: ChargingFunction):
def __init__(self, cf: ChargingFunctionMap):
super().__init__()
self.cf: ChargingFunctionMap = cf
def insert(self, label: Label):
"""Breaking ties with lowest soc at t_min."""
soc_function = SoCFunction(
label,
cf
self.cf[label.last_cs]
)
t_min: Time = soc_function.minimum
......@@ -30,31 +32,3 @@ class LabelPriorityQueue(PriorityQueue):
priority=t_min,
count=soc_min
)
class ChargingFunctionMap:
"""Maps Nodes to their charging functions."""
def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None):
self.map: Dict[Node, ChargingFunction] = {}
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.initial_soc: SoC = initial_soc
def __getitem__(self, node: Node) -> ChargingFunction:
"""
Try to get charging function from cache,
else create function and add to cache.
"""
try:
cf = self.map[node]
except KeyError:
cf = charging_function(
G=self.G,
n=node,
capacity=self.capacity,
initial_soc=self.initial_soc
)
self.map[node] = cf
return cf
......@@ -8,33 +8,60 @@ from ..config import (
)
def test_shortest_path_charge_at_s_and_a():
"""Charging at s."""
path = shortest_path(**init_config(edge_case))
class TestRoutes:
assert path == 3.5
def test_shortest_path_charge_at_s_and_a(self):
"""Charging at s."""
path = shortest_path(**init_config(edge_case))
assert path == 3.5
def test_shortest_path_charge_at_s_only():
"""Charging at s."""
path = shortest_path(**init_config(edge_case_a_slow))
def test_shortest_path_charge_at_s_only(self):
"""Charging at s."""
path = shortest_path(**init_config(edge_case_a_slow))
assert path == 3
assert path == 3
def test_shortest_path_no_charge_s_path_t(self):
"""No charging at s but enough initial SoC to go to t directly."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4
path = shortest_path(**conf)
def test_shortest_path_no_charge_s_path_t():
"""No charging at s but enough initial SoC to go to t directly."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4
path = shortest_path(**conf)
assert path == 1
assert path == 1
def test_shortest_path_no_charge_s_path_a(self):
"""No charging at s but just enough SoC to go to t via a."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 2
path = shortest_path(**conf)
assert path == 2
def test_shortest_path_no_charge_s_path_a():
"""No charging at s but just enough SoC to go to t via a."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 2
path = shortest_path(**conf)
assert path == 2
class TestWithFinalSoC:
def test_shortest_path_charge_at_s_and_a(self):
"""Charging at s."""
conf = init_config(edge_case)
conf['final_soc'] = 3
path = shortest_path(**conf)
assert path == 5
def test_shortest_path_charge_at_s_only(self):
"""Charging at s and a to reach final_soc."""
conf = init_config(edge_case_a_slow)
conf['final_soc'] = 3
path = shortest_path(**conf)
assert path == 5
def test_shortest_path_no_charge_s_path_t(self):
"""No charging at s but initial soc."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4
conf['final_soc'] = 3
path = shortest_path(**conf)
assert path == 2.5
......@@ -6,8 +6,9 @@ from evrouting.charge.T import Label
@pytest.fixture
def q(label, ch_function):
_, _, cf = ch_function
q = LabelPriorityQueue()
q.insert(label, cf)
dummy_cf = {label.last_cs: cf}
q = LabelPriorityQueue(dummy_cf)
q.insert(label)
# create min
label = Label(
......@@ -17,7 +18,8 @@ def q(label, ch_function):
last_cs=1
)
q.insert(label, cf)
dummy_cf[label.last_cs] = cf
q.insert(label)
yield q
del q
......@@ -39,4 +41,4 @@ class TestProrityQueue:
_, _, cf = ch_function
label = q.peak_min()
q.remove_item(label)
q.insert(label, cf)
q.insert(label)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment