Source code for igp2.opendrive.elements.junction

# -*- coding: utf-8 -*-

from typing import List

from shapely.geometry import Polygon, MultiPolygon
from shapely.ops import unary_union

from igp2.opendrive.elements.road_lanes import Lane


[docs]class JunctionPriority: """ Priority specification between incoming and connecting roads. """ def __init__(self, high_id: int, low_id: int): self.high_id = high_id self.low_id = low_id self.high = None self.low = None def __repr__(self): return f"{self.high_id} > {self.low_id}"
[docs]class Connection: """ Object representing a Junction in the OpenDrive standard """ def __init__(self): self._id = None self._incoming_road = None self._connecting_road = None self._contact_point = None self._lane_links = [] def __repr__(self): return f"{self._incoming_road.id} > {self._connecting_road.id}" @property def id(self) -> int: """ Unique ID for the Connection """ return self._id @id.setter def id(self, value: int): self._id = int(value) @property def incoming_road(self): """ The incoming Road object""" return self._incoming_road @incoming_road.setter def incoming_road(self, value): self._incoming_road = value @property def connecting_road(self): """ The connecting Road object """ return self._connecting_road @connecting_road.setter def connecting_road(self, value: int): self._connecting_road = value @property def contact_point(self) -> str: """ Contact point of the connecting road. Either 'start' or 'end' """ return self._contact_point @contact_point.setter def contact_point(self, value: str): if value not in ["start", "end"]: raise AttributeError("Contact point can only be start or end.") self._contact_point = value @property def lane_links(self) -> List[JunctionLaneLink]: """ List of LaneLinks between lanes of the incoming and connecting road """ return self._lane_links
[docs]class Junction: """ Represents a Junction object in the OpenDrive standard""" # TODO controller def __init__(self): self._id = None self._name = None self._boundary = None self._connections = [] self._priorities = [] self._junction_group = None self._roads = [] @property def id(self) -> int: """ Unique ID of the junction """ return self._id @id.setter def id(self, value: int): self._id = int(value) @property def name(self) -> str: """ Name for the junction """ return self._name @name.setter def name(self, value: str): self._name = str(value) @property def connections(self) -> List[Connection]: """ The list of connections in the junction""" return self._connections @property def priorities(self) -> List[JunctionPriority]: return self._priorities @property def junction_group(self) -> "JunctionGroup": return self._junction_group @junction_group.setter def junction_group(self, junction_group: "JunctionGroup"): self._junction_group = junction_group @property def in_roundabout(self) -> bool: """ True if the junction is a roudabout junction. """ return self.junction_group is not None and self.junction_group.type == "roundabout" @property def roads(self) -> List["Road"]: if not self._roads: self._roads = self.get_all_roads() return self._roads
[docs] def add_connection(self, connection: Connection): """ Add a new connection to the Junction Args: connection: The Connection object to add """ if not isinstance(connection, Connection): raise TypeError("Has to be of instance Connection") self._connections.append(connection)
[docs] def add_priority(self, priority: JunctionPriority): """ Add a new priority field to the Junction Args: priority: The JunctionPriority object to add """ if not isinstance(priority, JunctionPriority): raise TypeError("Must be of instance JunctionPriority") self._priorities.append(priority)
[docs] def get_all_roads(self) -> List["Road"]: """ Return all roads that are part of this Junction. Warning: This function assumes that all roads in the junction are connecting roads. """ ret = [] for connection in self._connections: if connection.connecting_road not in ret: ret.append(connection.connecting_road) return ret
[docs] def get_all_connecting_roads(self, incoming_road: "Road") -> List["Road"]: """ Return all connecting roads of the given incoming Road. Args: incoming_road: The incoming Road object Returns: List of all connecting roads """ ret = [] for connection in self._connections: if connection.incoming_road == incoming_road: ret.append(connection.connecting_road) return ret
[docs] def get_all_connecting_lanes(self, incoming_lane: Lane) -> List[Lane]: """ Return all connecting lanes of the given incoming Lane. Args: incoming_lane: The incoming Lane object Returns: List of connecting Lanes """ ret = [] for connection in self._connections: if connection.incoming_road == incoming_lane.parent_road: for lane_link in connection.lane_links: if lane_link.from_id == incoming_lane.id: ret.append(lane_link.to_lane) return ret
@property def boundary(self): """ The boundary of the Junction formed as the union of all roads in the Junction""" return self._boundary
[docs] def calculate_boundary(self): """ Calculate the boundary of the Junction given the list of roads in it """ def extend_boundary(_road): if _road.id not in visited and _road.junction is not None and \ _road.junction.id == self.id: visited.add(_road.id) return unary_union([boundary, _road.boundary]) return boundary boundary = Polygon() visited = set() for connection in self._connections: boundary = extend_boundary(connection.incoming_road) boundary = extend_boundary(connection.connecting_road) if boundary.geom_type == "Polygon": self._boundary = Polygon(boundary.exterior) else: self._boundary = MultiPolygon([Polygon(polygon.exterior) for polygon in boundary.geoms])
[docs]class JunctionGroup: """ A Junction group. Reference: OpenDrive 1.6.1 - Section 10.5 """ def __init__(self, name: str, group_id: int, group_type: str): self.name = name self.id = group_id if group_type not in ["roundabout", "unknown"]: raise ValueError("Junction group type must be roundabout or unknown") self.type = group_type self.junctions = []
[docs] def add_junction(self, junction: Junction): """ Add a new Junction element to the group """ if not isinstance(junction, Junction): raise ValueError("Given object is not a Junction") self.junctions.append(junction)