Newer
Older
"""
Implementation of the CHArge algorithm [0] with two further constraints:
1. There are no negative path costs (ie no recurpation).
2. All charging stations have linear charging functions.
[0] https://dl.acm.org/doi/10.1145/2820783.2820826
"""
from evrouting.graph_tools import distance
from evrouting.charge.T import SoCFunction, Label
def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
:param G: Graph to work on
:param charging_stations: Set containing identifiers of all
charging stations
:param s: Start Node
:param t: End Node
:param initial_soc: SoC at s
:param final_soc: SoC at t
:param capacity: Battery capacity
t, factories, queues = _setup(
G, charging_stations, capacity, initial_soc, final_soc, s, t
)
f_soc_factory: SoCFunctionFactory = factories['f_soc']
soc_profile_factory: SoCProfileFactory = factories['soc_profile']
cf_map: ChargingFunctionMap = factories['cf']
l_set: Dict[int, List[Label]] = queues['settled labels']
l_uns: Dict[int, LabelPriorityQueue] = queues['unsettled labels']
prio_queue: PriorityQueue = queues['priority queue']
if node_min in charging_stations and node_min != label_node_min.last_cs:
f_soc: SoCFunction = f_soc_factory(label_node_min)
t_charge = f_soc.calc_optimal_t_charge(cf_map[node_min])
t_trip=label_node_min.t_trip + t_charge,
soc_last_cs=f_soc(label_node_min.t_trip + t_charge),
last_cs=node_min,
soc_profile_cs_v=soc_profile_factory(node_min)
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
item=node_min,
**keys(f_soc_factory(l_uns[node_min].peak_min()))
)
except KeyError:
# l_uns[v] empty
prio_queue.delete_min()
soc_profile = label_node_min.soc_profile_cs_v + \
soc_profile_factory(node_min, n)
if cf_map[label_node_min.last_cs].is_dummy \
and soc_profile.path_cost > label_node_min.soc_last_cs:
# Dummy charging stations cannot increase SoC.
# Therefore paths that consume more energy than the SoC
# when arriving at the (dummy) station are unfeasible.
continue
label_neighbour: Label = Label(
t_trip=label_node_min.t_trip + distance(G, node_min, n),
soc_last_cs=label_node_min.soc_last_cs,
last_cs=label_node_min.last_cs,
# Update queue if entered label is the new minimum label
# of the neighbour.
prio_queue.insert(n, **keys(f_soc_factory(label_neighbour)))
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
initial_soc: SoC, final_soc: SoC, s: Node, t: Node
) -> Tuple[Node, Dict, Dict]:
"""
Initialises the data structures and graph setup.
:returns: Tupel(t, factories, queues):
:t: The new dummy final node taking care of the final SoC.
:factories: A dict containing factory functions for:
:```factories['f_soc']```: The SoC Functions
:```factories['cf']```: The Charging Functions
:```factories['soc_profile']```: The SoC Profiles
:queues: A dict containing initialized queues for the algorithm.
:```queues['settled labels']```:
:```queues['unsettled labels']```:
:```queues['priority queue'']```:
"""
# Add node that is only connected to the final node and takes no time
# to travel but consumes exactly the amount of energy that should be
# left at t (final_soc). The node becomes the new final node.
dummy_final_node: Node = len(G)
G.add_node(dummy_final_node)
G.add_edge(t, dummy_final_node, weight=0, c=final_soc)
t = dummy_final_node
# Init factories
cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
f_soc_factory = SoCFunctionFactory(cf_map)
soc_profile_factory = SoCProfileFactory(G, capacity)
# Init maps to manage labels
l_set: Dict[int, List[Label]] = {v: [] for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G
}
# Add dummy charging station with charging function
# cf(t) = initial_soc (ie charging coefficient is zero).
dummy_node: Node = len(G.nodes)
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
l_uns[s].insert(Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(s)
))
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue: PriorityQueue = PriorityQueue()
prio_queue.insert(s, priority=0, count=0)
return (t, # New final Node
{ # factories
'f_soc': f_soc_factory,
'cf': cf_map,
'soc_profile': soc_profile_factory
},
{ # queues
'settled labels': l_set,
'unsettled labels': l_uns,
'priority queue': prio_queue
}
)