Source code for igp2.agents.macro_agent

from typing import List, Dict

from igp2.agents.agent import Agent
from igp2.core.goal import Goal
from igp2.core.agentstate import AgentState
from igp2.core.vehicle import KinematicVehicle, Action, Observation
from igp2.planlibrary.macro_action import MacroAction, MacroActionConfig


[docs]class MacroAgent(Agent): """ Agent executing a pre-defined macro action. Useful for simulating the ego vehicle during MCTS. """ def __init__(self, agent_id: int, initial_state: AgentState, goal: Goal = None, fps: int = 20): """ Create a new macro agent. """ super().__init__(agent_id, initial_state, goal, fps) self._vehicle = KinematicVehicle(initial_state, self.metadata, fps) self._current_macro = None self._maneuver_end_idx = [] @property def current_macro(self) -> MacroAction: """ The current macro action of the agent. """ return self._current_macro @property def maneuver_end_idx(self) -> List[int]: """ The closed loop trajectory id at which each macro action maneuver completes.""" return self._maneuver_end_idx
[docs] def done(self, observation: Observation) -> bool: """ Returns true if the current macro action has reached a completion state. """ assert self._current_macro is not None, f"Macro action of Agent {self.agent_id} is None!" return self._current_macro.done(observation)
[docs] def next_action(self, observation: Observation) -> Action: """ Get the next action from the macro action. Args: observation: Observation of current environment state and road layout. Returns: The next action of the agent. """ assert self._current_macro is not None, f"Macro action of Agent {self.agent_id} is None!" if self._current_macro.current_maneuver is not None and self._current_macro.current_maneuver.done(observation): self._maneuver_end_idx.append(len(self.trajectory_cl.states) - 1) return self._current_macro.next_action(observation)
[docs] def next_state(self, observation: Observation, return_action: bool = False) -> AgentState: """ Get the next action from the macro action and execute it through the attached vehicle of the agent. Args: observation: Observation of current environment state and road layout. return_action: If True return the underlying action as well. Returns: The new state of the agent. """ action = self.next_action(observation) self.vehicle.execute_action(action, observation.frame[self.agent_id]) next_state = self.vehicle.get_state(observation.frame[self.agent_id].time + 1) next_state.macro_action = str(self.current_macro) next_state.maneuver = str(self.current_macro.current_maneuver) if not return_action: return next_state else: return next_state, action
[docs] def reset(self): """ Reset the vehicle and macro action of the agent.""" super(MacroAgent, self).reset() self._vehicle = KinematicVehicle(self._initial_state, self.metadata, self._fps) self._current_macro = None
[docs] def update_macro_action(self, macro_action: type(MacroAction), args: Dict, observation: Observation) -> MacroAction: """ Overwrite and initialise current macro action of the agent using the given arguments. Args: macro_action: new macro action to execute args: MA initialisation arguments observation: Observation of the environment """ config = MacroActionConfig(args) config.config_dict["open_loop"] = False config.config_dict["fps"] = self.fps self._current_macro = macro_action(config, agent_id=self.agent_id, frame=observation.frame, scenario_map=observation.scenario_map) return self._current_macro