Newer
Older
LabelsFactory,
ChargingFunctionMap,
SoCFunctionMap,
soc_profile_factory
from .utils import LabelPriorityQueue
__all__ = ['shortest_path']
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC):
:param G:
:param charging_stations:
:param s:
:param t:
:param initial_soc:
:param final_soc:
:param capacity:
cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
label_factory = LabelsFactory(G, capacity, f_soc, initial_soc)
l_set: Dict[int, Set[Label]] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf, l_set[v]) for v in G}
entry_label = _create_entry_label(G, charging_stations,
s, initial_soc, capacity)
l_uns[s].insert(entry_label)
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue = PriorityQueue()
label_minimum_node: Label = l_uns[minimum_node].delete_min()
l_set[minimum_node].add(label_minimum_node)
return f_soc[label_minimum_node].minimum
if minimum_node in charging_stations and \
not minimum_node == label_minimum_node.last_cs:
for t_charge in _calc_optimal_t_charge(cf, label_minimum_node, minimum_node, capacity):
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
_update_priority_queue(f_soc, prio_queue, l_uns, minimum_node)
for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \
t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n),
soc_last_cs=label_minimum_node.soc_last_cs,
last_cs=label_minimum_node.last_cs,
soc_profile_cs_v=soc_profile
# Infeasible because last_cs might be an
# dummy charging station. Therefore, the path might
# be infeasible even though one could reach it with a full
# battery, because charging is not possible at dummy
# stations.
#
# That means, the SoC and thereby the range is restricted
# to the SoC at the last cs (soc_last_cs).
continue
try:
is_new_min_label: bool = l_new == l_uns[n].peak_min()
except KeyError:
continue
if is_new_min_label:
key, count = _key(l_new, f_soc)
prio_queue.insert(n, priority=key, count=count)
def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]:
f_soc_breakpoints = SoCFunction(label_v, cf[label_v.last_cs]).breakpoints
t_charge = []
if cf[v] > cf[label_v.last_cs]:
# Faster charging station -> charge as soon as possible
t_charge.append(f_soc_breakpoints[0].t - label_v.t_trip)
elif f_soc_breakpoints[-1].soc < capacity:
# Slower charging station might still be dominating
# because the soc cannot be more than the full capacity
# decreased by the trip costs. This will be refilled at this station.
t_charge.append(f_soc_breakpoints[-1].t - label_v.t_trip)
return t_charge
def _key(label, f_soc):
soc_function = f_soc[label]
t_min = soc_function.minimum
soc_min = soc_function(t_min)
return t_min, soc_min
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def _create_entry_label(
G: nx.Graph,
charging_stations: set,
s: Node,
initial_soc: SoC,
capacity: SoC) -> Label:
"""
Create dummy charging station with initial soc as constant charging
function.
:param G: Graph
:param charging_stations: Set of charging stations in Graph G
:param s: Starting Node
:param initial_soc: Initial SoC at beginng of the route
:param capacity: The restricting battery capacity
:return: Label for the starting Node
"""
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
return Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(G, capacity, s)
)
def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
"""Check, if possible to traverse path at least with full battery."""
return not soc_profile(capacity) == -inf
def _update_priority_queue(
l_uns: Dict[int, LabelPriorityQueue],
node: Node):
"""
Update key of a node the priority queue according to
its minimum label.
"""
try:
key, count = _key(minimum_label, f_soc)
prio_queue.insert(node, priority=key, count=count)
def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node:
temp_final_node = len(G)
G.add_node(temp_final_node)
G.add_edge(t, temp_final_node, weight=0, c=final_soc)
return temp_final_node