Skip to content
Snippets Groups Projects
Commit 5c664e34 authored by markn92's avatar markn92
Browse files

little cleanup

parent 7a1758d0
No related branches found
No related tags found
1 merge request!2Dev
......@@ -134,6 +134,26 @@ class ChargingFunction:
return cf_inverse
def __lt__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c < other.c
def __le__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c <= other.c
def __eq__(self, other) -> bool:
"""Comparison for dominance check."""
return self.c == other.c
def __ge__(self, other):
"""Comparison for dominance check."""
return self.c >= other.c
def __gt__(self, other):
"""Comparison for dominance check."""
return self.c > other.c
class Label(NamedTuple):
"""
......
import networkx as nx
from typing import Dict
from .T import SoCProfile, ChargingFunction
from ..T import Node, SoC
from ..graph_tools import charging_cofficient, consumption
from .T import SoCProfile, SoCFunction, ChargingFunction, Label
from ..T import Node, SoC, Time
from ..graph_tools import charging_cofficient, consumption, distance
def charging_function(
def charging_function_factory(
G: nx.Graph,
n: Node,
capacity: SoC,
......@@ -14,7 +15,7 @@ def charging_function(
return ChargingFunction(charging_cofficient(G, n), capacity, initial_soc)
def soc_profile(
def soc_profile_factory(
G: nx.Graph,
capacity: SoC,
u: Node,
......@@ -28,3 +29,62 @@ def soc_profile(
"""
path_cost = 0 if v is None else consumption(G, u, v)
return SoCProfile(path_cost, capacity)
class ChargingFunctionMap:
"""Maps Nodes to their charging functions."""
def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None):
self.map: Dict[Node, ChargingFunction] = {}
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.initial_soc: SoC = initial_soc
def __getitem__(self, node: Node) -> ChargingFunction:
"""
Try to get charging function from cache,
else create function and add to cache.
"""
try:
cf = self.map[node]
except KeyError:
cf = charging_function_factory(
G=self.G,
n=node,
capacity=self.capacity,
initial_soc=self.initial_soc
)
self.map[node] = cf
return cf
class LabelsFactory:
def __init__(self,
G: nx.Graph,
capacity: SoC,
cf: ChargingFunctionMap,
initial_soc: SoC = None):
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.cf: ChargingFunctionMap = cf
self.initial_soc: SoC = initial_soc
def spawn_label(self, current_node: Node, current_label: Label):
# Only charge the minimum at the last charge station
# and continue charging at this station.
soc_function: SoCFunction = SoCFunction(
current_label, self.cf[current_label.last_cs]
)
t_trip_old = current_label.t_trip
t_charge: Time = soc_function.minimum - t_trip_old
return Label(
t_trip=t_trip_old + t_charge,
soc_last_cs=soc_function(t_trip_old + t_charge),
last_cs=current_node,
soc_profile_cs_v=soc_profile_factory(
self.G, self.capacity, current_node)
)
......@@ -2,13 +2,15 @@ from typing import Dict
from math import inf
import networkx as nx
from evrouting.T import Node, SoC, Time
from evrouting.T import Node, SoC
from evrouting.utils import PriorityQueue
from evrouting.charge.factories import soc_profile as soc_profile_factory
from evrouting.charge.factories import LabelsFactory, ChargingFunctionMap, soc_profile_factory
from ..graph_tools import distance
from .T import SoCFunction, Label
from .utils import LabelPriorityQueue, ChargingFunctionMap
from .T import SoCFunction, SoCProfile, Label
from .utils import LabelPriorityQueue
__all__ = ['shortest_path']
def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
......@@ -25,34 +27,18 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
:return:
"""
cf = ChargingFunctionMap(G=G, capacity=capacity, initial_soc=initial_soc)
label_factory = LabelsFactory(G, capacity, cf, initial_soc)
q = PriorityQueue()
l_set: Dict[int, set] = {v: set() for v in G}
l_uns: Dict[int, LabelPriorityQueue] = {
v: LabelPriorityQueue() for v in G
}
l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue() for v in G}
# Dummy vertex without incident edges that is (temporarily) added to G
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
l: Label = Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(G, capacity, s)
)
l_uns[s].insert(
l,
cf[l.last_cs]
)
# Init environment
entry_label = _create_entry_label(G, charging_stations, s, initial_soc, capacity)
l_uns[s].insert(entry_label, cf[entry_label.last_cs])
q = PriorityQueue()
q.insert(s, 0)
# run main loop
while True:
try:
minimum_node: Node = q.peak_min()
......@@ -64,65 +50,35 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
l_set[minimum_node].add(label_minimum_node)
if minimum_node == t:
return SoCFunction(
label_minimum_node,
cf[label_minimum_node.last_cs]
).minimum
return SoCFunction(label_minimum_node,
cf[label_minimum_node.last_cs]
).minimum
# handle charging stations
if minimum_node in charging_stations and not minimum_node == label_minimum_node.last_cs:
cf_last_cs = cf[label_minimum_node.last_cs]
cf_minimum_node = cf[minimum_node]
if cf_minimum_node.c > cf_last_cs.c:
# Only charge the minimum at the last charge station
# and continue charging at this station.
old_soc_function: SoCFunction = SoCFunction(
label_minimum_node, cf_last_cs
)
t_trip_old = label_minimum_node.t_trip
t_charge: Time = old_soc_function.minimum - t_trip_old
label_new = Label(
t_trip=t_trip_old + t_charge,
soc_last_cs=old_soc_function(t_trip_old + t_charge),
last_cs=minimum_node,
soc_profile_cs_v=soc_profile_factory(
G, capacity, minimum_node
)
)
l_uns[minimum_node].insert(
label_new,
cf_minimum_node
)
if cf[minimum_node] > cf[label_minimum_node.last_cs]:
label_new = label_factory.spawn_label(minimum_node, label_minimum_node)
l_uns[minimum_node].insert(label_new, cf[minimum_node])
# update priority queue
try:
label_minimum_node = l_uns[minimum_node].peak_min()
except KeyError:
# l_uns[v] empty
q.delete_min()
else:
q.insert(minimum_node, label_minimum_node.key)
# Update priority queue. This node might have gotten a new
# minimum label spawned is th previous step.
_update_priority_queue(q, l_uns, minimum_node)
# scan outgoing arcs
for n in G.neighbors(minimum_node):
# Create SoC Profile for getting from minimum_node to n
soc_profile = label_minimum_node.soc_profile_cs_v + \
soc_profile_factory(G, capacity, minimum_node, n)
if not soc_profile(capacity) == -inf:
# It is possible to get from minimum_node to n
if _is_feasible_path(soc_profile, capacity):
l_new = Label(
label_minimum_node.t_trip + distance(G, minimum_node, n),
label_minimum_node.soc_last_cs,
label_minimum_node.last_cs,
soc_profile
t_trip=label_minimum_node.t_trip + distance(G, minimum_node, n),
soc_last_cs=label_minimum_node.soc_last_cs,
last_cs=label_minimum_node.last_cs,
soc_profile_cs_v=soc_profile
)
try:
l_uns[n].insert(
l_new,
cf[l_new.last_cs]
)
l_uns[n].insert(l_new, cf[l_new.last_cs])
except ValueError:
# Infeasible because last_cs might be an
# dummy charging station. Therefore, the path might
......@@ -136,3 +92,58 @@ def shortest_path(G: nx.Graph, charging_stations: set, s: Node, t: Node,
else:
if l_new == l_uns[n].peak_min():
q.insert(n, l_new.key)
def _create_entry_label(
G: nx.Graph,
charging_stations: set,
s: Node,
initial_soc: SoC,
capacity: SoC) -> Label:
"""
Create dummy charging station with initial soc as constant charging
function.
:param G: Graph
:param charging_stations: Set of charging stations in Graph G
:param s: Starting Node
:param initial_soc: Initial SoC at beginng of the route
:param capacity: The restricting battery capacity
:return: Label for the starting Node
"""
dummy_node: Node = len(G.nodes)
# Charging coefficient 0 indicates dummy node
G.add_node(dummy_node, c=0)
charging_stations.add(dummy_node)
# Register dummy charging station as the last
# seen charging station before s.
return Label(
t_trip=0,
soc_last_cs=initial_soc,
last_cs=dummy_node,
soc_profile_cs_v=soc_profile_factory(G, capacity, s)
)
def _is_feasible_path(soc_profile: SoCProfile, capacity: SoC) -> bool:
"""Check, if possible to traverse path at least with full battery."""
return not soc_profile(capacity) == -inf
def _update_priority_queue(
q: PriorityQueue,
l_uns: Dict[int, LabelPriorityQueue],
node: Node):
"""
Update key of a node the priority queue according to
its minimum label.
"""
try:
minimum_label = l_uns[node].peak_min()
except KeyError:
# l_uns[v] empty
q.delete_min()
else:
q.insert(node, minimum_label.key)
from typing import Dict
from math import inf
import networkx as nx
from evrouting.utils import PriorityQueue
from evrouting.T import SoC, Time, Node
from .factories import charging_function
from .T import Label, SoCFunction, ChargingFunction
......@@ -32,29 +29,3 @@ class LabelPriorityQueue(PriorityQueue):
)
class ChargingFunctionMap:
"""Maps Nodes to their charging functions."""
def __init__(self, G: nx.Graph, capacity: SoC, initial_soc: SoC = None):
self.map: Dict[Node, ChargingFunction] = {}
self.G: nx.Graph = G
self.capacity: SoC = capacity
self.initial_soc: SoC = initial_soc
def __getitem__(self, node: Node) -> ChargingFunction:
"""
Try to get charging function from cache,
else create function and add to cache.
"""
try:
cf = self.map[node]
except KeyError:
cf = charging_function(
G=self.G,
n=node,
capacity=self.capacity,
initial_soc=self.initial_soc
)
self.map[node] = cf
return cf
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment