Skip to content
Snippets Groups Projects
Commit 23d3cc00 authored by markn92's avatar markn92
Browse files

more elegant datastructures

parent 0e90839e
No related branches found
No related tags found
No related merge requests found
from typing import NewType, Tuple
from typing import Tuple, Union, NewType
from math import inf
Node = NewType('Node', int)
Edge = NewType('Edge', Tuple[Node, Node])
SoC = NewType('SoC', float)
\ No newline at end of file
Node = int
Edge = Tuple[Node, Node]
Wh = NewType('Wh', Union[float, int])
SoC = NewType('SoC', Union[-inf, Wh])
ChargingCoefficient = float
Time = Union[float, int]
from typing import NewType, Callable
from copy import copy
from collections import namedtuple
from math import inf
from evrouting.T import SoC
import networkx as nx
SoCProfile = NewType('SoCProfile', Callable[[SoC], SoC])
from evrouting.T import SoC, Wh, ChargingCoefficient, Time, Node
from evrouting.graph_tools import charging_cofficient, consumption
Label = namedtuple('Label', ['t_trip', 'beta_u', 'u', 'SoCProfile_u_v'])
class ChargingFunction:
def __init__(self, G: nx.Graph, l: Label):
self.t_trip: Time = l.t_trip
self.beta_u: SoC = l.beta_u
self.u: Node = l.u
self.b_u_v: SoCProfile = l.SoCProfile_u_v
self.c_u: ChargingCoefficient = charging_cofficient(G, l.u)
def __call__(self, t) -> SoC:
if t < self.t_trip:
return -inf
return self.beta_u(self.beta_u + self.c_u * (t - self.t_trip))
def get_minimum(self) -> Time:
"""TODO: Explain."""
cost_p = self.b_u_v.cost
return max(self.t_trip, (cost_p - self.beta_u) / self.c_u + self.t_trip)
class SoCProfile:
"""
Describe SoC profile with two parameters:
- cost: Cost of going from u to v.
- out: Maximal SoC after passing the path from u to v.
"""
def __init__(self, G: nx.Graph, U: SoC, u: Node, v: Node = None):
if v is None:
self.cost: Wh = 0
self.out: Wh = U
else:
self.cost: Wh = consumption(G, u, v)
self.out: Wh = U - self.cost
def __call__(self, beta) -> SoC:
if beta < self.cost:
return -inf
return beta - self.cost
def __add__(self, other: 'SoCProfile') -> 'SoCProfile':
new = copy(self)
new.cost = self.cost + other.cost
new.out = self.out - other.cost
return new
from typing import List
from math import inf
import networkx as nx
from evrouting.T import Node, SoC, Time, ChargingCoefficient
from evrouting.utils import PriorityQueue
from .T import SoCProfile, ChargingFunction, Label
def shortest_path(G: nx.Graph, s, t, b_0: float, b_t: float, U: float):
def shortest_path(G: nx.Graph, S: set, s: Node, t: Node, beta_s: SoC, beta_t: SoC, U: SoC):
"""
Calculates shortest path using the CHarge algorithm.
:param G: Input Graph
:param s: Start Node identifier
:param t: End Node identifier
:param b_0: Start SoC
:param b_t: End SoC
:param beta_s: Start SoC
:param beta_t: End SoC
:param U: Capacity
:return:
"""
q = PriorityQueue()
l_set = {v: set() for v in G}
l_uns = {v: PriorityQueue() for v in G}
# Dummy vertex without incident edges that is (temporarily) added to G
v_0: Node = Node(len(G.nodes))
G.add_node(v_0)
S.add(v_0)
cf_v_0 = [(0, beta_s)]
l_uns[s] = PriorityQueue()
l = Label(0, beta_s, v_0, SoCProfile(G, U, s))
l_uns[s].insert(item=l, priority=key(l))
q.insert(s, 0)
# run main loop
while True:
try:
v = q.peak_min()
except KeyError:
# empty queue
break
l = l_uns[v].delete_min()
l_set[v].add(l)
if v == t:
return ChargingFunction(G, l).get_minimum()
# handle charging stations
t_trip, beta_u, u, b_u_v = l
if v in S and not v == u:
# TODO !!!
for t_charge in t_breaks(l):
l_uns[v].insert(new_label(l), priority=) # prio??
# update priority queue
if l_uns[v]:
l_new = l_uns[v].peak_min()
q.insert(v, key(l_new))
else:
q.delete_min()
# scan outgoing arcs
for x, y in G[v]:
b_x_y = b_u_v + SoCProfile(G, U, x, y)
if not b_x_y(beta_max_u) == -inf:
l_new = (t_trip + G.edges[x, y]['weight'], beta_u, u, b_x_y)
l_uns[y].insert(l_new)
if l_new == l_uns[y].peak_min():
q.insert(y, key(l_new))
def key(l: Label) -> Time:
return l.t_trip
def t_breaks(c_old: ChargingCoefficient, c_new: ChargingCoefficient) -> List[Time]:
pass
from typing import Dict, Tuple
from collections import namedtuple
Street = namedtuple('Street', ['u', 'v', 'distance', 'consumption'])
Node = namedtuple('Node', ['label', 'charging_coeff'], defaults=(None, None))
import networkx as nx
from evrouting.T import Wh, ChargingCoefficient
TemplateEdge = namedtuple('Edge', ['u', 'v', 'distance', 'consumption'])
TemplateNode = namedtuple('Node', ['label', 'charging_coeff'], defaults=(None, None))
def node_convert(n: Node) -> dict:
NodeData = Dict
EdgeData = Dict
Node = int
Edge = Tuple[int, int]
def node_convert(n: TemplateNode) -> NodeData:
return {'label': n.label, 'c': n.charging_coeff}
def street_convert(s: Street) -> dict:
return {'weight': s.distance, 'c': s.consumption}
def edge_convert(e: TemplateEdge) -> EdgeData:
return {'weight': e.distance, 'c': e.consumption}
def consumption(G: nx.Graph, u: Node, v: Node) -> Wh:
return G.edges[u, v]['c']
def charging_cofficient(G: nx.Graph, n: Node) -> ChargingCoefficient:
return G.nodes[n]['c']
import itertools
from typing import Any
from heapq import *
......@@ -10,7 +11,7 @@ class PriorityQueue:
self.entry_finder = {} # mapping of tasks to entries
self.counter = itertools.count() # unique sequence count as tie break
def insert(self, item, priority=0):
def insert(self, item: Any, priority=0):
"""Add a new task or update the priority of an existing task"""
if item in self.entry_finder:
self.remove_item(item)
......@@ -19,12 +20,12 @@ class PriorityQueue:
self.entry_finder[item] = entry
heappush(self.pq, entry)
def remove_item(self, item):
def remove_item(self, item: Any):
"""Mark an existing task as REMOVED. Raise KeyError if not found."""
entry = self.entry_finder.pop(item)
entry[-1] = self.REMOVED
def delete_min(self):
def delete_min(self) -> Any:
"""Remove and return the lowest priority task. Raise KeyError if empty."""
while self.pq:
priority, count, item = heappop(self.pq)
......@@ -33,7 +34,7 @@ class PriorityQueue:
return item
raise KeyError('pop from an empty priority queue')
def peak_min(self):
def peak_min(self) -> Any:
"""Return minimum item without removing it from the queue."""
while self.pq:
priority, count, item = self.pq[0]
......
import networkx as nx
from evrouting.graph_tools import Node, Street, node_convert, street_convert
from evrouting.graph_tools import node_convert, edge_convert
from evrouting.graph_tools import TemplateEdge as Edge
from evrouting.graph_tools import TemplateNode as Node
# List of configs
config_list = ['edge_case']
......@@ -17,9 +19,9 @@ edge_case = {
Node('t'),
],
'edges': [
Street(0, 1, distance=1, consumption=1),
Street(0, 2, distance=1, consumption=4),
Street(1, 2, distance=1, consumption=1),
Edge(0, 1, distance=1, consumption=1),
Edge(0, 2, distance=1, consumption=4),
Edge(1, 2, distance=1, consumption=1),
]
}
......@@ -31,6 +33,6 @@ def get_graph(config):
G.add_node(node_id, **node_convert(node))
for edge in config['edges']:
G.add_edge(edge.u, edge.v, **street_convert(edge))
G.add_edge(edge.u, edge.v, **edge_convert(edge))
return G
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment