Newer
Older
from evrouting.charge.factories import LabelsFactory, ChargingFunctionMap, soc_profile_factory
from .T import SoCFunction, SoCProfile, Label
from .utils import LabelPriorityQueue
__all__ = ['shortest_path']
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC):
"""
Calculates shortest path using the CHarge algorithm.
:param G: Input Graph
:param s: Start Node identifier
:param t: End Node identifier
cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue() for v in G}
# Init environment
entry_label = _create_entry_label(G, charging_stations, s, initial_soc, capacity)
l_uns[s].insert(entry_label, cf[entry_label.last_cs])
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue = PriorityQueue()
prio_queue.insert(s, 0)
label_minimum_node: Label = l_uns[minimum_node].delete_min()
l_set[minimum_node].add(label_minimum_node)
return SoCFunction(label_minimum_node,
cf[label_minimum_node.last_cs]
).minimum
if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
if cf[minimum_node] > cf[label_minimum_node.last_cs]:
label_new = label_factory.spawn_label(minimum_node, label_minimum_node)
l_uns[minimum_node].insert(label_new, cf[minimum_node])
# Update priority queue. This node might have gotten a new
# minimum label spawned in the previous step.
for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \
t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n),
soc_last_cs=label_minimum_node.soc_last_cs,
last_cs=label_minimum_node.last_cs,
soc_profile_cs_v=soc_profile
# Infeasible because last_cs might be a
# dummy charging station. Therefore, the path might
# be infeasible even though one could reach it with a full
# battery, because charging is not possible at dummy
# stations.
#
# That means, the SoC and thereby the range is restricted
# to the SoC at the last cs (soc_last_cs).
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def _create_entry_label(
        G: nx.Graph,
        charging_stations: set,
        s: Node,
        initial_soc: SoC,
        capacity: SoC) -> Label:
    """
    Create the label the search starts with at node ``s``.

    A dummy charging station with charging coefficient 0 is added to the
    graph and registered as the last charging station seen before ``s``;
    the initial SoC is stored as the SoC at that station.

    Note: ``G`` and ``charging_stations`` are both modified in place, the
    dummy node is added to each of them.

    :param G: Graph
    :param charging_stations: Set of charging stations in Graph G
    :param s: Starting Node
    :param initial_soc: Initial SoC at beginning of the route
    :param capacity: The restricting battery capacity
    :return: Label for the starting Node
    """
    # Start from the node count, but skip ahead to an identifier that is
    # not taken yet. If the node ids are not exactly 0..n-1, the count
    # itself can name an existing node, and add_node would then overwrite
    # that node's ``c`` attribute instead of creating a new node.
    dummy_node: Node = len(G.nodes)
    while dummy_node in G:
        dummy_node += 1

    # Charging coefficient 0 indicates dummy node
    G.add_node(dummy_node, c=0)
    charging_stations.add(dummy_node)

    # Register dummy charging station as the last
    # seen charging station before s.
    return Label(
        t_trip=0,
        soc_last_cs=initial_soc,
        last_cs=dummy_node,
        soc_profile_cs_v=soc_profile_factory(G, capacity, s)
    )
def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
    """
    Check whether the path can be traversed at least with a full battery.

    :param soc_profile: SoC profile of the path; it is called with an SoC
        value and its result is compared against ``-inf``.
    :param capacity: Battery capacity, passed to the profile as the SoC
        of a full battery.
    :return: False if the profile evaluates to ``-inf`` for a full
        battery, True otherwise.
    """
    return soc_profile(capacity) != -inf
def _update_priority_queue(
l_uns: Dict[int, LabelPriorityQueue],
node: Node):
"""
Update the key of a node in the priority queue according to
its minimum label.
"""
try:
minimum_label = l_uns[node].peak_min()
except KeyError:
# l_uns[v] empty