Skip to content
Snippets Groups Projects
Commit 258a96eb authored by markn92's avatar markn92
Browse files

git rid of stupid replacement functions

parent fabb001c
No related branches found
No related tags found
No related merge requests found
from typing import Callable, NamedTuple
from typing import Callable, NamedTuple, Union
from math import inf
from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node
......@@ -202,9 +202,9 @@ class SoCFunction:
self.soc_profile_cs_v: SoCProfile = label.soc_profile_cs_v
self.cf_cs: ChargingFunction = cf_cs
self.breakpoints = self.get_breakpoints()
@property
def breakpoints(self):
def get_breakpoints(self):
breakpoints = [Breakpoint(self.minimum, 0)]
if not self.cf_cs.is_dummy:
breakpoints.append(
......@@ -233,6 +233,22 @@ class SoCFunction:
self.cf_cs(t - self.t_trip, self.soc_last_cs)
)
def calc_optimal_t_charge(self, cs: ChargingFunction) -> Union[Time, None]:
capacity: SoC = self.soc_profile_cs_v.capacity
t_charge = None
if cs > self.cf_cs:
# Faster charging station -> charge as soon as possible
t_charge = self.breakpoints[0].t - self.t_trip
elif self.breakpoints[-1].soc < capacity:
# Slower charging station might still be dominating
# because the soc cannot be more than the full capacity
# decreased by the trip costs. This will be refilled at this station.
t_charge = self.breakpoints[-1].t - self.t_trip
return t_charge
def __lt__(self, other: 'SoCFunction') -> bool:
"""Comparison for dominance check."""
for t_i, soc_i in self.breakpoints:
......
......@@ -31,7 +31,13 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
:param capacity:
:return:
"""
t = _apply_final_constraints(G, t, final_soc)
# Add node that is only connected to the final node and takes no time
# to travel but consumes exactly the amount of energy that should be
# left at t (final_soc). The node becomes the new final node.
dummy_final_node: Node = len(G)
G.add_node(dummy_final_node)
G.add_edge(t, dummy_final_node, weight=0, c=final_soc)
t = dummy_final_node
# Init factories
cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
......@@ -42,10 +48,20 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
l_set: Dict[int, Set[Label]] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G}
# Init environment
entry_label = _create_entry_label(G, charging_stations,
s, initial_soc, soc_profile_factory)
l_uns[s].insert(entry_label)
# Add dummy charging station with charging function
# cf(t) = initial_soc (ie charging coefficient is zero).
dummy_node: Node = len(G.nodes)
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
l_uns[s].insert(Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(s)
))
# A priority queue defines which node to visit next.
# The key is the trip time.
......@@ -65,10 +81,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
if minimum_node in charging_stations and \
not minimum_node == label_minimum_node.last_cs:
f_soc: SoCFunction = f_soc_factory(label_minimum_node)
t_charge = _calc_optimal_t_charge(
current_cs=cf_map[minimum_node],
f_soc=f_soc,
capacity=capacity)
t_charge = f_soc.calc_optimal_t_charge(cf_map[minimum_node])
if t_charge is not None:
# Spawn new label at t_charge
......@@ -83,7 +96,14 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
_update_priority_queue(f_soc_factory, prio_queue, l_uns, minimum_node)
try:
prio_queue.insert(
item=minimum_node,
**keys(f_soc_factory(l_uns[minimum_node].peak_min()))
)
except KeyError:
# l_uns[v] empty
prio_queue.delete_min()
# scan outgoing arcs
for n in G.neighbors(minimum_node):
......@@ -117,82 +137,4 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
continue
if is_new_min_label:
prio_queue.insert(n, **keys(f_soc_factory, l_new))
def _calc_optimal_t_charge(current_cs: ChargingFunction,
f_soc: SoCFunction,
capacity: SoC) -> Union[Time, None]:
f_soc_breakpoints = f_soc.breakpoints
t_charge = None
if current_cs > f_soc.cf_cs:
# Faster charging station -> charge as soon as possible
t_charge = f_soc_breakpoints[0].t - f_soc.t_trip
elif f_soc_breakpoints[-1].soc < capacity:
# Slower charging station might still be dominating
# because the soc cannot be more than the full capacity
# decreased by the trip costs. This will be refilled at this station.
t_charge = f_soc_breakpoints[-1].t - f_soc.t_trip
return t_charge
def _create_entry_label(
G: nx.Graph,
charging_stations: set,
s: Node,
initial_soc: SoC,
soc_profile_factory: SoCProfileFactory
) -> Label:
"""
Create dummy charging station with initial soc as constant charging
function.
:param G: Graph
:param charging_stations: Set of charging stations in Graph G
:param s: Starting Node
:param initial_soc: Initial SoC at beginng of the route
:param capacity: The restricting battery capacity
:return: Label for the starting Node
"""
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
return Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(s)
)
def _update_priority_queue(
f_soc: SoCFunctionFactory,
prio_queue: PriorityQueue,
l_uns: Dict[int, LabelPriorityQueue],
node: Node):
"""
Update key of a node the priority queue according to
its minimum label.
"""
try:
minimum_label: Label = l_uns[node].peak_min()
except KeyError:
# l_uns[v] empty
prio_queue.delete_min()
else:
prio_queue.insert(node, **keys(f_soc, minimum_label))
def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node:
temp_final_node = len(G)
G.add_node(temp_final_node)
G.add_edge(t, temp_final_node, weight=0, c=final_soc)
return temp_final_node
prio_queue.insert(n, **keys(f_soc_factory(l_new)))
......@@ -4,7 +4,7 @@ from math import inf
from evrouting.utils import PriorityQueue
from evrouting.T import SoC, Time
from .T import Label
from .T import Label, SoCFunction
from .factories import SoCFunctionFactory
......@@ -16,7 +16,7 @@ class LabelPriorityQueue(PriorityQueue):
def insert(self, label: Label):
"""Breaking ties with lowest soc at t_min."""
super().insert(item=label, **keys(self.f_soc_factory, label))
super().insert(item=label, **keys(self.f_soc_factory(label)))
if self.peak_min() == label:
self.dominance_check()
......@@ -40,14 +40,13 @@ class LabelPriorityQueue(PriorityQueue):
return
def keys(f_soc_factory: SoCFunctionFactory, label: Label) -> Dict:
soc_function = f_soc_factory(label)
t_min: Time = soc_function.minimum
def keys(f_soc: SoCFunction) -> Dict:
t_min: Time = f_soc.minimum
# Might happen because of dummy charge stations
if t_min == -inf:
raise ValueError('Infeasible label.')
soc_min: SoC = soc_function(t_min)
soc_min: SoC = f_soc(t_min)
return {'priority': t_min, 'count': soc_min}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment