Source code for igp2.recognition.astar

import traceback
import numpy as np
import heapq
import logging
import matplotlib.pyplot as plt
from typing import Callable, List, Dict, Tuple

from shapely.geometry import LineString, Point
from scipy.spatial import distance_matrix

from igp2.opendrive.map import Map
from igp2.opendrive.plot_map import plot_map
from igp2.core.trajectory import VelocityTrajectory
from igp2.core.agentstate import AgentState
from igp2.core.goal import PointGoal, Goal, StoppingGoal
from igp2.core.util import Circle, add_offset_point
from igp2.planlibrary.macro_action import MacroAction, MacroActionConfig, MacroActionFactory, StopMA
from igp2.planlibrary.maneuver import Maneuver

logger = logging.getLogger(__name__)


[docs]class AStar: """ Class implementing A* search over trajectories to goals. """ NEXT_LANE_OFFSET = 0.01 def __init__(self, cost_function: Callable[[VelocityTrajectory, PointGoal], float] = None, heuristic_function: Callable[[VelocityTrajectory, PointGoal], float] = None, next_lane_offset: float = None, max_iter: int = 100): """ Initialises a new A* search class with the given parameters. The search frontier is ordered according to the formula f = g + h. Args: next_lane_offset: A small offset used to reach the next lane when search to the end of a lane cost_function: The cost function g heuristic_function: The heuristic function h max_iter: The maximum number of iterations A* is allowed to run """ self.next_lane_offset = AStar.NEXT_LANE_OFFSET if next_lane_offset is None else next_lane_offset self.max_iter = max_iter self._g = AStar.trajectory_duration if cost_function is None else cost_function self._h = AStar.time_to_goal if heuristic_function is None else heuristic_function self._f = self.cost_function
[docs] def search(self, agent_id: int, frame: Dict[int, AgentState], goal: Goal, scenario_map: Map, n_trajectories: int = 1, open_loop: bool = True, debug: bool = False, visible_region: Circle = None) -> Tuple[List[VelocityTrajectory], List[List[MacroAction]]]: """ Run A* search from the current frame to find trajectories to the given goal. Args: agent_id: The agent to plan for frame: State of the environment to search from goal: The target goal scenario_map: The Map of the scenario n_trajectories: The number of trajectories to return open_loop: Whether to generate open loop or closed loop macro actions in the end debug: If True, then plot the evolution of the frontier at each step visible_region: Region of the map that is visible to the ego vehicle Returns: List of VelocityTrajectories ordered in increasing order of cost. The best trajectory is at index 0, while the worst is at index -1 """ solutions = [] frontier = [(0.0, ([], frame))] iterations = 0 while frontier and len(solutions) < n_trajectories and iterations < self.max_iter: iterations += 1 cost, (actions, frame) = heapq.heappop(frontier) # Check termination condition trajectory = self._full_trajectory(actions, offset_point=False) if self.goal_reached(goal, trajectory) and \ (not isinstance(goal, StoppingGoal) or trajectory.duration >= StopMA.DEFAULT_STOP_DURATION): if not actions: logger.info(f"AID {agent_id} at {goal} already.") else: logger.info(f"Solution found for AID {agent_id} to {goal}: {actions}") solutions.append(actions) continue # Check if current position is valid if not scenario_map.roads_at(frame[agent_id].position): continue # Check if path has self-intersection if actions: # and scenario_map.in_roundabout(frame[agent_id].position, frame[agent_id].heading): if self._check_looping(trajectory, actions[-1]): continue if debug: plot_map(scenario_map, midline=True, hide_road_bounds_in_junction=True) for aid, a in frame.items(): plt.plot(*a.position, marker="o") plt.text(*a.position, aid) plt.scatter(trajectory.path[:, 0], trajectory.path[:, 1], c=trajectory.velocity, cmap=plt.cm.get_cmap('Reds'), vmin=-4, vmax=20, s=8) plt.plot(*goal.center, marker="x") plt.title(f"agent {agent_id} -> {goal}: {actions}") plt.show() for macro_action in MacroActionFactory.get_applicable_actions(frame[agent_id], scenario_map, goal): for ma_args in macro_action.get_possible_args(frame[agent_id], scenario_map, goal): try: ma_args["open_loop"] = open_loop config = MacroActionConfig(ma_args) new_ma = macro_action(config, agent_id=agent_id, frame=frame, scenario_map=scenario_map) new_actions = actions + [new_ma] new_trajectory = self._full_trajectory(new_actions) # Check if has passed through region and went outside region already if not self._check_in_region(new_trajectory, visible_region): continue new_frame = MacroAction.play_forward_macro_action(agent_id, scenario_map, frame, new_ma) new_frame[agent_id] = new_trajectory.final_agent_state new_cost = self._f(new_trajectory, goal) heapq.heappush(frontier, (new_cost, (new_actions, new_frame))) except Exception as e: logger.debug(str(e)) logger.debug(traceback.format_exc()) continue trajectories = [self._full_trajectory(mas, offset_point=False) for mas in solutions] return trajectories, solutions
[docs] def cost_function(self, trajectory: VelocityTrajectory, goal: Goal) -> float: return self._g(trajectory, goal) + self._h(trajectory, goal)
[docs] @staticmethod def trajectory_duration(trajectory: VelocityTrajectory, goal: Goal) -> float: return trajectory.duration
[docs] @staticmethod def time_to_goal(trajectory: VelocityTrajectory, goal: Goal) -> float: return goal.distance(trajectory.path[-1]) / Maneuver.MAX_SPEED
[docs] @staticmethod def goal_reached(goal: Goal, trajectory: VelocityTrajectory) -> bool: if trajectory is None: return False if goal.reached(trajectory.path[-1]): return True elif not isinstance(goal, StoppingGoal): return goal.passed_through_goal(trajectory) return False
def _full_trajectory(self, macro_actions: List[MacroAction], offset_point: bool = True): if not macro_actions: return None path = np.empty((0, 2), float) velocity = np.empty((0,), float) # Join trajectories of macro actions for ma in macro_actions: trajectory = ma.get_trajectory() path = np.concatenate([path[:-1], trajectory.path], axis=0) velocity = np.concatenate([velocity[:-1], trajectory.velocity]) # Add final offset point full_trajectory = VelocityTrajectory(path, velocity) if offset_point: add_offset_point(full_trajectory, self.next_lane_offset) return full_trajectory def _check_looping(self, trajectory: VelocityTrajectory, final_action: MacroAction) -> bool: """ Checks whether the final action brought us back to somewhere we had already visited. """ final_path = final_action.get_trajectory().path[::-1] previous_path = trajectory.path[:-len(final_path)] ds = distance_matrix(previous_path, final_path) close_points = np.sum(np.isclose(ds, 0.0, atol=2 * Maneuver.POINT_SPACING), axis=1) return np.count_nonzero(close_points) > 2 / Maneuver.POINT_SPACING def _check_in_region(self, trajectory: VelocityTrajectory, visible_region: Circle) -> bool: """ Checks whether the trajectory is in the visible region. Ignores the initial section outside of the visible region, as this often happens when the vehicle calculates the optimal trajectory from the first observed point of a vehicle.""" if visible_region is None: return True dists = np.linalg.norm(trajectory.path[:-1] - visible_region.centre, axis=1) # remove ending off offset point in_region = dists <= visible_region.radius + 1 # Add 1m for error if True in in_region: first_in_idx = np.nonzero(in_region)[0][0] in_region = in_region[first_in_idx:] return np.all(in_region) return True