Skip to content
Snippets Groups Projects
Commit 2cfd3e35 authored by markn92's avatar markn92
Browse files

get rid of label factory

parent 5c27e442
No related branches found
No related tags found
No related merge requests found
......@@ -55,24 +55,3 @@ class SoCFunctionFactory:
def __call__(self, label: Label) -> SoCFunction:
return SoCFunction(label, self.cf[label.last_cs])
class LabelsFactory:
def __init__(self,
f_soc: SoCFunctionFactory,
soc_profile: SoCProfileFactory):
self.f_soc_factory: SoCFunctionFactory = f_soc
self.soc_profile: SoCProfileFactory = soc_profile
def spawn_label(self, current_node: Node, current_label: Label, t_charge: Time):
# Only charge the minimum at the last charge station
# and continue charging at this station.
soc_function: SoCFunction = self.f_soc_factory(current_label)
return Label(
t_trip=current_label.t_trip + t_charge,
soc_last_cs=soc_function(current_label.t_trip + t_charge),
last_cs=current_node,
soc_profile_cs_v=self.soc_profile(current_node)
)
from typing import Dict, List, Set
from typing import Dict, Union, Set
from math import inf
import networkx as nx
from evrouting.T import Node, SoC, Time
from evrouting.utils import PriorityQueue
from evrouting.charge.factories import (
LabelsFactory,
ChargingFunctionMap,
SoCFunctionFactory,
SoCProfileFactory
)
from ..graph_tools import distance
from .T import SoCFunction, Label
from .T import ChargingFunction, SoCFunction, Label
from .utils import LabelPriorityQueue, keys
__all__ = ['shortest_path']
......@@ -38,7 +37,6 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
cf_map = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
f_soc_factory = SoCFunctionFactory(cf_map)
soc_profile_factory = SoCProfileFactory(G, capacity)
label_factory = LabelsFactory(f_soc_factory, soc_profile_factory)
# Init maps to manage labels
l_set: Dict[int, Set[Label]] = {v: set() for v in G}
......@@ -70,11 +68,21 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
# handle charging stations
if minimum_node in charging_stations and \
not minimum_node == label_minimum_node.last_cs:
for t_charge in _calc_optimal_t_charge(cf_map, label_minimum_node, minimum_node, capacity):
label_new = label_factory.spawn_label(minimum_node,
label_minimum_node,
t_charge)
l_uns[minimum_node].insert(label_new)
f_soc: SoCFunction = f_soc_factory(label_minimum_node)
t_charge = _calc_optimal_t_charge(
current_cs=cf_map[minimum_node],
f_soc=f_soc,
capacity=capacity)
if t_charge is not None:
l_uns[minimum_node].insert(
Label(
t_trip=label_minimum_node.t_trip + t_charge,
soc_last_cs=f_soc(label_minimum_node.t_trip + t_charge),
last_cs=minimum_node,
soc_profile_cs_v=soc_profile_factory(minimum_node)
)
)
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
......@@ -115,18 +123,20 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
prio_queue.insert(n, **keys(f_soc_factory, l_new))
def _calc_optimal_t_charge(cf: ChargingFunctionMap, label_v: Label, v: Node, capacity: SoC) -> List[Time]:
f_soc_breakpoints = SoCFunction(label_v, cf[label_v.last_cs]).breakpoints
t_charge = []
def _calc_optimal_t_charge(current_cs: ChargingFunction,
f_soc: SoCFunction,
capacity: SoC) -> Union[Time, None]:
f_soc_breakpoints = f_soc.breakpoints
t_charge = None
if cf[v] > cf[label_v.last_cs]:
if current_cs > f_soc.cf_cs:
# Faster charging station -> charge as soon as possible
t_charge.append(f_soc_breakpoints[0].t - label_v.t_trip)
t_charge = f_soc_breakpoints[0].t - f_soc.t_trip
elif f_soc_breakpoints[-1].soc < capacity:
# Slower charging station might still be dominating
# because the soc cannot be more than the full capacity
# decreased by the trip costs. This will be refilled at this station.
t_charge.append(f_soc_breakpoints[-1].t - label_v.t_trip)
t_charge = f_soc_breakpoints[-1].t - f_soc.t_trip
return t_charge
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment