Skip to content
Snippets Groups Projects
Commit 3d9be99b authored by markn92's avatar markn92
Browse files

wip

parent 3475815f
No related branches found
No related tags found
1 merge request!2Dev
......@@ -180,11 +180,6 @@ class Label(NamedTuple):
last_cs: Node
soc_profile_cs_v: SoCProfile
@property
def key(self):
"""Key for sorting."""
return self.t_trip
class SoCFunction:
"""
......
......@@ -3,7 +3,7 @@ from typing import Dict
from .T import SoCProfile, SoCFunction, ChargingFunction, Label
from ..T import Node, SoC, Time
from ..graph_tools import charging_cofficient, consumption, distance
from ..graph_tools import charging_cofficient, consumption
def charging_function_factory(
......
......@@ -4,7 +4,9 @@ from math import inf
import networkx as nx
from evrouting.T import Node, SoC
from evrouting.utils import PriorityQueue
from evrouting.charge.factories import LabelsFactory, ChargingFunctionMap, soc_profile_factory
from evrouting.charge.factories import (
LabelsFactory, ChargingFunctionMap, soc_profile_factory
)
from ..graph_tools import distance
from .T import SoCFunction, SoCProfile, Label
......@@ -26,21 +28,24 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
:param U: Capacity
:return:
"""
t = _apply_final_constraints(G, t, final_soc)
cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
label_factory = LabelsFactory(G, capacity, cf, initial_soc)
# Init maps to manage labels
l_set: Dict[int, set] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf) for v in G}
# Init environment
entry_label = _create_entry_label(G, charging_stations, s, initial_soc, capacity)
l_uns[s].insert(entry_label, cf[entry_label.last_cs])
entry_label = _create_entry_label(G, charging_stations,
s, initial_soc, capacity)
l_uns[s].insert(entry_label)
# A priority queue defines which node to visit next.
# The key is the trip time.
prio_queue = PriorityQueue()
prio_queue.insert(s, 0)
prio_queue.insert(s, priority=0, count=0)
while True:
try:
......@@ -58,14 +63,16 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
).minimum
# handle charging stations
if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
if minimum_node in charging_stations and \
not minimum_node == label_minimum_node.last_cs:
if cf[minimum_node] > cf[label_minimum_node.last_cs]:
label_new = label_factory.spawn_label(minimum_node, label_minimum_node)
l_uns[minimum_node].insert(label_new, cf[minimum_node])
label_new = label_factory.spawn_label(minimum_node,
label_minimum_node)
l_uns[minimum_node].insert(label_new)
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
_update_priority_queue(prio_queue, l_uns, minimum_node)
_update_priority_queue(cf, prio_queue, l_uns, minimum_node)
# scan outgoing arcs
for n in G.neighbors(minimum_node):
......@@ -81,7 +88,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
soc_profile_cs_v=soc_profile
)
try:
l_uns[n].insert(l_new, cf[l_new.last_cs])
l_uns[n].insert(l_new)
except ValueError:
# Infeasible because last_cs might be an
# dummy charging station. Therefore, the path might
......@@ -94,7 +101,20 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
pass
else:
if l_new == l_uns[n].peak_min():
prio_queue.insert(n, l_new.key)
key, count = _key(l_new, cf[l_new.last_cs])
prio_queue.insert(n, priority=key, count=count)
def _key(label, cf):
soc_function = SoCFunction(
label,
cf
)
t_min = soc_function.minimum
soc_min = soc_function(t_min)
return t_min, soc_min
def _create_entry_label(
......@@ -136,6 +156,7 @@ def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
def _update_priority_queue(
cf: ChargingFunctionMap,
prio_queue: PriorityQueue,
l_uns: Dict[int, LabelPriorityQueue],
node: Node):
......@@ -144,9 +165,18 @@ def _update_priority_queue(
its minimum label.
"""
try:
minimum_label = l_uns[node].peak_min()
minimum_label: Label = l_uns[node].peak_min()
except KeyError:
# l_uns[v] empty
prio_queue.delete_min()
else:
prio_queue.insert(node, minimum_label.key)
key, count = _key(minimum_label, cf[minimum_label.last_cs])
prio_queue.insert(node, priority=key, count=count)
def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node:
temp_final_node = len(G)
G.add_node(temp_final_node)
G.add_edge(t, temp_final_node, weight=0, c=final_soc)
return temp_final_node
from math import inf
from evrouting.utils import PriorityQueue
from evrouting.T import SoC, Time, Node
from evrouting.T import SoC, Time
from .T import Label, SoCFunction, ChargingFunction
from .T import Label, SoCFunction
from .factories import ChargingFunctionMap
class LabelPriorityQueue(PriorityQueue):
def insert(self, label: Label, cf: ChargingFunction):
def __init__(self, cf: ChargingFunctionMap):
super().__init__()
self.cf: ChargingFunctionMap = cf
def insert(self, label: Label):
"""Breaking ties with lowest soc at t_min."""
soc_function = SoCFunction(
label,
cf
self.cf[label.last_cs]
)
t_min: Time = soc_function.minimum
......@@ -27,5 +32,3 @@ class LabelPriorityQueue(PriorityQueue):
priority=t_min,
count=soc_min
)
......@@ -8,33 +8,60 @@ from ..config import (
)
def test_shortest_path_charge_at_s_and_a():
"""Charging at s."""
path = shortest_path(**init_config(edge_case))
class TestRoutes:
assert path == 3.5
def test_shortest_path_charge_at_s_and_a(self):
"""Charging at s."""
path = shortest_path(**init_config(edge_case))
assert path == 3.5
def test_shortest_path_charge_at_s_only():
"""Charging at s."""
path = shortest_path(**init_config(edge_case_a_slow))
def test_shortest_path_charge_at_s_only(self):
"""Charging at s."""
path = shortest_path(**init_config(edge_case_a_slow))
assert path == 3
assert path == 3
def test_shortest_path_no_charge_s_path_t(self):
"""No charging at s but enough initial SoC to go to t directly."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4
path = shortest_path(**conf)
def test_shortest_path_no_charge_s_path_t():
"""No charging at s but enough initial SoC to go to t directly."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4
path = shortest_path(**conf)
assert path == 1
assert path == 1
def test_shortest_path_no_charge_s_path_a(self):
"""No charging at s but just enough SoC to go to t via a."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 2
path = shortest_path(**conf)
assert path == 2
def test_shortest_path_no_charge_s_path_a():
"""No charging at s but just enough SoC to go to t via a."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 2
path = shortest_path(**conf)
assert path == 2
class TestWithFinalSoC:
def test_shortest_path_charge_at_s_and_a(self):
"""Charging at s."""
conf = init_config(edge_case)
conf['final_soc'] = 3
path = shortest_path(**conf)
assert path == 5
def test_shortest_path_charge_at_s_only(self):
"""Charging at s and a to reach final_soc."""
conf = init_config(edge_case_a_slow)
conf['final_soc'] = 3
path = shortest_path(**conf)
assert path == 4
def test_shortest_path_no_charge_s_path_t(self):
"""No charging at s but initial soc."""
conf = init_config(edge_case_start_node_no_cs)
conf['initial_soc'] = 4
conf['final_soc'] = 3
path = shortest_path(**conf)
assert path == 2.5
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment