Skip to content
Snippets Groups Projects
Commit d17d5421 authored by markn92's avatar markn92
Browse files

charging times working

parent 35ac8209
No related branches found
No related tags found
No related merge requests found
...@@ -160,6 +160,8 @@ class Label(NamedTuple): ...@@ -160,6 +160,8 @@ class Label(NamedTuple):
soc_last_cs: SoC soc_last_cs: SoC
last_cs: Node last_cs: Node
soc_profile_cs_v: SoCProfile soc_profile_cs_v: SoCProfile
parent_label: Union['Label', None] = None
parent_node: Union[Node, None] = None
class Breakpoint(NamedTuple): class Breakpoint(NamedTuple):
......
...@@ -7,11 +7,11 @@ Implementation of the CHArge algorithm [0] with two further constraints: ...@@ -7,11 +7,11 @@ Implementation of the CHArge algorithm [0] with two further constraints:
[0] https://dl.acm.org/doi/10.1145/2820783.2820826 [0] https://dl.acm.org/doi/10.1145/2820783.2820826
""" """
from typing import Dict, List, Tuple, Set from typing import Dict, List, Tuple, Set, Union
from math import inf from math import inf
import networkx as nx import networkx as nx
from evrouting.T import Node, SoC from evrouting.T import Node, SoC, Time
from evrouting.utils import PriorityQueue from evrouting.utils import PriorityQueue
from evrouting.graph_tools import distance from evrouting.graph_tools import distance
from evrouting.charge.T import SoCFunction, Label from evrouting.charge.T import SoCFunction, Label
...@@ -24,7 +24,7 @@ from evrouting.charge.factories import ( ...@@ -24,7 +24,7 @@ from evrouting.charge.factories import (
def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
initial_soc: SoC, final_soc: SoC, capacity: SoC): initial_soc: SoC, final_soc: SoC, capacity: SoC) -> Dict:
""" """
Calculates shortest path using the CHarge algorithm. Calculates shortest path using the CHarge algorithm.
...@@ -39,6 +39,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -39,6 +39,7 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
:return: :return:
""" """
actual_t: Node = t
t, factories, queues = _setup( t, factories, queues = _setup(
G, charging_stations, capacity, initial_soc, final_soc, s, t G, charging_stations, capacity, initial_soc, final_soc, s, t
) )
...@@ -61,7 +62,10 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -61,7 +62,10 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
l_set[node_min].append(label_node_min) l_set[node_min].append(label_node_min)
if node_min == t: if node_min == t:
return f_soc_factory(label_node_min).minimum return _result(
label_node_min,
f_soc_factory(label_node_min).minimum,
node_min)
# Handle charging stations # Handle charging stations
if node_min in charging_stations and node_min != label_node_min.last_cs: if node_min in charging_stations and node_min != label_node_min.last_cs:
...@@ -75,12 +79,14 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -75,12 +79,14 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
t_trip=label_node_min.t_trip + t_charge, t_trip=label_node_min.t_trip + t_charge,
soc_last_cs=f_soc(label_node_min.t_trip + t_charge), soc_last_cs=f_soc(label_node_min.t_trip + t_charge),
last_cs=node_min, last_cs=node_min,
soc_profile_cs_v=soc_profile_factory(node_min) soc_profile_cs_v=soc_profile_factory(node_min),
parent_node=node_min,
parent_label=label_node_min
) )
) )
# Update priority queue. This node might have gotten a new # Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step. # minimum label spawned is the previous step.
try: try:
prio_queue.insert( prio_queue.insert(
item=node_min, item=node_min,
...@@ -108,7 +114,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -108,7 +114,9 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
t_trip=label_node_min.t_trip + distance(G, node_min, n), t_trip=label_node_min.t_trip + distance(G, node_min, n),
soc_last_cs=label_node_min.soc_last_cs, soc_last_cs=label_node_min.soc_last_cs,
last_cs=label_node_min.last_cs, last_cs=label_node_min.last_cs,
soc_profile_cs_v=soc_profile soc_profile_cs_v=soc_profile,
parent_node=node_min,
parent_label=label_node_min
) )
l_uns[n].insert(label_neighbour) l_uns[n].insert(label_neighbour)
...@@ -122,6 +130,8 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node, ...@@ -122,6 +130,8 @@ def shortest_path(G: nx.Graph, charging_stations: Set[Node], s: Node, t: Node,
if is_new_min: if is_new_min:
prio_queue.insert(n, **keys(f_soc_factory(label_neighbour))) prio_queue.insert(n, **keys(f_soc_factory(label_neighbour)))
return _result()
def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
initial_soc: SoC, final_soc: SoC, s: Node, t: Node initial_soc: SoC, final_soc: SoC, s: Node, t: Node
...@@ -171,7 +181,9 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, ...@@ -171,7 +181,9 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
t_trip=0, t_trip=0,
soc_last_cs=initial_soc, soc_last_cs=initial_soc,
last_cs=dummy_node, last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(s) soc_profile_cs_v=soc_profile_factory(s),
parent_node=None,
parent_label=None
)) ))
# A priority queue defines which node to visit next. # A priority queue defines which node to visit next.
...@@ -191,3 +203,50 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC, ...@@ -191,3 +203,50 @@ def _setup(G: nx.Graph, charging_stations: Set[Node], capacity: SoC,
'priority queue': prio_queue 'priority queue': prio_queue
} }
) )
def _result(label: Label = None, f_soc_min: Time = None,
node: Node = None) -> Dict:
"""
Returns a dict with two fields, as described below.
:param label: The final label of the algorithm
:param f_soc_min: The min time of the SoC Function of the final label
:param node: The final node.
:return Time result['trip_time']: The overall trip time ```f_soc_min```
:return List[Tuple[Time, Node]] result['path']: List of Nodes passed
along the path. It is modelled as Tuples of Trip Time and Node.
If a Node appears twice in a row, it means, the battery got
charged. The charging time is the difference of the trip times.
"""
if any(arg is None for arg in [label, f_soc_min, node]):
return {'trip_time': None, 'path': []}
t_charge_map = {label.last_cs: f_soc_min - label.t_trip}
path = []
# Skip inserted extra node
node = label.parent_node
label = label.parent_label
time = f_soc_min
while label is not None:
if node == label.parent_node:
# Label got spawned at fixing t_charge of the parent's label
# last_cs. For the current label holds: label.last_cs == node
t_charge_map[label.parent_label.last_cs] = label.t_trip - label.parent_label.t_trip
else:
path.append((node, t_charge_map.get(node, 0)))
"""
if node not in t_charge_map:
t_trip = label.t_trip - label.parent_label.t_trip
time -= t_trip
path.append((node, time))
else:
time -= t_charge_map[node]
path.append((node, t_charge_map[node]))
"""
node = label.parent_node
label = label.parent_label
return {'trip_time': f_soc_min, 'path': path[::-1]}
...@@ -12,31 +12,35 @@ class TestRoutes: ...@@ -12,31 +12,35 @@ class TestRoutes:
def test_shortest_path_charge_at_s_and_a(self): def test_shortest_path_charge_at_s_and_a(self):
"""Charging at s.""" """Charging at s."""
path = shortest_path(**init_config(edge_case)) result = shortest_path(**init_config(edge_case))
assert path == 3.5 assert result['trip_time'] == 3.5
assert result['path'] == [(0, 0), (0, 1), (1, 2), (1, 2.5), (2, 3.5)]
def test_shortest_path_charge_at_s_only(self): def test_shortest_path_charge_at_s_only(self):
"""Charging at s.""" """Charging at s."""
path = shortest_path(**init_config(edge_case_a_slow)) result = shortest_path(**init_config(edge_case_a_slow))
assert path == 3 assert result['trip_time'] == 3
assert result['path'] == [(0, 0), (0, 2), (2, 3)]
def test_shortest_path_no_charge_s_path_t(self): def test_shortest_path_no_charge_s_path_t(self):
"""No charging at s but enough initial SoC to go to t directly.""" """No charging at s but enough initial SoC to go to t directly."""
conf = init_config(edge_case_start_node_no_cs) conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4 conf['initial_soc'] = 4
path = shortest_path(**conf) result = shortest_path(**conf)
assert path == 1 assert result['trip_time'] == 1
assert result['path'] == [(0, 0), (2, 1)]
def test_shortest_path_no_charge_s_path_a(self): def test_shortest_path_no_charge_s_path_a(self):
"""No charging at s but just enough SoC to go to t via a.""" """No charging at s but just enough SoC to go to t via a."""
conf = init_config(edge_case_start_node_no_cs) conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 2 conf['initial_soc'] = 2
path = shortest_path(**conf) result = shortest_path(**conf)
assert path == 2 assert result['trip_time'] == 2
assert result['path'] == [(0, 0), (1, 1), (2, 2)]
class TestWithFinalSoC: class TestWithFinalSoC:
...@@ -45,31 +49,36 @@ class TestWithFinalSoC: ...@@ -45,31 +49,36 @@ class TestWithFinalSoC:
"""Charging at s.""" """Charging at s."""
conf = init_config(edge_case) conf = init_config(edge_case)
conf['final_soc'] = 3 conf['final_soc'] = 3
path = shortest_path(**conf) result = shortest_path(**conf)
assert path == 5 assert result['trip_time'] == 5
assert result['path'] == [(0, 0), (0, 1), (1, 2), (1, 4), (2, 5)]
def test_path_impossilbe(self): def test_path_impossilbe(self):
"""Not possible to end with full battery.""" """Not possible to end with full battery."""
conf = init_config(edge_case) conf = init_config(edge_case)
conf['final_soc'] = 4 conf['final_soc'] = 4
path = shortest_path(**conf) result = shortest_path(**conf)
assert path is None assert result['trip_time'] is None
assert result['path'] == []
def test_shortest_path_charge_at_s_only(self): def test_shortest_path_charge_at_s_only(self):
"""Charging at s and a to reach final_soc.""" """Charging at s and a to reach final_soc."""
conf = init_config(edge_case_a_slow) conf = init_config(edge_case_a_slow)
conf['final_soc'] = 3 conf['final_soc'] = 3
path = shortest_path(**conf) result = shortest_path(**conf)
assert path == 5 assert result['trip_time'] == 5
assert result['path'] == [(0, 0), (0, 2), (1, 3), (1, 4), (2, 5)]
def test_shortest_path_no_charge_s_path_t(self): def test_shortest_path_no_charge_s_path_t(self):
"""No charging at s but initial soc.""" """No charging at s but initial soc."""
conf = init_config(edge_case_start_node_no_cs) conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4 conf['initial_soc'] = 4
conf['final_soc'] = 3 conf['final_soc'] = 3
path = shortest_path(**conf) result = shortest_path(**conf)
assert result['trip_time'] == 2.5
assert result['path'] == [(0, 0), (1, 1), (1, 1.5), (2, 2.5)]
assert path == 2.5
...@@ -44,7 +44,7 @@ edge_case_a_slow = { ...@@ -44,7 +44,7 @@ edge_case_a_slow = {
], ],
'edges': [ 'edges': [
TemplateEdge(0, 1, distance=1, consumption=1), TemplateEdge(0, 1, distance=1, consumption=1),
TemplateEdge(0, 2, distance=1, consumption=4), TemplateEdge(0, 2, distance=1.5, consumption=4),
TemplateEdge(1, 2, distance=1, consumption=1), TemplateEdge(1, 2, distance=1, consumption=1),
] ]
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment