import logging
import numpy as np
from typing import Union, Tuple, List, Dict, Optional
from shapely.geometry import Point
from lxml import etree
from igp2.opendrive.elements.geometry import normalise_angle
from igp2.opendrive.elements.junction import Junction, JunctionGroup
from igp2.opendrive.elements.opendrive import OpenDrive
from igp2.opendrive.elements.road import Road
from igp2.opendrive.elements.road_lanes import Lane, LaneTypes
from igp2.opendrive.parser import parse_opendrive
logger = logging.getLogger(__name__)
[docs]
class Map(object):
""" Define a map object based on the OpenDrive standard """
ROAD_PRECISION_ERROR = 1e-8 # Maximum precision error allowed when checking if two geometries contain each other
LANE_PRECISION_ERROR = 1e-8
JUNCTION_PRECISION_ERROR = 1e-8
def __init__(self, opendrive: OpenDrive = None):
""" Create a map object given the parsed OpenDrive file
Args:
opendrive: A class describing the parsed contents of the OpenDrive file
"""
self.__xodr_path = None
self.__opendrive = opendrive
self.__process_header()
self.__process_road_layout()
def __process_header(self):
self.__name = self.__opendrive.header.name
self.__date = self.__opendrive.header.date
self.__north = float(self.__opendrive.header.north)
self.__west = float(self.__opendrive.header.west)
self.__south = float(self.__opendrive.header.south)
self.__east = float(self.__opendrive.header.east)
self.__geo_reference = self.__opendrive.header.geo_reference
def __process_road_layout(self):
roads = {}
for road in self.__opendrive.roads:
road.plan_view.precalculate(linestring=True)
road.calculate_road_geometry(resolution=min(road.plan_view.length / 4, 0.25))
assert road.id not in roads
roads[road.id] = road
self.__roads = roads
junctions = {}
for junction in self.__opendrive.junctions:
junction.calculate_boundary()
assert junction.id not in junctions
junctions[junction.id] = junction
self.__junctions = junctions
junction_groups = {}
for junction_group in self.__opendrive.junction_groups:
assert junction_group.id not in junction_groups
junction_groups[junction_group.id] = junction_group
self.__junction_groups = junction_groups
def __repr__(self):
return f"Map(name={self.name})"
[docs]
def roads_at(self, point: Union[Point, Tuple[float, float], np.ndarray], drivable: bool = False,
max_distance: float = None) -> List[Road]:
""" Find all roads that pass through the given point within an error given by Map.ROAD_PRECISION_ERROR. The
default error is 1e-8.
Args:
point: Point in cartesian coordinates
drivable: Whether the returned roads need to be drivable
max_distance: Maximum distance error
Returns:
A list of all viable roads or empty list
"""
if max_distance is None:
max_distance = Map.ROAD_PRECISION_ERROR
point = Point(point)
candidates = []
for road_id, road in self.roads.items():
if road.boundary is not None and road.boundary.distance(point) < max_distance:
if drivable and not road.drivable: continue
candidates.append(road)
return candidates
[docs]
def lanes_at(self, point: Union[Point, Tuple[float, float], np.ndarray], drivable_only: bool = False,
max_distance: float = None) -> List[Lane]:
""" Return all lanes passing through the given point within an error given by Map.LANE_PRECISION_ERROR. The
default error is 1.5
Args:
point: Point in cartesian coordinates
drivable_only: If True, only return drivable lanes
max_distance: Maximum distance error
Returns:
A list of all viable lanes or empty list
"""
if max_distance is None:
max_distance = Map.LANE_PRECISION_ERROR
candidates = []
point = Point(point)
roads = self.roads_at(point, max_distance=max_distance)
for road in roads:
for lane_section in road.lanes.lane_sections:
for lane in lane_section.all_lanes:
if (lane.boundary is not None and
not lane.boundary.is_empty and
lane.boundary.distance(point) < max_distance and
(not drivable_only or lane.type == LaneTypes.DRIVING) and
lane not in candidates):
candidates.append(lane)
return candidates
[docs]
def roads_within_angle(self, point: Union[Point, Tuple[float, float], np.ndarray],
heading: float, threshold: float, max_distance: float = None) -> List[Road]:
""" Return a list of Roads whose angular distance from the given heading is within the given threshold. If only
one road is available at the given point, then always return that regardless of angle difference. If point is
within a junction, then check against all roads of the junction.
Args:
point: Point in cartesian coordinates
heading: Heading in radians
threshold: The threshold in radians
max_distance: Maximum error in lane distance calculations
Returns:
List of Roads
"""
if threshold <= 0.0:
return []
if max_distance is None:
max_distance = Map.ROAD_PRECISION_ERROR
point = Point(point)
roads = self.roads_at(point, max_distance=max_distance)
if len(roads) == 1:
return roads
junction_at_point = self.junction_at(point)
if len(roads) > 1 and junction_at_point is not None:
roads = junction_at_point.roads
ret = []
original_heading = normalise_angle(heading)
for road in roads:
_, angle = road.plan_view.calc(road.midline.project(point))
if road.junction is None and np.abs(original_heading - angle) > np.pi / 2:
heading = normalise_angle(original_heading + np.pi)
else:
heading = original_heading
diff = np.abs(normalise_angle(heading - angle))
if diff < threshold:
ret.append(road)
return ret
[docs]
def lanes_within_angle(self, point: Union[Point, Tuple[float, float], np.ndarray],
heading: float, threshold: float, drivable_only: bool = True,
max_distance: float = None) -> List[Lane]:
""" Return a list of Lanes whose angular distance from the given heading is within the given threshold and whose
distance from the point is within an error as given by Map.LANE_PRECISION_ERROR.
Args:
point: Point in cartesian coordinates
heading: Heading in radians
threshold: The threshold in radians
drivable_only: If True, only return a Lane if it is drivable
max_distance: Maximum error in lane distance calculations
Returns:
List of Lanes
"""
if max_distance is None:
max_distance = Map.LANE_PRECISION_ERROR
point = Point(point)
ret = []
roads = self.roads_within_angle(point, heading, threshold, max_distance=max_distance)
for road in roads:
for lane_section in road.lanes.lane_sections:
for lane in lane_section.all_lanes:
_, original_angle = road.plan_view.calc(road.midline.project(point))
if lane.id > 0:
angle = normalise_angle(original_angle + np.pi)
else:
angle = original_angle
angle_diff = np.abs(normalise_angle(heading - angle))
if lane.boundary is not None and lane.boundary.distance(point) < max_distance and \
lane.id != 0 and (not drivable_only or lane.type == LaneTypes.DRIVING) \
and angle_diff < threshold:
ret.append(lane)
return ret
[docs]
def best_road_at(self,
point: Union[Point, Tuple[float, float], np.ndarray],
heading: float = None,
drivable: bool = True,
goal: "Goal" = None) -> Optional[Road]:
""" Get the road at the given point with the closest direction to heading. If no heading is given, then select
the first viable road.
Args:
point: Point in cartesian coordinates
heading: Heading in radians
drivable: Whether only to consider roads that have drivable lanes
goal: If given, the best road is chosen based on its distance from the goal
Returns:
A Road passing through point with its direction closest to the given heading, or None.
"""
point = Point(point)
roads = self.roads_at(point)
if len(roads) == 0:
logger.debug(f"No roads found at point: {point}!")
return None
if len(roads) == 1 or heading is None:
return roads[0]
best = None
best_diff = np.inf
original_heading = normalise_angle(heading)
for road in roads:
if drivable and not road.drivable: continue
_, angle = road.plan_view.calc(road.midline.project(point))
heading = original_heading
if road.junction:
if road.all_lane_backwards:
angle = normalise_angle(angle + np.pi)
elif np.abs(original_heading - angle) > np.pi / 2:
heading = normalise_angle(original_heading + np.pi)
diff = abs((heading - angle + np.pi) % (2 * np.pi) - np.pi)
if goal is not None and best is not None:
# Measure the distance from the 'best' and current road to the goal
current_norm_distance = 0.0 if road.all_lane_backwards else 1.0
best_norm_distance = 0.0 if best.all_lane_backwards else 1.0
dist_best_road_from_goal = goal.distance(
best.midline.interpolate(best_norm_distance, normalized=True))
dist_current_road_from_goal = goal.distance(
road.midline.interpolate(current_norm_distance, normalized=True))
# Check if the new road is closer to the goal than the current best.
goal_distance_diff = dist_current_road_from_goal - dist_best_road_from_goal
angle_diff = diff - best_diff
if goal_distance_diff < 0 and angle_diff < np.pi / 36 \
or goal_distance_diff < 1 and angle_diff < 0:
best = road
best_diff = diff
elif diff < best_diff:
best = road
best_diff = diff
# warn_threshold = np.pi / 18
# if best_diff > warn_threshold: # Warning if angle difference was too large
# logger.debug(f"Best angle difference of {np.rad2deg(best_diff)} > "
# f"{np.rad2deg(warn_threshold)} at {point} on road {best}!")
return best
[docs]
def best_lane_at(self,
point: Union[Point, Tuple[float, float], np.ndarray],
heading: float = None,
drivable_only: bool = True,
max_distance: float = None,
goal: "Goal" = None) -> Optional[Lane]:
""" Get the lane at the given point whose direction is closest to the given heading and whose distance from the
point is the smallest.
Args:
point: Point in cartesian coordinates
heading: Heading in radians
drivable_only: If True, only return a Lane if it is drivable
max_distance: Maximum error in distance calculations
goal: If given, the road on which the best lane will be is chosen based on its distance from the goal
Returns:
A Lane passing through point with its direction closest to the given heading, or None.
"""
if max_distance is None:
max_distance = Map.LANE_PRECISION_ERROR
point = Point(point)
road = self.best_road_at(point, heading, goal=goal)
if road is None:
return None
best = None
_, original_angle = road.plan_view.calc(road.midline.project(point))
for lane_section in road.lanes.lane_sections:
for lane in lane_section.all_lanes:
if lane.boundary is not None and lane.id != 0 and (not drivable_only or lane.type == LaneTypes.DRIVING):
distance = lane.boundary.distance(point)
if distance < max_distance:
angle_diff = 0.0
if heading is not None:
if lane.id > 0:
angle = normalise_angle(original_angle + np.pi)
else:
angle = original_angle
angle_diff = np.abs(heading - angle)
if best is None or best[0] > angle_diff + distance:
best = (angle_diff + distance, lane)
return best[1] if best is not None else None
[docs]
def junction_at(self, point: Union[Point, Tuple[float, float], np.ndarray]) -> Optional[Junction]:
""" Get the Junction at a given point within an error given by Map.JUNCTION_PRECISION_ERROR
Args:
point: Location to check in cartesian coordinates
Returns:
A Junction object or None
"""
point = Point(point)
for junction_id, junction in self.junctions.items():
if junction.boundary is not None and junction.boundary.distance(point) < Map.JUNCTION_PRECISION_ERROR:
return junction
return None
[docs]
def junction_predecessor_lanes(self, lane: Lane, in_roundabout: bool = False) -> List[Lane]:
""" Retrieve all predecessor lanes of a lane if the predecessor of the parent road is
a junction.
Args:
lane: The lane to check.
in_roundabout: Whether to only check a junction if it is in a roundabout.
"""
if lane.link.predecessor is not None:
logger.warning(f"Lane predecessors is not None. Predecessor is not a junction.")
return lane.link.predecessor
road_predecessor = lane.parent_road.link.predecessor
if road_predecessor is None or \
not isinstance(road_predecessor.element, Junction) or \
in_roundabout != road_predecessor.element.in_roundabout:
return []
start_point = lane.midline.interpolate(0.0).buffer(1.0)
possible_lanes = []
for junction_road in road_predecessor.element.roads:
for ls in junction_road.lanes.lane_sections:
for lane in ls.all_lanes:
if lane.midline.intersects(start_point):
possible_lanes.append(lane)
return possible_lanes
[docs]
def adjacent_lanes_at(self, point: Union[Point, Tuple[float, float], np.ndarray], heading: float = None,
same_direction: bool = False, drivable_only: bool = False) -> List[Lane]:
""" Return all adjacent lanes on the same Road at the given point and heading.
Args:
point: Point in cartesian coordinates
heading: Heading in radians
same_direction: If True, only return lanes in the same direction as the current Lane
drivable_only: If True, only return a Lane if it is drivable
Returns:
A list of all adjacent Lane objects on the same Road
"""
point = Point(point)
current_lane = self.best_lane_at(point, heading)
return self.get_adjacent_lanes(current_lane, same_direction, drivable_only)
[docs]
def get_adjacent_lanes(self, current_lane: Lane,
same_direction: bool = True, drivable_only: bool = True) -> List[Lane]:
""" Return all adjacent lanes of the given lane.
Args:
current_lane: The current lane
same_direction: If True, only return lanes that have the same direction as the given lane
drivable_only: If True, only return drivable lanes
Returns:
List of adjacent lanes
"""
adjacents = []
direction = np.sign(current_lane.id)
for lane in current_lane.lane_section.all_lanes:
if lane.id != current_lane.id and lane.id != 0:
dirs_equal = np.sign(lane.id) == direction
drivable = lane.type == LaneTypes.DRIVING
if same_direction and drivable_only:
if dirs_equal and drivable:
adjacents.append(lane)
elif same_direction:
if dirs_equal:
adjacents.append(lane)
elif drivable_only:
if drivable:
adjacents.append(lane)
else:
adjacents.append(lane)
return adjacents
[docs]
def in_roundabout(self, point: Union[Point, Tuple[float, float], np.ndarray], heading: float = None) -> bool:
""" Determines whether the vehicle is currently in a roundabout. A roundabout road is either a connector road
in a junction with a junction group of type 'roundabout' - that is, it is neither an exit from or entry into the
roundabout - or it is a road whose predecessor and successor are both in the same roundabout junction group.
Args:
point: Point in cartesian coordinates
heading: Heading in radians
Returns:
True if the vehicle is one a road in a roundabout.
"""
road = self.best_road_at(point, heading)
if road is None:
raise ValueError(f"No road found at {point}.")
return self.road_in_roundabout(road)
[docs]
def road_in_roundabout(self, road: Road, iters: int = 7) -> bool:
""" Calculate whether a road is in a roundabout. A roundabout road is either a connector road
in a junction with a junction group of type 'roundabout' - that is, it is neither an exit from or entry into the
roundabout - or it is a road whose predecessor and successor are both in the same roundabout junction group.
Args:
road: The Road to check
iters: The number of successor roads to check around the roundabout before throwing an error
Returns:
True if the road is part of a roundabout
"""
def check_element(e) -> Optional[bool]:
if isinstance(e, Junction):
return e.junction_group is not None and e.junction_group.type == "roundabout"
junction = e.junction
pred = e.link.predecessor
succ = e.link.successor
# Dead-end roads cannot be in roundabouts
if pred is None or succ is None:
return False
# Road with left and right lanes cannot be a roundabout
if any([len(ls.right_lanes) > 0 and len(ls.left_lanes) > 0 for ls in e.lanes.lane_sections]):
return False
# Check if the junction is a roundabout-junction
if junction is not None:
return junction.junction_group is not None and junction.junction_group.type == "roundabout"
return None
# Return value may be None, but we strictly care about False
if check_element(road) == False:
return False
t = 0
s, p = False, False
road_p, road_s = road, road
while t < iters:
if not p:
predecessor = road_p.link.predecessor
if predecessor is None:
return False
p = check_element(predecessor.element)
road_p = predecessor.element
if not s:
successor = road_s.link.successor
if successor is None:
return False
s = check_element(successor.element)
road_s = successor.element
if p == False or s == False:
return False
if p == True and s == True:
return True
t += 1
raise RuntimeError(f"Couldn't determine whether {road} is in a roundabout.")
[docs]
def get_lane(self, road_id: int, lane_id: int, lane_section_idx: int = 0) -> Lane:
""" Get a certain lane given the road id and lane id from the given lane section.
Args:
road_id: Road ID of the road containing the lane
lane_id: Lane ID of lane to look up
lane_section_idx: The index of the lane section to look-up
Returns:
Lane
"""
lane_sections = self.roads.get(road_id).lanes.lane_sections
assert 0 <= lane_section_idx < len(lane_sections), "Invalid lane section index given"
return lane_sections[lane_section_idx].get_lane(lane_id)
[docs]
def is_valid(self):
""" Checks if the Map geometry is valid. """
for road in self.roads.values():
if road.boundary is None or not road.boundary.is_valid:
return False
for lane_section in road.lanes.lane_sections:
for lane in lane_section.left_lanes + lane_section.right_lanes:
if lane.boundary is None or not lane.boundary.is_valid:
return False
for junction in self.junctions.values():
if junction.boundary is None or not junction.boundary.is_valid:
return False
return True
@property
def name(self) -> str:
""" Name for the map """
return self.__name
@property
def date(self) -> str:
""" Date when the map was created """
return self.__date
@property
def geo_reference(self) -> str:
""" Geo-reference parameters for geo-location """
return self.__geo_reference
@property
def roads(self) -> Dict[int, Road]:
""" Dictionary of all roads in the map with keys the road IDs """
return self.__roads
@property
def junctions(self) -> Dict[int, Junction]:
return self.__junctions
@property
def junction_groups(self) -> Dict[int, JunctionGroup]:
return self.__junction_groups
@property
def north(self) -> float:
""" North boundary of the map"""
return self.__north
@property
def south(self) -> float:
""" South boundary of the map"""
return self.__south
@property
def east(self) -> float:
""" East boundary of the map"""
return self.__east
@property
def west(self) -> float:
""" West boundary of the map"""
return self.__west
@property
def xodr_path(self):
return self.__xodr_path
[docs]
@classmethod
def parse_from_opendrive(cls, file_path: str):
""" Parse the OpenDrive file and create a new Map instance
Args:
file_path: The absolute/relative path to the OpenDrive file
Returns:
A new instance of the Map class
"""
logger.info(f"Parsing map {file_path}.")
tree = etree.parse(file_path)
odr = parse_opendrive(tree.getroot())
new_map = cls(odr)
new_map.__xodr_path = file_path
return new_map