Source code for igp2.agents.mcts_agent

import numpy as np
from typing import List, Dict, Tuple
from shapely.geometry import Point

from igp2.agents.traffic_agent import TrafficAgent
from igp2.core.trajectory import Trajectory, StateTrajectory
from igp2.core.agentstate import AgentState
from igp2.opendrive.map import Map
from igp2.core.goal import Goal, PointGoal, StoppingGoal, PointCollectionGoal
from igp2.core.vehicle import TrajectoryVehicle, Observation, Action
from igp2.core.util import Circle, find_lane_sequence
from igp2.core.cost import Cost
from igp2.core.velocitysmoother import VelocitySmoother
from igp2.planlibrary.maneuver import Maneuver
from igp2.planning.reward import Reward
from igp2.planning.mcts import MCTS
from igp2.recognition.astar import AStar
from igp2.recognition.goalrecognition import GoalRecognition
from igp2.recognition.goalprobabilities import GoalsProbabilities


class MCTSAgent(TrafficAgent):
    def __init__(self,
                 agent_id: int,
                 initial_state: AgentState,
                 t_update: float,
                 scenario_map: Map,
                 goal: Goal = None,
                 view_radius: float = 50.0,
                 fps: int = 20,
                 kinematic: bool = False,
                 n_simulations: int = 5,
                 max_depth: int = 5,
                 store_results: str = 'final',
                 trajectory_agents: bool = True,
                 cost_factors: Dict[str, float] = None,
                 reward_factors: Dict[str, float] = None,
                 velocity_smoother: dict = None,
                 goal_recognition: dict = None,
                 stop_goals: bool = False):
        """ Create a new MCTS agent.

        Args:
            agent_id: The ID of the agent to create
            initial_state: The initial state of the agent at the start of initialisation
            t_update: The time interval between runs of the planner
            scenario_map: The current road layout
            goal: The end goal of the agent
            view_radius: The radius of a circle in which the agent can see the other agents
            fps: The execution frequency of the environment
            kinematic: If True then use a kinematic vehicle, otherwise a trajectory vehicle
            n_simulations: The number of simulations to perform in MCTS
            max_depth: The maximum search depth of MCTS (in macro actions)
            store_results: Whether to save the traces of the MCTS rollouts
            trajectory_agents: Whether to use trajectories or plans for non-egos in MCTS
            cost_factors: For trajectory cost calculations of ego in goal recognition
            reward_factors: Reward factors for MCTS rollouts
            velocity_smoother: Velocity smoother arguments. See: VelocitySmoother
            goal_recognition: Goal recognition parameters. See: GoalRecognition
            stop_goals: Whether to check for stopping goals
        """
        super().__init__(agent_id, initial_state, goal, fps)
        if not kinematic:
            self._vehicle = TrajectoryVehicle(initial_state, self.metadata, fps)

        self._current_macro_id = 0
        self._macro_actions = None
        self._goal_probabilities = None
        self._observations = {}
        self._k = 0
        self._stop_goals = stop_goals
        self._view_radius = view_radius
        self._kmax = t_update * self._fps

        self._cost = Cost(factors=cost_factors) if cost_factors is not None else Cost()
        self._reward = Reward(factors=reward_factors) if reward_factors is not None else Reward()
        self._astar = AStar(next_lane_offset=0.1)

        if velocity_smoother is None:
            velocity_smoother = {"vmin_m_s": 1, "vmax_m_s": 10, "n": 10, "amax_m_s2": 5, "lambda_acc": 10}
        self._smoother = VelocitySmoother(**velocity_smoother)

        if goal_recognition is None:
            goal_recognition = {"reward_as_difference": False, "n_trajectories": 2}
        self._goal_recognition = GoalRecognition(astar=self._astar, smoother=self._smoother,
                                                 scenario_map=scenario_map, cost=self._cost,
                                                 **goal_recognition)

        self._mcts = MCTS(scenario_map=scenario_map, reward=self._reward,
                          n_simulations=n_simulations, max_depth=max_depth,
                          store_results=store_results, trajectory_agents=trajectory_agents)
        self._goals: List[Goal] = []

    def done(self, observation: Observation):
        """ True if the agent has reached its goal. """
        return self.goal.reached(self.state.position)

    def reset(self):
        """ Reset the vehicle and macro action of the agent. """
        super(MCTSAgent, self).reset()
        self._vehicle = type(self.vehicle)(self._initial_state, self.metadata, self._fps)

    def update_plan(self, observation: Observation):
        """ Runs MCTS to generate a new sequence of macro actions to execute. """
        frame = observation.frame
        agents_metadata = {aid: state.metadata for aid, state in frame.items()}
        self._goal_probabilities = {aid: GoalsProbabilities(self._goals)
                                    for aid in frame.keys() if aid != self.agent_id}
        visible_region = Circle(frame[self.agent_id].position, self.view_radius)

        for agent_id in frame:
            if agent_id == self.agent_id:
                continue
            self._goal_recognition.update_goals_probabilities(
                goals_probabilities=self._goal_probabilities[agent_id],
                observed_trajectory=self._observations[agent_id][0],
                agent_id=agent_id,
                frame_ini=self._observations[agent_id][1],
                frame=frame,
                visible_region=visible_region)

        self._macro_actions = self._mcts.search(
            agent_id=self.agent_id,
            goal=self.goal,
            frame=frame,
            meta=agents_metadata,
            predictions=self._goal_probabilities)

    def next_action(self, observation: Observation) -> Action:
        """ Returns the next action for the agent.

        If the current macro action has finished, then updates it.
        If no macro actions are left in the plan, or we have hit the planning time step,
        then calls goal recognition and MCTS.
        """
        self.update_observations(observation)

        if self._k >= self._kmax or self.current_macro is None or \
                (self.current_macro.done(observation) and
                 self._current_macro_id == len(self._macro_actions) - 1):
            self._goals = self.get_goals(observation)
            self.update_plan(observation)
            self.update_macro_action(self._macro_actions[0].macro_action_type,
                                     self._macro_actions[0].ma_args,
                                     observation)
            self._k = 0

        if self.current_macro.done(observation):
            self._advance_macro(observation)

        self._k += 1
        return self.current_macro.next_action(observation)

    def update_observations(self, observation: Observation):
        frame = observation.frame
        for aid, agent_state in frame.items():
            try:
                # TODO: check that this will handle agents disappearing and reappearing from the
                #  field of view successfully.
                # We remove the agent after a single frame of it not being observed, to handle
                # despawned agents and agents that leave the field of view.
                # As a future improvement we could have the agents that leave the observation radius
                # stay in memory for a given amount of time. We could do this by comparing the
                # observation time with the last time each agent was observed. We should also use
                # the alive/dead attribute for despawned agents.
                self._observations[aid][0].add_state(agent_state)
            except KeyError:
                self._observations[aid] = (StateTrajectory(fps=self._fps, states=[agent_state]), frame)

        for aid in list(self._observations.keys()):
            if aid not in frame:
                self._observations.pop(aid)

    def get_goals(self, observation: Observation, threshold: float = 2.0) -> List[Goal]:
        """ Retrieve all possible goals reachable from the current position on the map in any direction.
        If more than one goal is found on a single lane, then only choose the one furthest along the
        midline of the lane.

        Args:
            observation: Observation of the environment
            threshold: The goal checking threshold
        """
        scenario_map = observation.scenario_map
        frame = observation.frame
        state = frame[self.agent_id]
        view_circle = Point(*state.position).buffer(self.view_radius)

        possible_goals = []

        # Retrieve relevant roads and check intersection of their lanes' midlines
        for road in scenario_map.roads.values():
            if not road.boundary.intersects(view_circle):
                continue

            for lane_section in road.lanes.lane_sections:
                for lane in lane_section.all_lanes:
                    if lane.id == 0 or lane.type != "driving":
                        continue

                    new_point = None

                    # First check if the lane intersects the view boundary anywhere
                    intersection = lane.midline.intersection(view_circle.boundary)
                    if not intersection.is_empty:
                        if hasattr(intersection, "geoms"):
                            # Keep the intersection point furthest along the lane midline
                            max_distance = -np.inf
                            for point in intersection.geoms:
                                distance = lane.distance_at(point)
                                if distance > max_distance:
                                    max_distance = distance
                                    new_point = point
                        else:
                            new_point = intersection

                    # If not, and the lane is completely within the circle then choose the lane end-point
                    elif view_circle.contains(lane.boundary):
                        # if lane.link.predecessor is None:
                        #     new_point = lane.midline.coords[-1]
                        if lane.link.successor is None:
                            new_point = Point(lane.midline.coords[-1])
                        else:
                            continue
                    else:
                        continue

                    # Do not add point if within threshold distance to an existing goal
                    new_point = np.array(new_point.coords[0])
                    if not any([np.allclose(new_point, g.center, atol=threshold)
                                for _, g in possible_goals]):
                        new_goal = PointGoal(new_point, threshold=threshold)
                        possible_goals.append((lane, new_goal))

        # Add stopping goals
        stopping_goals = []
        if self._stop_goals:
            for aid, s in frame.items():
                if aid == self.agent_id:
                    continue

                # First, for all agents that are stopped
                if s.speed < Trajectory.VELOCITY_STOP:
                    stopping_goals.append(StoppingGoal(s.position, threshold=threshold))

                current_lane = observation.scenario_map.best_lane_at(s.position, s.heading)
                for lane, goal in possible_goals:
                    lanes_to_goal = find_lane_sequence(current_lane, lane, goal)
                    if not lanes_to_goal:
                        continue

                    vehicle_in_front, distance, lane_ls = \
                        Maneuver.get_vehicle_in_front(aid, frame, lanes_to_goal)
                    if vehicle_in_front is not None and \
                            frame[vehicle_in_front].speed - Trajectory.VELOCITY_STOP < 0.05:
                        ds = lane_ls.project(Point(frame[vehicle_in_front].position))
                        backtrack_length = frame[vehicle_in_front].metadata.length / 2 + 3 + self.metadata.length / 2
                        backtrack_ds = max(self.metadata.length / 2 + 1e-3, ds - backtrack_length)
                        stopping_point = np.array(lane_ls.interpolate(backtrack_ds).coords[0])
                        if not any([np.allclose(stopping_point, g.center, atol=threshold)
                                    for g in stopping_goals]):
                            new_goal = StoppingGoal(stopping_point, threshold=threshold)
                            stopping_goals.append(new_goal)

        # Group goals that are in neighbouring lanes
        goals = []
        used = []
        for lane, goal in possible_goals:
            if goal in used:
                continue
            neighbouring_goals = [goal]
            for other_lane, other_goal in possible_goals:
                if goal == other_goal:
                    continue
                # TODO: This could group goals that are in neighbouring lanes but very far apart still.
                if lane.parent_road == other_lane.parent_road and np.abs(lane.id - other_lane.id) == 1:
                    neighbouring_goals.append(other_goal)
                    used.append(other_goal)

            if len(neighbouring_goals) > 1:
                goals.append(PointCollectionGoal(neighbouring_goals))
            else:
                goals.append(goal)

        return goals + stopping_goals

    def _advance_macro(self, observation: Observation):
        if not self._macro_actions:
            raise RuntimeError("Agent has no macro actions.")

        self._current_macro_id += 1
        if self._current_macro_id >= len(self._macro_actions):
            raise RuntimeError("No more macro actions to execute.")
        else:
            next_macro = self._macro_actions[self._current_macro_id]
            self.update_macro_action(next_macro.macro_action_type,
                                     next_macro.ma_args,
                                     observation)

    @property
    def view_radius(self) -> float:
        """ The view radius of the agent. """
        return self._view_radius

    @property
    def observations(self) -> Dict[int, Tuple[StateTrajectory, Dict[int, AgentState]]]:
        """ The ego's knowledge about other agents, stored in a dictionary keyed by agent ID.
        Each entry holds the trajectory observed so far and the frame in which the agent was first
        observed. Currently, any agent out of view is immediately forgotten. """
        return self._observations

    @property
    def possible_goals(self) -> List[Goal]:
        """ Return the current list of possible goals. """
        return self._goals

    @property
    def goal_probabilities(self) -> Dict[int, GoalsProbabilities]:
        """ Return the currently stored goal prediction probabilities of the ego. """
        return self._goal_probabilities

    @property
    def mcts(self) -> "MCTS":
        """ Return the MCTS planner of the agent. """
        return self._mcts
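
A minimal usage sketch, not part of the module above: it shows how an MCTSAgent might be constructed and stepped once, assuming a local OpenDRIVE file, placeholder coordinates and goal, and the usual IGP2 entry points (Map.parse_from_opendrive, keyword-constructed AgentState, Observation(frame, scenario_map)). In a full simulation loop the returned Action would be applied to the ego vehicle and a fresh Observation passed in at every time step.

import numpy as np

from igp2.agents.mcts_agent import MCTSAgent
from igp2.core.agentstate import AgentState
from igp2.core.goal import PointGoal
from igp2.core.vehicle import Observation
from igp2.opendrive.map import Map

# Placeholder map path and coordinates; replace with a real scenario.
scenario_map = Map.parse_from_opendrive("scenarios/maps/example.xodr")
ego_state = AgentState(time=0,
                       position=np.array([10.0, -5.0]),
                       velocity=np.array([5.0, 0.0]),
                       acceleration=np.array([0.0, 0.0]),
                       heading=0.0)

agent = MCTSAgent(agent_id=0,
                  initial_state=ego_state,
                  t_update=2.0,  # replan every 2 seconds
                  scenario_map=scenario_map,
                  goal=PointGoal(np.array([100.0, -5.0]), threshold=2.0),
                  fps=20)

# One planning step; a real frame would contain the states of all visible agents.
frame = {0: ego_state}
observation = Observation(frame=frame, scenario_map=scenario_map)
action = agent.next_action(observation)  # The first call triggers goal recognition and MCTS.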