Newer
Older
from evrouting.charge.factories import (
LabelsFactory, ChargingFunctionMap, soc_profile_factory
)
from .T import SoCFunction, SoCProfile, Label
from .utils import LabelPriorityQueue
__all__ = ['shortest_path']
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC):
"""
Calculates shortest path using the CHarge algorithm.
:param G: Input Graph
:param s: Start Node identifier
:param t: End Node identifier
cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf) for v in G}
entry_label = _create_entry_label(G, charging_stations,
s, initial_soc, capacity)
l_uns[s].insert(entry_label)
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue = PriorityQueue()
label_minimum_node: Label = l_uns[minimum_node].delete_min()
l_set[minimum_node].add(label_minimum_node)
return SoCFunction(label_minimum_node,
cf[label_minimum_node.last_cs]
).minimum
if minimum_node in charging_stations and \
not minimum_node == label_minimum_node.last_cs:
label_new = label_factory.spawn_label(minimum_node,
label_minimum_node)
l_uns[minimum_node].insert(label_new)
# Update priority queue. This node might have gotten a new
# minimum label spawned in the previous step.
for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \
t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n),
soc_last_cs=label_minimum_node.soc_last_cs,
last_cs=label_minimum_node.last_cs,
soc_profile_cs_v=soc_profile
# Infeasible because last_cs might be a
# dummy charging station. Therefore, the path might
# be infeasible even though one could reach it with a full
# battery, because charging is not possible at dummy
# stations.
#
# That means, the SoC and thereby the range is restricted
# to the SoC at the last cs (soc_last_cs).
key, count = _key(l_new, cf[l_new.last_cs])
prio_queue.insert(n, priority=key, count=count)
def _key(label, cf):
    """
    Build the priority queue key for a label.

    :param label: Label the key is derived from
    :param cf: Charging function passed to ``SoCFunction`` together
        with the label
    :return: Tuple of the ``minimum`` of the label's SoC function and the
        value of the SoC function at that minimum
    """
    f_soc = SoCFunction(label, cf)
    earliest = f_soc.minimum
    return earliest, f_soc(earliest)
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def _create_entry_label(
        G: nx.Graph,
        charging_stations: set,
        s: Node,
        initial_soc: SoC,
        capacity: SoC) -> Label:
    """
    Create dummy charging station with initial soc as constant charging
    function.

    The dummy station is added to ``G`` (with charging coefficient
    ``c=0``) and to ``charging_stations``; both arguments are modified
    in place.

    :param G: Graph
    :param charging_stations: Set of charging stations in Graph G
    :param s: Starting Node
    :param initial_soc: Initial SoC at beginning of the route
    :param capacity: The restricting battery capacity
    :return: Label for the starting Node
    """
    # The node count is a free identifier as long as nodes are numbered
    # 0..n-1. Skip ahead if it is already taken: re-adding an existing
    # node would overwrite its ``c`` attribute and register a real node
    # as the dummy charging station.
    dummy_node: Node = len(G.nodes)
    while dummy_node in G.nodes:
        dummy_node += 1

    # Charging coefficient 0 indicates dummy node
    G.add_node(dummy_node, c=0)
    charging_stations.add(dummy_node)

    # Register dummy charging station as the last
    # seen charging station before s.
    return Label(
        t_trip=0,
        soc_last_cs=initial_soc,
        last_cs=dummy_node,
        soc_profile_cs_v=soc_profile_factory(G, capacity, s)
    )
def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
    """
    Check whether the path can be traversed at least with a full battery.

    :param soc_profile: SoC profile of the path, called with ``capacity``
    :param capacity: The restricting battery capacity
    :return: ``False`` if the profile evaluates to ``-inf`` for a full
        battery, ``True`` otherwise
    """
    if soc_profile(capacity) == -inf:
        return False
    return True
def _update_priority_queue(
l_uns: Dict[int, LabelPriorityQueue],
node: Node):
"""
Update key of a node in the priority queue according to
its minimum label.
"""
try:
key, count = _key(minimum_label, cf[minimum_label.last_cs])
prio_queue.insert(node, priority=key, count=count)
def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node:
    """
    Attach a temporary final node behind ``t``.

    A new node is added to ``G`` and connected to ``t`` by an edge with
    ``weight=0`` and ``c=final_soc``. ``G`` is modified in place.

    :param G: Graph
    :param t: End Node identifier
    :param final_soc: Value stored as attribute ``c`` of the new edge
    :return: Identifier of the newly added node
    """
    # The node count is a free identifier as long as nodes are numbered
    # 0..n-1. Skip ahead if it is already taken, otherwise the edge would
    # be attached to an existing node instead of a fresh one.
    temp_final_node = len(G)
    while temp_final_node in G:
        temp_final_node += 1

    G.add_node(temp_final_node)
    G.add_edge(t, temp_final_node, weight=0, c=final_soc)
    return temp_final_node