Source code for igp2.opendrive.parser

# -*- coding: utf-8 -*-

import numpy as np
import logging
from lxml import etree
from igp2.opendrive.elements.opendrive import OpenDrive, Header
from igp2.opendrive.elements.road import Road
from igp2.opendrive.elements.road_link import (
    Predecessor as RoadLinkPredecessor,
    Successor as RoadLinkSuccessor,
    Neighbor as RoadLinkNeighbor,
)
from igp2.opendrive.elements.road_type import (
    RoadType,
    Speed as RoadTypeSpeed,
)
from igp2.opendrive.elements.road_elevation_profile import (
    ElevationRecord as RoadElevationProfile,
)
from igp2.opendrive.elements.road_lateral_profile import (
    Superelevation as RoadLateralProfileSuperelevation,
    Crossfall as RoadLateralProfileCrossfall,
    Shape as RoadLateralProfileShape,
)
from igp2.opendrive.elements.road_lanes import (
    LaneOffset as RoadLanesLaneOffset,
    Lane as RoadLaneSectionLane,
    LaneSection as RoadLanesSection,
    LaneWidth as RoadLaneSectionLaneWidth,
    LaneBorder as RoadLaneSectionLaneBorder,
    LaneMarker as RoadLaneSectionMarker
)
from igp2.opendrive.elements.junction import (
    Junction,
    Connection as JunctionConnection,
    JunctionLaneLink as JunctionConnectionLaneLink, JunctionPriority, JunctionGroup,
)

logger = logging.getLogger(__name__)


[docs]def parse_opendrive(root_node) -> OpenDrive: """Tries to parse XML tree, returns OpenDRIVE object Args: root_node: The root node of a parsed OpenDrive XML tree Returns: The object representing an OpenDrive specification. """ # Only accept lxml element if not etree.iselement(root_node): raise TypeError("Argument root_node is not a xml element") opendrive = OpenDrive() # Header header = root_node.find("header") if header is not None: parse_opendrive_header(opendrive, header) # Load roads for road in root_node.findall("road"): parse_opendrive_road(opendrive, road) # Load Junctions for junction in root_node.findall("junction"): parse_opendrive_junction(opendrive, junction) # Load JunctionGroups for junction_group in root_node.findall("junctionGroup"): parse_opendrive_junction_group(opendrive, junction_group) # Load Road Links for road in root_node.findall("road"): parse_opendrive_road_link(opendrive, road) # Load Junction Lane Links for junction in opendrive.junctions: load_junction_lane_links(junction) # Load additional objects for road in opendrive.roads: # Load Junction references if road.junction is not None: road.junction = opendrive.get_junction(road.junction) # Load Lane Links for current Road load_road_lane_links(road) return opendrive
[docs]def parse_opendrive_road_type(road, opendrive_xml_road_type: etree.ElementTree): """Parse opendrive road type and append to road object. Args: road: Road to append the parsed road_type to types. opendrive_xml_road_type: XML element which contains the information. opendrive_xml_road_type: etree.ElementTree: """ speed = None if opendrive_xml_road_type.find("speed") is not None: speed = RoadTypeSpeed( max_speed=opendrive_xml_road_type.find("speed").get("max"), unit=opendrive_xml_road_type.find("speed").get("unit"), ) road_type = RoadType( s_pos=opendrive_xml_road_type.get("s"), use_type=opendrive_xml_road_type.get("type"), speed=speed, ) road.types.append(road_type)
[docs]def parse_opendrive_road_geometry(new_road, road_geometry): start_coord = [float(road_geometry.get("x")), float(road_geometry.get("y"))] if road_geometry.find("line") is not None: new_road.plan_view.add_line( start_coord, float(road_geometry.get("hdg")), float(road_geometry.get("length")), ) elif road_geometry.find("spiral") is not None: new_road.plan_view.add_spiral( start_coord, float(road_geometry.get("hdg")), float(road_geometry.get("length")), float(road_geometry.find("spiral").get("curvStart")), float(road_geometry.find("spiral").get("curvEnd")), ) elif road_geometry.find("arc") is not None: new_road.plan_view.add_arc( start_coord, float(road_geometry.get("hdg")), float(road_geometry.get("length")), float(road_geometry.find("arc").get("curvature")), ) elif road_geometry.find("poly3") is not None: new_road.plan_view.add_poly3( start_coord, float(road_geometry.get("hdg")), float(road_geometry.get("length")), float(road_geometry.find("poly3").get("a")), float(road_geometry.find("poly3").get("b")), float(road_geometry.find("poly3").get("c")), float(road_geometry.find("poly3").get("d")), ) # raise NotImplementedError() elif road_geometry.find("paramPoly3") is not None: if road_geometry.find("paramPoly3").get("pRange"): if road_geometry.find("paramPoly3").get("pRange") == "arcLength": p_max = float(road_geometry.get("length")) else: p_max = None else: p_max = None new_road.plan_view.add_param_poly3( start_coord, float(road_geometry.get("hdg")), float(road_geometry.get("length")), float(road_geometry.find("paramPoly3").get("aU")), float(road_geometry.find("paramPoly3").get("bU")), float(road_geometry.find("paramPoly3").get("cU")), float(road_geometry.find("paramPoly3").get("dU")), float(road_geometry.find("paramPoly3").get("aV")), float(road_geometry.find("paramPoly3").get("bV")), float(road_geometry.find("paramPoly3").get("cV")), float(road_geometry.find("paramPoly3").get("dV")), p_max, ) else: raise Exception("invalid xml")
[docs]def parse_opendrive_road_elevation_profile(new_road, road_elevation_profile): for elevation in road_elevation_profile.findall("elevation"): new_elevation = ( RoadElevationProfile( float(elevation.get("a")), float(elevation.get("b")), float(elevation.get("c")), float(elevation.get("d")), start_pos=float(elevation.get("s")), ), ) new_road.elevation_profile.elevations.append(new_elevation)
[docs]def parse_opendrive_road_lateral_profile(new_road, road_lateral_profile): for superelevation in road_lateral_profile.findall("superelevation"): new_superelevation = RoadLateralProfileSuperelevation( float(superelevation.get("a")), float(superelevation.get("b")), float(superelevation.get("c")), float(superelevation.get("d")), start_pos=float(superelevation.get("s")), ) new_road.lateral_profile.superelevations.append(new_superelevation) for crossfall in road_lateral_profile.findall("crossfall"): new_crossfall = RoadLateralProfileCrossfall( float(crossfall.get("a")), float(crossfall.get("b")), float(crossfall.get("c")), float(crossfall.get("d")), side=crossfall.get("side"), start_pos=float(crossfall.get("s")), ) new_road.lateral_profile.crossfalls.append(new_crossfall) for shape in road_lateral_profile.findall("shape"): new_shape = RoadLateralProfileShape( float(shape.get("a")), float(shape.get("b")), float(shape.get("c")), float(shape.get("d")), start_pos=float(shape.get("s")), start_pos_t=float(shape.get("t")), ) new_road.lateral_profile.shapes.append(new_shape)
[docs]def parse_opendrive_road_lane_offset(new_road, lane_offset): new_lane_offset = RoadLanesLaneOffset( float(lane_offset.get("a")), float(lane_offset.get("b")), float(lane_offset.get("c")), float(lane_offset.get("d")), start_pos=float(lane_offset.get("s")), ) new_road.lanes.lane_offsets.append(new_lane_offset)
[docs]def parse_opendrive_road_lane_section(new_road, lane_section_id, lane_section, program=None): new_lane_section = RoadLanesSection(road=new_road) # Manually enumerate lane sections for referencing purposes new_lane_section.idx = lane_section_id new_lane_section._start_ds = float(lane_section.get("s")) new_lane_section.single_side = lane_section.get("singleSide") sides = dict( left=new_lane_section.left_lanes, center=new_lane_section.center_lanes, right=new_lane_section.right_lanes, ) drivable = False for sideTag, newSideLanes in sides.items(): side = lane_section.find(sideTag) # It is possible one side is not present if side is None: continue for lane in side.findall("lane"): new_lane = RoadLaneSectionLane( parent_road=new_road, lane_section=new_lane_section ) new_lane.id = lane.get("id") new_lane.type = lane.get("type") drivable = drivable or (new_lane.type == "driving") # In some sample files the level is not specified according to the OpenDRIVE spec new_lane.level = ( "true" if lane.get("level") in [1, "1", "true"] else "false" ) # Lane Links if lane.find("link") is not None: if lane.find("link").find("predecessor") is not None: new_lane.link.predecessor_id = ( lane.find("link").find("predecessor").get("id") ) if lane.find("link").find("successor") is not None: new_lane.link.successor_id = ( lane.find("link").find("successor").get("id") ) # RoadRunner does not take lane direction into account when outputting lane links so need to flip # lane IDs for left-lanes if program == "RoadRunner" and new_lane.id > 0: new_lane.link.predecessor_id, new_lane.link.successor_id = \ new_lane.link.successor_id, new_lane.link.predecessor_id # Width for widthIdx, width in enumerate(lane.findall("width")): new_width = RoadLaneSectionLaneWidth( float(width.get("a")), float(width.get("b")), float(width.get("c")), float(width.get("d")), idx=widthIdx, start_offset=float(width.get("sOffset")), ) new_lane.widths.append(new_width) # Border for borderIdx, border in enumerate(lane.findall("border")): new_border = RoadLaneSectionLaneBorder( float(border.get("a")), float(border.get("b")), float(border.get("c")), float(border.get("d")), idx=borderIdx, start_offset=float(border.get("sOffset")), ) new_lane.borders.append(new_border) if lane.find("width") is None and lane.find("border") is not None: new_lane.widths = new_lane.borders new_lane.has_border_record = True # Road Marks for markerIdx, marker in enumerate(lane.findall("roadMark")): new_marker = RoadLaneSectionMarker( width=float(marker.get("width", 0.0)), color=marker.get("color"), weight=marker.get("weight"), type=marker.get("type"), idx=markerIdx, start_offset=float(marker.get("sOffset")) ) new_lane.markers.append(new_marker) # Material # TODO implementation # Visiblility # TODO implementation # Speed # TODO implementation # Access # TODO implementation # Lane Height # TODO implementation # Rules # TODO implementation newSideLanes.append(new_lane) new_lane_section._drivable = drivable new_road.lanes.lane_sections.append(new_lane_section)
[docs]def parse_opendrive_road(opendrive, road): new_road = Road() new_road.id = int(road.get("id")) new_road.name = road.get("name") junction_id = int(road.get("junction")) if road.get("junction") != "-1" else None new_road.junction = junction_id # TODO verify road length new_road._length = float(road.get("length")) # Type for opendrive_xml_road_type in road.findall("type"): parse_opendrive_road_type(new_road, opendrive_xml_road_type) # Plan view for road_geometry in road.find("planView").findall("geometry"): parse_opendrive_road_geometry(new_road, road_geometry) # Elevation profile road_elevation_profile = road.find("elevationProfile") if road_elevation_profile is not None: parse_opendrive_road_elevation_profile(new_road, road_elevation_profile) # Lateral profile road_lateral_profile = road.find("lateralProfile") if road_lateral_profile is not None: parse_opendrive_road_lateral_profile(new_road, road_lateral_profile) # Lanes lanes = road.find("lanes") if lanes is None: raise Exception("Road must have lanes element") # Lane offset for lane_offset in lanes.findall("laneOffset"): parse_opendrive_road_lane_offset(new_road, lane_offset) # Lane sections for lane_section_id, lane_section in enumerate(road.find("lanes").findall("laneSection")): parse_opendrive_road_lane_section(new_road, lane_section_id, lane_section, opendrive.header.program) # Objects # TODO implementation # Signals # TODO implementation calculate_lane_section_lengths(new_road) opendrive.roads.append(new_road)
[docs]def parse_opendrive_header(opendrive, header): parsed_header = Header( header.get("revMajor"), header.get("revMinor"), header.get("name"), header.get("version"), header.get("date"), header.get("north"), header.get("south"), header.get("east"), header.get("west"), header.get("vendor"), ) # Reference if header.find("geoReference") is not None: parsed_header.geo_reference = header.find("geoReference").text # Find whether file was created with RoadRunner user_data = header.find("userData") if user_data is not None: vector_scene = user_data.find("vectorScene") if vector_scene is not None: parsed_header.program = vector_scene.get("program") opendrive.header = parsed_header
[docs]def parse_opendrive_junction(opendrive, junction): new_junction = Junction() new_junction.id = int(junction.get("id")) new_junction.name = str(junction.get("name")) for connection in junction.findall("connection"): new_connection = JunctionConnection() new_connection.id = connection.get("id") incoming_road_id = int(connection.get("incomingRoad")) new_connection.incoming_road = opendrive.get_road(incoming_road_id) connecting_road_id = int(connection.get("connectingRoad")) new_connection.connecting_road = opendrive.get_road(connecting_road_id) new_connection.contact_point = connection.get("contactPoint") for laneLink in connection.findall("laneLink"): new_lane_link = JunctionConnectionLaneLink() new_lane_link.from_id = laneLink.get("from") new_lane_link.to_id = laneLink.get("to") new_connection.add_lane_link(new_lane_link) new_junction.add_connection(new_connection) for priority in junction.findall("priority"): low_id = int(priority.get("low")) high_id = int(priority.get("high")) new_priority = JunctionPriority(high_id, low_id) new_priority.low = opendrive.get_road(low_id) new_priority.high = opendrive.get_road(high_id) new_junction.add_priority(new_priority) opendrive.junctions.append(new_junction)
[docs]def parse_opendrive_junction_group(opendrive, junction_group): new_junction_group = JunctionGroup( junction_group.get("name"), int(junction_group.get("id")), junction_group.get("type") ) for junction_reference in junction_group.findall("junctionReference"): junction_id = int(junction_reference.get("junction")) junction = opendrive.get_junction(junction_id) if junction is None: raise ValueError(f"Junction with ID {junction_id}") new_junction_group.add_junction(junction) junction.junction_group = new_junction_group opendrive.junction_groups.append(new_junction_group)
[docs]def calculate_lane_section_lengths(new_road): # OpenDRIVE does not provide lane section lengths by itself, calculate them by ourselves for lane_section in new_road.lanes.lane_sections: # Last lane section in road if lane_section.idx + 1 >= len(new_road.lanes.lane_sections): lane_section.length = new_road.plan_view.length - lane_section.start_distance # All but the last lane section end at the succeeding one else: lane_section.length = ( new_road.lanes.lane_sections[lane_section.idx + 1].start_distance - lane_section.start_distance ) # OpenDRIVE does not provide lane width lengths by itself, calculate them by ourselves for lane_section in new_road.lanes.lane_sections: for lane in lane_section.all_lanes: widths_poses = np.array( [x.start_offset for x in lane.widths] + [lane_section.length] ) widths_lengths = widths_poses[1:] - widths_poses[:-1] for widthIdx, width in enumerate(lane.widths): width.length = widths_lengths[widthIdx]