Skip to content
Snippets Groups Projects
Commit 5c27e442 authored by markn92's avatar markn92
Browse files

wip

parent 050da9af
No related branches found
No related tags found
No related merge requests found
import networkx as nx
from typing import Dict from typing import Dict
import networkx as nx
from .T import SoCProfile, SoCFunction, ChargingFunction, Label from .T import SoCProfile, SoCFunction, ChargingFunction, Label
from ..T import Node, SoC, Time from ..T import Node, SoC, Time
from ..graph_tools import charging_cofficient, consumption from ..graph_tools import charging_cofficient, consumption
...@@ -61,13 +62,13 @@ class LabelsFactory: ...@@ -61,13 +62,13 @@ class LabelsFactory:
def __init__(self, def __init__(self,
f_soc: SoCFunctionFactory, f_soc: SoCFunctionFactory,
soc_profile: SoCProfileFactory): soc_profile: SoCProfileFactory):
self.f_soc: SoCFunctionFactory = f_soc self.f_soc_factory: SoCFunctionFactory = f_soc
self.soc_profile: SoCProfileFactory = soc_profile self.soc_profile: SoCProfileFactory = soc_profile
def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time): def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time):
# Only charge the minimum at the last charge station # Only charge the minimum at the last charge station
# and continue charging at this station. # and continue charging at this station.
soc_function: SoCFunction = self.f_soc(current_label) soc_function: SoCFunction = self.f_soc_factory(current_label)
return Label( return Label(
t_trip=current_label.t_trip + t_charge, t_trip=current_label.t_trip + t_charge,
......
...@@ -12,8 +12,8 @@ from evrouting.charge.factories import ( ...@@ -12,8 +12,8 @@ from evrouting.charge.factories import (
) )
from ..graph_tools import distance from ..graph_tools import distance
from .T import SoCProfile, SoCFunction, Label from .T import SoCFunction, Label
from .utils import LabelPriorityQueue from .utils import LabelPriorityQueue, keys
__all__ = ['shortest_path'] __all__ = ['shortest_path']
...@@ -86,7 +86,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -86,7 +86,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
soc_profile = label_minimum_node.soc_profile_cs_v + \ soc_profile = label_minimum_node.soc_profile_cs_v + \
soc_profile_factory(minimum_node, n) soc_profile_factory(minimum_node, n)
if _is_feasible_path(soc_profile, capacity): if soc_profile(capacity) != -inf:
l_new = Label( l_new = Label(
t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n), t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n),
soc_last_cs=label_minimum_node.soc_last_cs, soc_last_cs=label_minimum_node.soc_last_cs,
...@@ -112,8 +112,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -112,8 +112,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
continue continue
if is_new_min_label: if is_new_min_label:
key, count = _key(l_new, f_soc_factory) prio_queue.insert(n, **keys(f_soc_factory, l_new))
prio_queue.insert(n, priority=key, count=count)
def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]: def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]:
...@@ -132,15 +131,6 @@ def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, cap ...@@ -132,15 +131,6 @@ def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, cap
return t_charge return t_charge
def _key(label, f_soc_factory):
soc_function = f_soc_factory(label)
t_min = soc_function.minimum
soc_min = soc_function(t_min)
return t_min, soc_min
def _create_entry_label( def _create_entry_label(
G: nx.Graph, G: nx.Graph,
charging_stations: set, charging_stations: set,
...@@ -175,11 +165,6 @@ def _create_entry_label( ...@@ -175,11 +165,6 @@ def _create_entry_label(
) )
def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
"""Check, if possible to traverse path at least with full battery."""
return not soc_profile(capacity) == -inf
def _update_priority_queue( def _update_priority_queue(
f_soc: SoCFunctionFactory, f_soc: SoCFunctionFactory,
prio_queue: PriorityQueue, prio_queue: PriorityQueue,
...@@ -195,8 +180,7 @@ def _update_priority_queue( ...@@ -195,8 +180,7 @@ def _update_priority_queue(
# l_uns[v] empty # l_uns[v] empty
prio_queue.delete_min() prio_queue.delete_min()
else: else:
key, count = _key(minimum_label, f_soc) prio_queue.insert(node, **keys(f_soc, minimum_label))
prio_queue.insert(node, priority=key, count=count)
def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node: def _apply_final_constraints(G: nx.Graph, t: Node, final_soc: SoC) -> Node:
......
from typing import Set, Any from typing import Set, Any, Dict
from math import inf from math import inf
from evrouting.utils import PriorityQueue from evrouting.utils import PriorityQueue
...@@ -16,20 +16,7 @@ class LabelPriorityQueue(PriorityQueue): ...@@ -16,20 +16,7 @@ class LabelPriorityQueue(PriorityQueue):
def insert(self, label: Label): def insert(self, label: Label):
"""Breaking ties with lowest soc at t_min.""" """Breaking ties with lowest soc at t_min."""
soc_function = self.f_soc_factory(label) super().insert(item=label, **keys(self.f_soc_factory, label))
t_min: Time = soc_function.minimum
# Might happen because of dummy charge stations
if t_min == -inf:
raise ValueError('Infeasible label.')
soc_min: SoC = soc_function(t_min)
super().insert(
item=label,
priority=t_min,
count=soc_min
)
if self.peak_min() == label: if self.peak_min() == label:
self.dominance_check() self.dominance_check()
...@@ -51,3 +38,16 @@ class LabelPriorityQueue(PriorityQueue): ...@@ -51,3 +38,16 @@ class LabelPriorityQueue(PriorityQueue):
if self.f_soc_factory(other_label) > soc: if self.f_soc_factory(other_label) > soc:
self.remove_item(label) self.remove_item(label)
return return
def keys(f_soc_factory: SoCFunctionFactory, label: Label) -> Dict:
soc_function = f_soc_factory(label)
t_min: Time = soc_function.minimum
# Might happen because of dummy charge stations
if t_min == -inf:
raise ValueError('Infeasible label.')
soc_min: SoC = soc_function(t_min)
return {'priority': t_min, 'count': soc_min}
import pytest import pytest
from evrouting.charge.utils import LabelPriorityQueue from evrouting.charge.utils import LabelPriorityQueue
from evrouting.charge.T import Label from evrouting.charge.T import Label
from evrouting.charge.factories import SoCFunctionFactory
@pytest.fixture @pytest.fixture
def q(label, ch_function): def q(label, ch_function):
_, _, cf = ch_function _, _, cf = ch_function
dummy_cf = {label.last_cs: cf} dummy_cf = {label.last_cs: cf}
q = LabelPriorityQueue(dummy_cf) q = LabelPriorityQueue(SoCFunctionFactory(dummy_cf), l_set=set({}))
q.insert(label) q.insert(label)
# create min # create min
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment