Source code for igp2.planlibrary.maneuver_cl

import abc
import numpy as np
import logging
from typing import Dict
from shapely.geometry import LineString, Point

from igp2.planlibrary.maneuver import Maneuver, ManeuverConfig, FollowLane, Turn, \
    GiveWay, SwitchLaneLeft, SwitchLaneRight, Stop, TrajectoryManeuver
from igp2.planlibrary.controller import PIDController, AdaptiveCruiseControl
from igp2.core.agentstate import AgentState
from igp2.core.vehicle import Observation, Action
from igp2.opendrive import Map

logger = logging.getLogger(__name__)


[docs]class ClosedLoopManeuver(Maneuver, abc.ABC): """ Defines a maneuver in which sensor feedback is used """
[docs] def next_action(self, observation: Observation) -> Action: """ Selects the next action for the vehicle to take Args: observation: current environment Observation Returns: Action that the vehicle should take """ raise NotImplementedError
[docs] def done(self, observation: Observation) -> bool: """ Checks if the maneuver is finished Args: observation: current environment Observation Returns: Bool indicating whether the maneuver is completed """ raise NotImplementedError
[docs] def reset(self): """ Reset the internal state of the macro action (if any). """ raise NotImplementedError
[docs]class WaypointManeuver(ClosedLoopManeuver, abc.ABC): WAYPOINT_MARGIN = 1 COMPLETION_MARGIN = 0.5 LATERAL_ARGS = {'K_P': 1.95, 'K_I': 0.2, 'K_D': 0.0} LONGITUDINAL_ARGS = {'K_P': 1.0, 'K_I': 0.05, 'K_D': 0.0} ACC_ARGS = {'a_a': 5, 'b_a': 5, 'delta': 4., 's_0': 2., 'T_a': 1.5} def __init__(self, config: ManeuverConfig, agent_id: int, frame: Dict[int, AgentState], scenario_map: Map): super().__init__(config, agent_id, frame, scenario_map) self._controller = PIDController(1 / self.config.fps, self.LATERAL_ARGS, self.LONGITUDINAL_ARGS) self._acc = AdaptiveCruiseControl(1 / self.config.fps, **self.ACC_ARGS)
[docs] def get_target_waypoint(self, state: AgentState): """ Get the index of the target waypoint in the reference trajectory""" dist = np.linalg.norm(self.trajectory.path - state.position, axis=1) closest_idx = np.argmin(dist) if dist[-1] < self.WAYPOINT_MARGIN: target_wp_idx = len(self.trajectory.path) - 1 else: far_waypoints_dist = dist[closest_idx:] target_wp_idx = closest_idx + np.argmax(far_waypoints_dist >= self.WAYPOINT_MARGIN) return target_wp_idx, closest_idx
[docs] def next_action(self, observation: Observation) -> Action: target_wp_idx, closest_idx = self.get_target_waypoint(observation.frame[self.agent_id]) target_waypoint = self.trajectory.path[target_wp_idx] target_velocity = max(Maneuver.MIN_SPEED, self.trajectory.velocity[closest_idx]) return self._get_action(target_waypoint, target_velocity, observation)
def _get_action(self, target_waypoint: np.ndarray, target_velocity: float, observation: Observation): velocity_error = self._get_acceleration(target_velocity, observation.frame) heading_error = self._get_steering(target_waypoint, observation.frame) acceleration, steering = self._controller.next_action(velocity_error, heading_error) action = Action(acceleration, steering, target_velocity) return action def _get_steering(self, target_waypoint: np.ndarray, frame: Dict[int, AgentState]) -> float: state = frame[self.agent_id] target_direction = target_waypoint - state.position waypoint_heading = np.arctan2(target_direction[1], target_direction[0]) if np.all(target_waypoint == self.trajectory.path[-1]): waypoint_heading = self.trajectory.heading[-1] heading_error = np.diff(np.unwrap([state.heading, waypoint_heading]))[0] return heading_error def _get_acceleration(self, target_velocity: float, frame: Dict[int, AgentState]): state = frame[self.agent_id] acceleration = target_velocity - state.speed vehicle_in_front, dist, _ = self.get_vehicle_in_front(self.agent_id, frame, self.lane_sequence) if vehicle_in_front is not None: in_front_speed = frame[vehicle_in_front].speed gap = dist - state.metadata.length acc_acceleration = self._acc.get_acceleration(self.MAX_SPEED, state.speed, in_front_speed, gap) acceleration = min(acceleration, acc_acceleration) return acceleration
[docs] def done(self, observation: Observation) -> bool: state = observation.frame[self.agent_id] ls = LineString(self.trajectory.path) p = Point(state.position) dist_along = ls.project(p) dist_from_end = np.linalg.norm(state.position - self.trajectory.path[-1]) # We want the vehicle to enter the next lane, so we are not done until we have not passed the midline ret = dist_along >= ls.length and dist_from_end > self.COMPLETION_MARGIN return ret
[docs] def reset(self): return
[docs]class FollowLaneCL(FollowLane, WaypointManeuver): """ Closed loop follow lane maneuver """ pass
[docs]class TurnCL(Turn, WaypointManeuver): """ Closed loop turn maneuver """ pass
[docs]class SwitchLaneLeftCL(SwitchLaneLeft, WaypointManeuver): """ Closed loop switch lane left maneuver """ pass
[docs]class SwitchLaneRightCL(SwitchLaneRight, WaypointManeuver): """ Closed loop switch lane right maneuver """ pass
[docs]class TrajectoryManeuverCL(TrajectoryManeuver, WaypointManeuver): """ Closed loop maneuver that follows a pre-defined trajectory """ pass
[docs]class GiveWayCL(GiveWay, WaypointManeuver): """ Closed loop give way maneuver """ def __stop_required(self, observation: Observation, target_wp_idx: int): ego_time_to_junction = self.trajectory.times[-1] - self.trajectory.times[target_wp_idx] times_to_junction = self._get_times_to_junction( observation.frame, observation.scenario_map, ego_time_to_junction) time_until_clear = self._get_time_until_clear(ego_time_to_junction, times_to_junction) return time_until_clear > 0
[docs] def next_action(self, observation: Observation) -> Action: state = observation.frame[self.agent_id] target_wp_idx, closest_idx = self.get_target_waypoint(state) target_waypoint = self.trajectory.path[target_wp_idx] dist_to_junction = np.linalg.norm(self.trajectory.path[-1] - state.position) # Based on d = v^2 / (2 * mu * g), with mu=0.75 which corresponds to a slightly damp road friction coefficient stopping_distance = state.speed ** 2 / (2 * 0.75 * 9.8) + state.metadata.length / 2 close_to_junction_entry = dist_to_junction < stopping_distance target_velocity = max(Maneuver.MIN_SPEED, self.trajectory.velocity[target_wp_idx]) if close_to_junction_entry and \ self.config.stop and \ self.__stop_required(observation, target_wp_idx): target_velocity = 0 return self._get_action(target_waypoint, target_velocity, observation)
[docs]class StopCL(Stop, WaypointManeuver): def __init__(self, config: ManeuverConfig, agent_id: int, frame: Dict[int, AgentState], scenario_map: Map): self.__stop_duration = 0 super(StopCL, self).__init__(config, agent_id, frame, scenario_map)
[docs] def next_action(self, observation: Observation) -> Action: state = observation.frame[self.agent_id] target_wp_idx, closest_idx = self.get_target_waypoint(state) target_waypoint = self.trajectory.path[target_wp_idx] target_velocity = max(Maneuver.MIN_SPEED, self.trajectory.velocity[target_wp_idx]) distance_to_stop = np.linalg.norm(self.trajectory.path[-1] - state.position) stopping_distance = state.speed ** 2 / (2 * 0.75 * 9.8) + state.metadata.length / 2 if distance_to_stop < stopping_distance: self.__stop_duration += 1 target_velocity = Stop.STOP_VELOCITY return self._get_action(target_waypoint, target_velocity, observation)
[docs] def done(self, observation: Observation) -> bool: return self.__stop_duration >= self.config.stop_duration * self.config.fps
[docs] def reset(self): self.__stop_duration = 0
[docs]class CLManeuverFactory: """ Used to register and create closed-loop maneuvers. """ maneuver_types = {"follow-lane": FollowLaneCL, "switch-left": SwitchLaneLeftCL, "switch-right": SwitchLaneRightCL, "turn": TurnCL, "give-way": GiveWayCL, "stop": StopCL, "trajectory": TrajectoryManeuverCL}
[docs] @classmethod def create(cls, config: ManeuverConfig, agent_id: int, frame: Dict[int, AgentState], scenario_map: Map): """ Create a new closed-loop maneuver in the given state of the environment with the given configuration. Args: config: The maneuver configuration file. agent_id: The agent for whom the maneuver is created. frame: The state of all observable agents in the environment. scenario_map: The road layout. """ assert config.type in cls.maneuver_types, f"Unregistered maneuver {config.type}. " \ f"Register with CLManeuverFactory.register_new_maneuver." config.config_dict["adjust_swerving"] = False return cls.maneuver_types[config.type](config, agent_id, frame, scenario_map)
[docs] @classmethod def register_new_maneuver(cls, type_str: str, type_man: type(ClosedLoopManeuver)): """ Register a new closed-loop maneuver to the list of available maneuvers Args: type_str: The type name of the maneuver to register. type_man: The type of the maneuver to register. """ assert isinstance(type_man, type(ClosedLoopManeuver)), f"Given type_man is not a MacroAction" assert type_str not in cls.maneuver_types, f"CLManeuver {type_str} already registered." cls.maneuver_types[type_str] = type_man logger.info(f"Register closed-loop maneuver {type_str} as {type_man}")