Skip to content
Snippets Groups Projects
Commit db70672c authored by markn92's avatar markn92
Browse files

introduce dummy node class

parent 312f8ce5
No related branches found
No related tags found
1 merge request!1Working np Algorithm
from typing import Tuple, Union, NewType from dataclasses import dataclass
from typing import Tuple, Union, NewType, Dict, Any
from math import inf from math import inf
Node = int Node = int
Edge = Tuple[Node, Node] Edge = Tuple[Node, Node]
@dataclass
class DummyNode:
n: Node
NodeData = Dict[str, Any]
EdgeData = Dict[str, Any]
Wh = NewType('Wh', Union[float, int]) Wh = NewType('Wh', Union[float, int])
SoC = NewType('SoC', Union[-inf, Wh]) SoC = NewType('SoC', Union[-inf, Wh])
......
from typing import Tuple from typing import Union
from copy import copy from copy import copy
from collections import namedtuple from collections import namedtuple
from math import inf from math import inf
import networkx as nx import networkx as nx
from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node, DummyNode
from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node
from evrouting.graph_tools import charging_cofficient, consumption from evrouting.graph_tools import charging_cofficient, consumption
class Label(namedtuple): class Label(namedtuple):
t_trip: Time t_trip: Time
beta_u: SoC beta_u: SoC
u: Node u: Union[Node, DummyNode]
SoCProfile: SoCProfile SoCProfile: SoCProfile
...@@ -28,14 +27,27 @@ class ChargingFunction: ...@@ -28,14 +27,27 @@ class ChargingFunction:
return beta if beta < self.U else self.U return beta if beta < self.U else self.U
class DummyChargingFunction:
def __init__(self, beta_s: SoC):
self.beta_s = beta_s
def __call__(self, t: Time, beta: SoC = 0) -> SoC:
return self.beta_s
class SoCFunction: class SoCFunction:
def __init__(self, G: nx.Graph, l: Label, U: SoC): def __init__(self, G: nx.Graph, l: Label, U: SoC):
self.t_trip: Time = l.t_trip self.t_trip: Time = l.t_trip
self.beta_u: SoC = l.beta_u self.beta_u: SoC = l.beta_u
self.u: Node = l.u self.u: Union[Node, DummyNode] = l.u
self.b_u_v: SoCProfile = l.SoCProfile self.b_u_v: SoCProfile = l.SoCProfile
self.cf: ChargingFunction = ChargingFunction(G, l.u, U)
self.cf: Union[ChargingFunction, DummyChargingFunction]
if isinstance(self.u, DummyNode):
self.cf = DummyChargingFunction(self.beta_u)
else:
self.cf = ChargingFunction(G, self.u, U)
def __call__(self, t: Time) -> SoC: def __call__(self, t: Time) -> SoC:
if t < self.t_trip: if t < self.t_trip:
...@@ -43,8 +55,11 @@ class SoCFunction: ...@@ -43,8 +55,11 @@ class SoCFunction:
return self.b_u_v(self.cf(t - self.t_trip, self.beta_u)) return self.b_u_v(self.cf(t - self.t_trip, self.beta_u))
def get_minimum(self) -> Time: def get_minimum(self, beta_t: SoC = 0) -> Time:
"""TODO: Explain.""" """TODO: Explain."""
if isinstance(self.cf, DummyChargingFunction):
return 0
cost_p = self.b_u_v.cost cost_p = self.b_u_v.cost
return max(self.t_trip, (cost_p - self.beta_u) / return max(self.t_trip, (cost_p - self.beta_u) /
self.cf.c + self.t_trip) self.cf.c + self.t_trip)
......
...@@ -5,7 +5,8 @@ import networkx as nx ...@@ -5,7 +5,8 @@ import networkx as nx
from evrouting.T import Node, SoC, Time from evrouting.T import Node, SoC, Time
from evrouting.utils import PriorityQueue from evrouting.utils import PriorityQueue
from .T import SoCProfile, SoCFunction, Label, ChargingFunction from .T import SoCProfile, SoCFunction, Label, ChargingFunction, DummyNode
from .utils import LabelPriorityQueue
from ..graph_tools import distance from ..graph_tools import distance
...@@ -24,18 +25,15 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, ...@@ -24,18 +25,15 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node,
""" """
q = PriorityQueue() q = PriorityQueue()
l_set: Dict[int, set] = {v: set() for v in G} l_set: Dict[int, set] = {v: set() for v in G}
l_uns: Dict[int, PriorityQueue] = {v: PriorityQueue() for v in G} l_uns: Dict[int, LabelPriorityQueue] = {v: LabelPriorityQueue(G, U) for v in G}
# Dummy vertex without incident edges that is (temporarily) added to G # Dummy vertex without incident edges that is (temporarily) added to G
v_0: Node = len(G.nodes) dummy_node: DummyNode = DummyNode(len(G.nodes))
G.add_node(v_0) G.add_node(dummy_node)
S.add(v_0) S.add(dummy_node)
cf_v_0 = [(0, beta_s)] l = Label(0, beta_s, dummy_node, SoCProfile(G, U, s))
l_uns[s] = PriorityQueue() l_uns[s].insert(l)
l = Label(0, beta_s, v_0, SoCProfile(G, U, s))
l_uns[s].insert(item=l, priority=key(l))
q.insert(s, 0) q.insert(s, 0)
...@@ -51,7 +49,7 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, ...@@ -51,7 +49,7 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node,
l_set[v].add(l) l_set[v].add(l)
if v == t: if v == t:
return SoCFunction(G, l, U).get_minimum() return SoCFunction(G, l, U).get_minimum(beta_t)
# handle charging stations # handle charging stations
t_trip, beta_u, u, b_u_v = l t_trip, beta_u, u, b_u_v = l
...@@ -68,7 +66,7 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, ...@@ -68,7 +66,7 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node,
u=v, u=v,
SoCProfile=SoCProfile(G, U, v) SoCProfile=SoCProfile(G, U, v)
) )
l_uns[v].insert(l_new, priority=key(l_new)) l_uns[v].insert(l_new)
# update priority queue # update priority queue
if l_uns[v]: if l_uns[v]:
...@@ -89,4 +87,3 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, ...@@ -89,4 +87,3 @@ def shortest_path(G: nx.Graph, S: set, s: Node, t: Node,
def key(l: Label) -> Time: def key(l: Label) -> Time:
return l.t_trip return l.t_trip
import networkx as nx
from evrouting.utils import PriorityQueue
from evrouting.T import SoC
from .T import Label, SoCFunction
class LabelPriorityQueue(PriorityQueue):
def __init__(self, G: nx.Graph, U: SoC):
super().__init__()
self.G = G
self.U = U
def insert(self, item: Label):
"""Breaking ties with lowest soc at t_min."""
soc_function = SoCFunction(self.G, item, self.U)
t_min = soc_function.get_minimum()
super().insert(
item,
priority=t_min,
count=soc_function(t_min)
)
from typing import Dict, Tuple, Any
from collections import namedtuple from collections import namedtuple
import networkx as nx import networkx as nx
from evrouting.T import Wh, ChargingCoefficient, Time from evrouting.T import Wh, ChargingCoefficient, Time, Node, NodeData, EdgeData
TemplateEdge = namedtuple('Edge', ['u', 'v', 'distance', 'consumption']) TemplateEdge = namedtuple('Edge', ['u', 'v', 'distance', 'consumption'])
TemplateNode = namedtuple( TemplateNode = namedtuple(
'Node', ['label', 'charging_coeff'], defaults=(None, None) 'Node', ['label', 'charging_coeff'], defaults=(None, None)
) )
NodeData = Dict[str, Any]
EdgeData = Dict[str, Any]
Node = int
Edge = Tuple[int, int]
def node_convert(n: TemplateNode) -> NodeData: def node_convert(n: TemplateNode) -> NodeData:
return {'label': n.label, 'c': n.charging_coeff} return {'label': n.label, 'c': n.charging_coeff}
......
...@@ -11,11 +11,11 @@ class PriorityQueue: ...@@ -11,11 +11,11 @@ class PriorityQueue:
self.entry_finder = {} # mapping of tasks to entries self.entry_finder = {} # mapping of tasks to entries
self.counter = itertools.count() # unique sequence count as tie break self.counter = itertools.count() # unique sequence count as tie break
def insert(self, item: Any, priority=0): def insert(self, item: Any, priority: Any = 0, count: Any = None):
"""Add a new task or update the priority of an existing task""" """Add a new task or update the priority of an existing task"""
if item in self.entry_finder: if item in self.entry_finder:
self.remove_item(item) self.remove_item(item)
count = next(self.counter) count = count or next(self.counter)
entry = [priority, count, item] entry = [priority, count, item]
self.entry_finder[item] = entry self.entry_finder[item] = entry
heappush(self.pq, entry) heappush(self.pq, entry)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment