Newer
Older
"""
Convert an OpenStreetMap `.map` format file into a networkx directed graph.
This parser is based on the osm to networkx tool
from Loïc Messal (github : Tofull)
Added :
- python3.6 compatibility
- networkx v2 compatibility
- cache to avoid downloading the same osm tiles again and again
- distance computation to estimate the length of each way (useful to compute the shortest path)
"""
import copy
import xml.sax
from evrouting.osm.const import ms_to_kmh
from evrouting.osm.profiles import speed
from evrouting.osm.routing import point, haversine_distance
class OSMGraph(nx.DiGraph):
"""
Adds some functionality to the graph for convenience when
working with actual geo data from OSM, such as a spatial index and
a method to get an entry node via the spatial index for some coordinates.
def __init__(self, *args, **kwargs):
    """
    Create the graph and its (initially empty) spatial index.

    All positional and keyword arguments are forwarded unchanged to the
    parent graph constructor.
    """
    super().__init__(*args, **kwargs)
    self.charging_stations = set()
    # Data structures for spatial index.
    # Rtree only supports indexing by integers, but the
    # node ids are strings. Therefore a mapping is introduced.
    self._rtree = rtree.index.Index()
    self._int_index = itertools.count()
    # Integer rtree key -> node id. Initialised here as well as in
    # rebuild_rtree so lookups and inserts work on a fresh graph.
    self._int_index_map = {}
# NOTE(review): the enclosing `def` line is not visible in this excerpt; judging
# by the call in rebuild_rtree this is presumably the body of
# insert_into_rtree(self, node) - confirm.
# Read the coordinates stored as attributes of the graph node.
info = self.nodes[node]
lat, lon = info['lat'], info['lon']
# Rtree keys have to be integers, so draw the next integer for this node.
index = next(self._int_index)
# The point is passed as a 4-tuple that repeats its coordinates, in
# (lon, lat) order.
self._rtree.insert(index, (lon, lat, lon, lat))
# Remember which node id the integer key stands for.
self._int_index_map[index] = node
def rebuild_rtree(self):
    """
    Recreate the spatial index from the nodes currently in the graph.

    The rtree index cannot be pickled, so it has to be rebuilt from
    scratch whenever it was lost.
    """
    # Throw away the old index together with its integer-key bookkeeping.
    self._rtree = rtree.index.Index()
    self._int_index = itertools.count()
    self._int_index_map = {}

    # Re-register every node of the graph.
    for node_id in self.nodes:
        self.insert_into_rtree(node_id)
def insert_charging_stations(self, charging_stations):
    """
    Attach charging stations to the graph nodes closest to them.

    Each station is a mapping with the keys 'lon', 'lat' and 'power'.
    A station is assigned to the nearest node found by find_nearest with
    distance_limit=500; stations without such a node are ignored. The
    matched node gets the station's power stored under
    CHARGING_COEFFICIENT_KEY, and self.charging_stations is replaced by
    the set of all matched nodes.
    """
    matched_nodes = set()
    for station in charging_stations:
        lon, lat = station['lon'], station['lat']
        node = self.find_nearest((lat, lon), distance_limit=500)
        if node is None:
            # No node close enough to this station.
            continue
        self.nodes[node][CHARGING_COEFFICIENT_KEY] = station['power']
        matched_nodes.add(node)

    self.charging_stations = matched_nodes
def find_nearest(self, v: point, distance_limit=None):
    """
    Find nearest point to location v within radius
    of distance_limit.

    :param v: Query location as a (lat, lon) pair.
    :param distance_limit: Optional maximum distance between v and the
        returned node; it is compared against a haversine distance
        computed with unit_m=True (presumably meters - confirm).
        None disables the check.
    :return: Id of the nearest node, or None if the spatial index is
        empty or the nearest node is farther away than distance_limit.
    """
    lat_v, lon_v = v
    # The default keeps an empty index from leaking StopIteration.
    index_n = next(
        self._rtree.nearest((lon_v, lat_v, lon_v, lat_v), 1),
        None
    )
    if index_n is None:
        return None

    # Translate the integer rtree key back into the node id.
    n = self._int_index_map[index_n]

    if distance_limit is not None:
        # NOTE(review): the coordinates are passed as (lat, lon, lat, lon)
        # here, while the edge-length computation elsewhere in this file
        # passes (lon, lat, lon, lat) to the same helper - confirm which
        # order haversine_distance expects.
        d = haversine_distance(
            self.nodes[n]['lat'],
            self.nodes[n]['lon'],
            lat_v,
            lon_v,
            unit_m=True
        )
        if d > distance_limit:
            return None

    return n
"""
Read graph in OSM format from file specified by name or by stream object.
Create Graph containing all highways as edges.
# NOTE(review): the enclosing function header and loop are not visible in this
# excerpt; the comments below describe only the statements shown.
# Skip ways whose highway type is not whitelisted by the profile.
if w.tags['highway'] not in profile['highway_whitelist']:
continue
# Walk over all pairs of consecutive nodes (u, v) along the way.
for u_id, v_id in zip(w.nds[:-1], w.nds[1:]):
u, v = osm.nodes[u_id], osm.nodes[v_id]
# Travel-time from u to v
# NOTE(review): coordinates are passed as (lon, lat, lon, lat) here, whereas
# find_nearest passes (lat, lon, lat, lon) to the same helper - confirm which
# order haversine_distance expects.
d = haversine_distance(u.lon, u.lat, v.lon, v.lat, unit_m=True) # in m
# The profile speed is divided by ms_to_kmh before use - presumably a
# km/h to m/s conversion, so that t comes out in seconds (confirm).
t = d / (speed(w, profile) / ms_to_kmh) # in s
class Node:
    """A single OSM node: an identifier, a position and a tag dictionary."""

    def __init__(self, id, lon, lat):
        """Store id, longitude and latitude; tags start out empty."""
        self.id, self.lon, self.lat = id, lon, lat
        # Every node owns its own tag dictionary.
        self.tags = {}

    def __str__(self):
        """Return a human-readable one-line description of the node."""
        template = "Node (id : %s) lon : %s, lat : %s "
        return template % (self.id, self.lon, self.lat)
def split(self, node_pass_count):
"""
Slice the way at every crossing, i.e. wherever a waypoint is passed by
multiple ways.

node_pass_count is indexed by node id and gives the number of times that
node is used; an interior waypoint with a count above 1 is a cut point.

Returns a list of shallow copies of this way, one per slice, each with
"-<n>" appended to its id and `nds` replaced by the slice.
"""
def slice_array(waypoints):
# Only interior waypoints are candidates; the two end points are skipped.
for i in range(1, len(waypoints) - 1):
if node_pass_count[waypoints[i]] > 1:
# The crossing node ends the left slice and starts the right one.
left = waypoints[:i + 1]
right = waypoints[i:]
# NOTE(review): the remainder of slice_array and the assignment of `slices`
# are not visible in this excerpt - confirm against the full file.
# create a way object for each node-array slice
ret = []
i = 0
for slice in slices:
# copy.copy is a shallow copy: attributes other than the ones reassigned
# below still refer to the same objects as the original way.
littleway = copy.copy(self)
littleway.id += "-%d" % i
littleway.nds = slice
ret.append(littleway)
i += 1
return ret
class OSM(object):
""" File can be either a filename or stream/file object.
set `is_xml_string=False` if osm_xml_data is a filename or a file stream.
"""
# NOTE(review): the constructor header is not visible in this excerpt; the
# statements below presumably belong to its body - confirm.
# Accumulators filled by OSMHandler.endElement, keyed by element id.
nodes = {}
ways = {}
# SAX content handler that collects the <node> and <way> elements.
# NOTE(review): every callback is a @classmethod whose first parameter is
# named `self`, so `self.currElem` is stored on the class itself rather than
# on an instance - presumably the class object is handed to the parser; confirm.
class OSMHandler(xml.sax.ContentHandler):
@classmethod
def setDocumentLocator(self, loc):
pass
@classmethod
def startDocument(self):
pass
@classmethod
def endDocument(self):
pass
# Open a new element, or add detail to the element currently being built.
@classmethod
def startElement(self, name, attrs):
if name == 'node':
self.currElem = Node(attrs['id'], float(attrs['lon']), float(attrs['lat']))
# NOTE(review): the body of the 'way' branch is not visible in this excerpt.
elif name == 'way':
elif name == 'tag':
# Record the tag's k/v attribute pair on the current element.
self.currElem.tags[attrs['k']] = attrs['v']
elif name == 'nd':
# Append the referenced node id to the current element's node list.
self.currElem.nds.append(attrs['ref'])
# Store the finished element in the matching accumulator under its id.
@classmethod
def endElement(self, name):
if name == 'node':
nodes[self.currElem.id] = self.currElem
elif name == 'way':
ways[self.currElem.id] = self.currElem
@classmethod
def characters(self, chars):
pass
self.nodes = nodes
self.ways = ways

# count times each node is used
node_histogram = dict.fromkeys(self.nodes, 0)
# Iterate over a snapshot of the ways: deleting entries from a dict while
# iterating its live view raises RuntimeError in Python 3.
for way in list(self.ways.values()):
    if len(way.nds) < 2:
        # a way with fewer than two nodes cannot form an edge, drop it
        del self.ways[way.id]
        continue
    for node in way.nds:
        node_histogram[node] += 1

# use that histogram to split all ways, replacing the member set of ways
new_ways = {}
for way in self.ways.values():
    for split_way in way.split(node_histogram):
        new_ways[split_way.id] = split_way
self.ways = new_ways