Skip to content
Snippets Groups Projects
Commit 050da9af authored by markn92's avatar markn92
Browse files

consolidated factories

parent 7358366c
No related branches found
No related tags found
No related merge requests found
...@@ -6,33 +6,21 @@ from ..T import Node, SoC, Time ...@@ -6,33 +6,21 @@ from ..T import Node, SoC, Time
from ..graph_tools import charging_cofficient, consumption from ..graph_tools import charging_cofficient, consumption
def charging_function_factory( class SoCProfileFactory:
G: nx.Graph, """Maps Nodes to their (cached) charging functions."""
n: Node,
capacity: SoC,
initial_soc: SoC = None) -> ChargingFunction:
"""Create charging function of node."""
return ChargingFunction(charging_cofficient(G, n), capacity, initial_soc)
def __init__(self, G: nx.Graph, capacity: SoC):
self.G: nx.Graph = G
self.capacity: SoC = capacity
def soc_profile_factory( def __call__(self, u: Node, v: Node = None) -> SoCProfile:
G: nx.Graph, path_cost = 0 if v is None else consumption(self.G, u, v)
capacity: SoC,
u: Node,
v: Node = None,
) -> SoCProfile:
"""
Return SoC Profile of the path from u to v.
If no v is provided, the path of u is definded as no cost path.
""" return SoCProfile(path_cost, self.capacity)
path_cost = 0 if v is None else consumption(G, u, v)
return SoCProfile(path_cost, capacity)
class ChargingFunctionMap: class ChargingFunctionMap:
"""Maps Nodes to their charging functions.""" """Maps Nodes to their (cached) charging functions."""
def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None): def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None):
self.map: Dict[Node, ChargingFunction] = {} self.map: Dict[Node, ChargingFunction] = {}
...@@ -48,9 +36,8 @@ class ChargingFunctionMap: ...@@ -48,9 +36,8 @@ class ChargingFunctionMap:
try: try:
cf = self.map[node] cf = self.map[node]
except KeyError: except KeyError:
cf = charging_function_factory( cf = ChargingFunction(
G=self.G, c=charging_cofficient(self.G, node),
n=node,
capacity=self.capacity, capacity=self.capacity,
initial_soc=self.initial_soc initial_soc=self.initial_soc
) )
...@@ -59,37 +46,32 @@ class ChargingFunctionMap: ...@@ -59,37 +46,32 @@ class ChargingFunctionMap:
return cf return cf
class SoCFunctionMap: class SoCFunctionFactory:
"""Maps Nodes to their charging functions.""" """Maps Nodes to their charging functions."""
def __init__(self, cf: ChargingFunctionMap): def __init__(self, cf: ChargingFunctionMap):
self.cf: ChargingFunctionMap = cf self.cf: ChargingFunctionMap = cf
def __getitem__(self, label: Label) -> SoCFunction: def __call__(self, label: Label) -> SoCFunction:
return SoCFunction(label, self.cf[label.last_cs]) return SoCFunction(label, self.cf[label.last_cs])
class LabelsFactory: class LabelsFactory:
def __init__(self, def __init__(self,
G: nx.Graph, f_soc: SoCFunctionFactory,
capacity: SoC, soc_profile: SoCProfileFactory):
f_soc: SoCFunctionMap, self.f_soc: SoCFunctionFactory = f_soc
initial_soc: SoC = None): self.soc_profile: SoCProfileFactory = soc_profile
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.f_soc: SoCFunctionMap = f_soc
self.initial_soc: SoC = initial_soc
def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time): def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time):
# Only charge the minimum at the last charge station # Only charge the minimum at the last charge station
# and continue charging at this station. # and continue charging at this station.
soc_function: SoCFunction = self.f_soc[current_label] soc_function: SoCFunction = self.f_soc(current_label)
return Label( return Label(
t_trip=current_label.t_trip + t_charge, t_trip=current_label.t_trip + t_charge,
soc_last_cs=soc_function(current_label.t_trip + t_charge), soc_last_cs=soc_function(current_label.t_trip + t_charge),
last_cs=current_node, last_cs=current_node,
soc_profile_cs_v=soc_profile_factory( soc_profile_cs_v=self.soc_profile(current_node)
self.G, self.capacity, current_node)
) )
...@@ -7,8 +7,8 @@ from evrouting.utils import PriorityQueue ...@@ -7,8 +7,8 @@ from evrouting.utils import PriorityQueue
from evrouting.charge.factories import ( from evrouting.charge.factories import (
LabelsFactory, LabelsFactory,
ChargingFunctionMap, ChargingFunctionMap,
SoCFunctionMap, SoCFunctionFactory,
soc_profile_factory SoCProfileFactory
) )
from ..graph_tools import distance from ..graph_tools import distance
...@@ -34,17 +34,19 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -34,17 +34,19 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
""" """
t = _apply_final_constraints(G, t, final_soc) t = _apply_final_constraints(G, t, final_soc)
cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc) # Init factories
f_soc = SoCFunctionMap(cf) cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
label_factory = LabelsFactory(G, capacity, f_soc, initial_soc) f_soc_factory = SoCFunctionFactory(cf_map)
soc_profile_factory = SoCProfileFactory(G, capacity)
label_factory = LabelsFactory(f_soc_factory, soc_profile_factory)
# Init maps to manage labels # Init maps to manage labels
l_set: Dict[int, Set[Label]] = {v: set() for v in G} l_set: Dict[int, Set[Label]] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(cf, l_set[v]) for v in G} l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(f_soc_factory, l_set[v]) for v in G}
# Init environment # Init environment
entry_label = _create_entry_label(G, charging_stations, entry_label = _create_entry_label(G, charging_stations,
s, initial_soc, capacity) s, initial_soc, soc_profile_factory)
l_uns[s].insert(entry_label) l_uns[s].insert(entry_label)
# A priority queue defines which node to visit next. # A priority queue defines which node to visit next.
...@@ -63,12 +65,12 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -63,12 +65,12 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
l_set[minimum_node].add(label_minimum_node) l_set[minimum_node].add(label_minimum_node)
if minimum_node == t: if minimum_node == t:
return f_soc[label_minimum_node].minimum return f_soc_factory(label_minimum_node).minimum
# handle charging stations # handle charging stations
if minimum_node in charging_stations and \ if minimum_node in charging_stations and \
not minimum_node == label_minimum_node.last_cs: not minimum_node == label_minimum_node.last_cs:
for t_charge in _calc_optimal_t_charge(cf, label_minimum_node, minimum_node, capacity): for t_charge in _calc_optimal_t_charge(cf_map, label_minimum_node, minimum_node, capacity):
label_new = label_factory.spawn_label(minimum_node, label_new = label_factory.spawn_label(minimum_node,
label_minimum_node, label_minimum_node,
t_charge) t_charge)
...@@ -76,13 +78,13 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -76,13 +78,13 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
# Update priority queue. This node might have gotten a new # Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step. # minimum label spawned is th previous step.
_update_priority_queue(f_soc, prio_queue, l_uns, minimum_node) _update_priority_queue(f_soc_factory, prio_queue, l_uns, minimum_node)
# scan outgoing arcs # scan outgoing arcs
for n in G.neighbors(minimum_node): for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n # Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \ soc_profile = label_minimum_node.soc_profile_cs_v + \
soc_profile_factory(G, capacity, minimum_node, n) soc_profile_factory(minimum_node, n)
if _is_feasible_path(soc_profile, capacity): if _is_feasible_path(soc_profile, capacity):
l_new = Label( l_new = Label(
...@@ -110,7 +112,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node, ...@@ -110,7 +112,7 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
continue continue
if is_new_min_label: if is_new_min_label:
key, count = _key(l_new, f_soc) key, count = _key(l_new, f_soc_factory)
prio_queue.insert(n, priority=key, count=count) prio_queue.insert(n, priority=key, count=count)
...@@ -130,8 +132,8 @@ def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, cap ...@@ -130,8 +132,8 @@ def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, cap
return t_charge return t_charge
def _key(label, f_soc): def _key(label, f_soc_factory):
soc_function = f_soc[label] soc_function = f_soc_factory(label)
t_min = soc_function.minimum t_min = soc_function.minimum
soc_min = soc_function(t_min) soc_min = soc_function(t_min)
...@@ -144,7 +146,8 @@ def _create_entry_label( ...@@ -144,7 +146,8 @@ def _create_entry_label(
charging_stations: set, charging_stations: set,
s: Node, s: Node,
initial_soc: SoC, initial_soc: SoC,
capacity: SoC) -> Label: soc_profile_factory: SoCProfileFactory
) -> Label:
""" """
Create dummy charging station with initial soc as constant charging Create dummy charging station with initial soc as constant charging
function. function.
...@@ -168,7 +171,7 @@ def _create_entry_label( ...@@ -168,7 +171,7 @@ def _create_entry_label(
t_trip=0, t_trip=0,
soc_last_cs=initial_soc, soc_last_cs=initial_soc,
last_cs=dummy_node, last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(G, capacity, s) soc_profile_cs_v=soc_profile_factory(s)
) )
...@@ -178,7 +181,7 @@ def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool: ...@@ -178,7 +181,7 @@ def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
def _update_priority_queue( def _update_priority_queue(
f_soc: SoCFunctionMap, f_soc: SoCFunctionFactory,
prio_queue: PriorityQueue, prio_queue: PriorityQueue,
l_uns: Dict[int, LabelPriorityQueue], l_uns: Dict[int, LabelPriorityQueue],
node: Node): node: Node):
......
...@@ -4,23 +4,19 @@ from math import inf ...@@ -4,23 +4,19 @@ from math import inf
from evrouting.utils import PriorityQueue from evrouting.utils import PriorityQueue
from evrouting.T import SoC, Time from evrouting.T import SoC, Time
from .T import Label, SoCFunction from .T import Label
from .factories import ChargingFunctionMap from .factories import SoCFunctionFactory
class LabelPriorityQueue(PriorityQueue): class LabelPriorityQueue(PriorityQueue):
def __init__(self, cf: ChargingFunctionMap, l_set: Set[Label]): def __init__(self, f_soc: SoCFunctionFactory, l_set: Set[Label]):
super().__init__() super().__init__()
self.cf: ChargingFunctionMap = cf self.f_soc_factory: SoCFunctionFactory = f_soc
self.l_set: Set[Label] = l_set self.l_set: Set[Label] = l_set
def insert(self, label: Label): def insert(self, label: Label):
"""Breaking ties with lowest soc at t_min.""" """Breaking ties with lowest soc at t_min."""
soc_function = SoCFunction( soc_function = self.f_soc_factory(label)
label,
self.cf[label.last_cs]
)
t_min: Time = soc_function.minimum t_min: Time = soc_function.minimum
# Might happen because of dummy charge stations # Might happen because of dummy charge stations
...@@ -49,9 +45,9 @@ class LabelPriorityQueue(PriorityQueue): ...@@ -49,9 +45,9 @@ class LabelPriorityQueue(PriorityQueue):
except KeyError: except KeyError:
return return
soc = SoCFunction(label, self.cf[label.last_cs]) soc = self.f_soc_factory(label)
for other_label in self.l_set: for other_label in self.l_set:
if SoCFunction(other_label, self.cf[other_label.last_cs]) > soc: if self.f_soc_factory(other_label) > soc:
self.remove_item(label) self.remove_item(label)
return return
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment