Skip to content
Snippets Groups Projects
Commit a9ef7a7b authored by markn92's avatar markn92
Browse files

ae

parent a37827ee
No related branches found
No related tags found
No related merge requests found
"""
Constants for conversion.
Usage::
a * a_to_b = b
"""
ms_to_kmh = 3.6
......@@ -20,16 +20,16 @@ from collections import namedtuple
import networkx as nx
import requests
import rtree
from evrouting.graph_tools import CHARGING_COEFFICIENT_KEY
from evrouting.osm.const import ms_to_kmh
from evrouting.osm.profiles import speed
logger = logging.getLogger(__name__)
OsrmConf = namedtuple('OsrmConf', ['server', 'port', 'version', 'profile'],
defaults=('v1', 'driving'))
def query_url(service, coordinates, osrm_config: OsrmConf):
"""Construct query url."""
return f'http://{osrm_config.server}:{osrm_config.port}' \
......@@ -89,40 +89,38 @@ class OsrmDistance:
return resp['routes'][0]['duration']
def read_osm(osm_xml_data, only_roads=True) -> nx.DiGraph:
"""Read graph in OSM format from file specified by name or by stream object.
Parameters
----------
filename_or_stream : filename or stream object
Returns
-------
G : Graph
def read_osm(osm_xml_data, profile) -> nx.DiGraph:
"""
Read graph in OSM format from file specified by name or by stream object.
Create Graph containing all highways as edges.
"""
osm = OSM(osm_xml_data)
G = nx.DiGraph()
## Add ways
# Add ways
for w in osm.ways.values():
if only_roads and 'highway' not in w.tags:
if 'highway' not in w.tags:
continue
if w.tags['highway'] not in profile['highway_whitelist']:
continue
for u_id, v_id in zip(w.nds[:-1], w.nds[1:]):
u, v = osm.nodes[u_id], osm.nodes[v_id]
# Travel-time from u to v
d = haversine_distance(
u.lon, u.lat, v.lon, v.lat, unit_m=True
) / speed(w, profile) * ms_to_kmh
if ('oneway' in w.tags):
if (w.tags['oneway'] == 'yes'):
if w.tags.get('oneway', 'no') == 'yes':
# ONLY ONE DIRECTION
nx.add_path(G, w.nds, id=w.id)
G.add_edge(u_id, v_id, weight=d)
else:
# BOTH DIRECTION
nx.add_path(G, w.nds, id=w.id)
nx.add_path(G, w.nds[::-1], id=w.id)
else:
# BOTH DIRECTION
nx.add_path(G, w.nds, id=w.id)
nx.add_path(G, w.nds[::-1], id=w.id)
G.rtree = rtree.index.Index()
G.add_edge(u_id, v_id, weight=d)
G.add_edge(v_id, u_id, weight=d)
# Complete the used nodes' information
for n_id in G.nodes():
......@@ -130,7 +128,6 @@ def read_osm(osm_xml_data, only_roads=True) -> nx.DiGraph:
G.nodes[n_id]['lat'] = n.lat
G.nodes[n_id]['lon'] = n.lon
G.nodes[n_id]['id'] = n.id
G.rtree.insert(n_id, (n.lat, n.lon, n.lat, n.lon))
return G
......@@ -147,26 +144,30 @@ class Node(object):
class Way(object):
def __init__(self, id, osm):
self.osm = osm
def __init__(self, id):
self.id = id
self.nds = []
self.tags = {}
def split(self, dividers):
def split(self, node_pass_count):
"""
Slice way at every crossing i.e. when a waypoint is passend by
multiple ways.
"""
# slice the node-array using this nifty recursive function
def slice_array(ar, dividers):
for i in range(1, len(ar) - 1):
if dividers[ar[i]] > 1:
left = ar[:i + 1]
right = ar[i:]
def slice_array(waypoints):
for i in range(1, len(waypoints) - 1):
if node_pass_count[waypoints[i]] > 1:
left = waypoints[:i + 1]
right = waypoints[i:]
rightsliced = slice_array(right, dividers)
rightsliced = slice_array(right)
return [left] + rightsliced
return [ar]
return [waypoints]
slices = slice_array(self.nds, dividers)
slices = slice_array(self.nds)
# create a way object for each node-array slice
ret = []
......@@ -190,8 +191,6 @@ class OSM(object):
nodes = {}
ways = {}
superself = self
class OSMHandler(xml.sax.ContentHandler):
@classmethod
def setDocumentLocator(self, loc):
......@@ -210,7 +209,7 @@ class OSM(object):
if name == 'node':
self.currElem = Node(attrs['id'], float(attrs['lon']), float(attrs['lat']))
elif name == 'way':
self.currElem = Way(attrs['id'], superself)
self.currElem = Way(attrs['id'])
elif name == 'tag':
self.currElem.tags[attrs['k']] = attrs['v']
elif name == 'nd':
......@@ -246,7 +245,5 @@ class OSM(object):
for id, way in self.ways.items():
split_ways = way.split(node_histogram)
for split_way in split_ways:
if split_way.id in new_ways:
print('Way id already exists.')
new_ways[split_way.id] = split_way
self.ways = new_ways
"""Defines Routing Profiles"""
def speed(way, profile):
profile_speed = min(
profile['speeds'][way.tags['highway']], profile['maxspeed']
)
try:
max_speed = float(way.tags['maxspeed'])
except (KeyError, ValueError):
speed = profile_speed
else:
speed = min(profile_speed, max_speed)
return speed
car = {
"maxspeed": 140,
"speeds": {
"motorway": 130,
"motorway_link": 60,
"trunk": 100,
"trunk_link": 60,
"primary": 50,
"primary_link": 50,
"secondary": 50,
"secondary_link": 50,
"tertiary": 50,
"tertiary_link": 50,
"unclassified": 30,
"residential": 30,
"living_street": 3,
"service": 3
},
"highway_whitelist": {
'motorway',
'motorway_link',
'trunk',
'trunk_link',
'primary',
'primary_link',
'secondary',
'secondary_link',
'tertiary',
'tertiary_link',
'residential',
'living_street',
'unclassified',
'service'
},
"access": ["access", "vehicle", "motor_vehicle", "motorcar"]
}
from typing import Tuple
import networkx as nx
from evrouting.osm.const import ms_to_kmh
from evrouting.osm.imports import haversine_distance
lat = float
lon = float
point = Tuple[lat, lon]
def find_nearest(G, v: point):
min_dist = None
closest_node = None
lat_v, lon_v = v
for n in G.nodes:
d = haversine_distance(
G.nodes[n]['lat'],
G.nodes[n]['lot'],
lat_v,
lon_v,
unit_m=True
)
if min_dist is None or d < min_dist:
closest_node = n
min_dist = d
return closest_node
def shortest_path(G, s: point, t: point, profile):
"""Calc A* shortest path."""
_s = find_nearest(G, s)
_t = find_nearest(G, t)
def dist(u, v):
return haversine_distance(
G.nodes[u]['lat'],
G.nodes[u]['lot'],
G.nodes[v]['lat'],
G.nodes[v]['lon'],
unit_m=True
) / profile['maxspeed'] * ms_to_kmh
return nx.astar_path(G, _s, _t, heuristic=dist)
......@@ -4,7 +4,8 @@ import pytest
import networkx as nx
import rtree
from evrouting.osm import read_osm, insert_charging_stations
from evrouting.osm.imports import read_osm, insert_charging_stations
from evrouting.osm.profiles import car
from evrouting.graph_tools import CHARGING_COEFFICIENT_KEY
......@@ -28,10 +29,11 @@ def graph():
del G
def _test_read_osm():
def test_read_osm():
"""Just check if it runs. Todo: Delete."""
assert read_osm(os.path.join(os.path.dirname(__file__), 'static/map.osm'))
G = read_osm(os.path.join(os.path.dirname(__file__), 'static/map.osm'),
car)
assert True
def test_insert_charging_stations_close(graph):
# Close two node 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment