Newer
Older
from ..graph_tools import distance
from .T import SoCFunction, Label
from .utils import LabelPriorityQueue
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC):
"""
Calculates shortest path using the CHarge algorithm.
:param G: Input Graph
:param s: Start Node identifier
:param t: End Node identifier
l_set: Dict[int, set] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue() for v in G
}
# Dummy vertex without incident edges that is (temporarily) added to G
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
l: Label = Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=factories.soc_profile(G, capacity, s)
)
l_uns[s].insert(
l,
factories.charging_function(G, l.last_cs, capacity, initial_soc)
)
q.insert(s, 0)
# run main loop
while True:
try:
label_minimum_node: Label = l_uns[minimum_node].delete_min()
l_set[minimum_node].add(label_minimum_node)
if minimum_node == t:
return SoCFunction(
label_minimum_node,
factories.charging_function(
G,
label_minimum_node.last_cs,
capacity,
initial_soc
)
).minimum
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
cf_last_cs = factories.charging_function(
G,
label_minimum_node.last_cs,
capacity,
initial_soc # Use here in case cs is a dummy station
)
cf_minimum_node = factories.charging_function(
G,
minimum_node,
capacity,
initial_soc # Use here in case cs is a dummy station
)
if cf_minimum_node.c > cf_last_cs.c:
# Only charge the minimum at the last charge station
# and continue charging at this station.
old_soc_function: SoCFunction = SoCFunction(
label_minimum_node, cf_last_cs
)
t_trip_old = label_minimum_node.t_trip
t_charge: Time = old_soc_function.minimum - t_trip_old
label_new = Label(
t_trip=t_trip_old + t_charge,
soc_last_cs=old_soc_function(t_trip_old + t_charge),
last_cs=minimum_node,
soc_profile_cs_v=factories.soc_profile(
G, capacity, minimum_node
)
)
l_uns[minimum_node].insert(
label_new,
cf_minimum_node
)
try:
label_minimum_node = l_uns[minimum_node].peak_min()
except KeyError:
# l_uns[v] empty
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \
factories.soc_profile(G, capacity, minimum_node, n)
if not soc_profile(capacity) == -inf:
# It is possible to get from minimum_node to n
l_new = Label(
label_minimum_node.t_trip + distance(G, minimum_node, n),
label_minimum_node.soc_last_cs,
label_minimum_node.last_cs,
soc_profile
)
try:
l_uns[n].insert(
l_new,
factories.charging_function(
G,
l_new.last_cs,
capacity,
initial_soc
)
)
except ValueError:
pass
else:
if l_new == l_uns[n].peak_min():