"""The example map format described in `map_format.md`, conforming to
the interface in openlr_dereferencer.maps"""
import os
import sqlite3
from typing import Sequence, Tuple, Iterable
from openlr import Coordinates
from .primitives import Line, Node, ExampleMapError, SRID
from ..maps import MapReader, wgs84
[docs]class ExampleMapReader(MapReader):
"""
This is a reader for the example map format described in `map_format.md`.
Create an instance with: `ExampleMapReader('example.sqlite')`.
"""
def __init__(self, map_db_file: str):
self.connection = sqlite3.connect(map_db_file)
self.connection.enable_load_extension(True)
try:
self.connection.load_extension("mod_spatialite.so")
except sqlite3.OperationalError:
raise ExampleMapError(
"Spatialite (mod_spatialite.so) was not found on your system."
"Please install all dependencies."
)
[docs] def get_line(self, line_id: int) -> Line:
# Just verify that this line ID exists.
result = self.connection.execute("SELECT rowid FROM lines WHERE rowid=?", (line_id,))
if result.fetchone() is None:
raise ExampleMapError(f"The line {line_id} does not exist")
return Line(self, line_id)
[docs] def get_lines(self) -> Iterable[Line]:
result = self.connection.execute("SELECT rowid FROM lines")
for (line_id,) in result:
yield Line(self, line_id)
[docs] def get_linecount(self) -> int:
(count,) = self.connection.execute("SELECT COUNT(*) FROM lines").fetchone()
return count
[docs] def get_node(self, node_id: int) -> Node:
result = self.connection.execute("SELECT id FROM nodes WHERE id=?", (node_id,))
(node_id,) = result.fetchone()
return Node(self, node_id)
[docs] def get_nodes(self) -> Iterable[Node]:
result = self.connection.execute("SELECT id FROM nodes")
for (node_id,) in result.fetchall():
yield Node(self, node_id)
[docs] def get_nodecount(self) -> int:
(count,) = self.connection.execute("SELECT COUNT(*) FROM nodes").fetchone()
return count
[docs] def find_nodes_close_to(self, coord: Coordinates, dist: float) -> Iterable[Node]:
"""Finds all nodes in a given radius, given in meters
Yields every node within this distance to `coord`."""
lon, lat = coord.lon, coord.lat
stmt = """SELECT id FROM nodes WHERE Distance(MakePoint(?, ?), coord, 0) < ?"""
for (node_id,) in self.connection.execute(stmt, (lon, lat, dist)):
yield Node(self, node_id)
[docs] def find_lines_close_to(self, coord: Coordinates, dist: float) -> Iterable[Line]:
"Yields all lines within `dist` meters around `coord`"
lon, lat = coord.lon, coord.lat
stmt = """SELECT rowid FROM lines WHERE PtDistWithin(MakePoint(?, ?), path, ?, 0)"""
for (line_id,) in self.connection.execute(stmt, (lon, lat, dist)):
yield Line(self, line_id)