Newer
Older
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC):
:param G:
:param charging_stations:
:param s:
:param t:
:param initial_soc:
:param final_soc:
:param capacity:
# Add node that is only connected to the final node and takes no time
# to travel but consumes exactly the amount of energy that should be
# left at t (final_soc). The node becomes the new final node.
dummy_final_node: Node = len(G)
G.add_node(dummy_final_node)
G.add_edge(t, dummy_final_node, weight=0, c=final_soc)
t = dummy_final_node
# Init factories
cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
f_soc_factory = SoCFunctionFactory(cf_map)
soc_profile_factory = SoCProfileFactory(G, capacity)
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G
}
# Add dummy charging station with charging function
# cf(t) = initial_soc (ie charging coefficient is zero).
dummy_node: Node = len(G.nodes)
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
l_uns[s].insert(Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(s)
))
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue = PriorityQueue()
if node_min in charging_stations and node_min != label_node_min.last_cs:
f_soc: SoCFunction = f_soc_factory(label_node_min)
t_charge = f_soc.calc_optimal_t_charge(cf_map[node_min])
t_trip=label_node_min.t_trip + t_charge,
soc_last_cs=f_soc(label_node_min.t_trip + t_charge),
last_cs=node_min,
soc_profile_cs_v=soc_profile_factory(node_min)
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
item=node_min,
**keys(f_soc_factory(l_uns[node_min].peak_min()))
)
except KeyError:
# l_uns[v] empty
prio_queue.delete_min()
soc_profile = label_node_min.soc_profile_cs_v + \
soc_profile_factory(node_min, n)
if cf_map[label_node_min.last_cs].is_dummy \
and soc_profile.path_cost > label_node_min.soc_last_cs:
# Dummy charging stations cannot increase SoC.
# Therefore paths that consume more energy than the SoC
# when arriving at the (dummy) station are unfeasible.
continue
label_neighbour: Label = Label(
t_trip=label_node_min.t_trip + distance(G, node_min, n),
soc_last_cs=label_node_min.soc_last_cs,
last_cs=label_node_min.last_cs,
# Update queue if entered label is the new minimum label
# of the neighbour.
is_new_min_label: bool = label_neighbour == l_uns[n].peak_min()
except KeyError:
continue
if is_new_min_label:
prio_queue.insert(n, **keys(f_soc_factory(label_neighbour)))