igp2.planning package¶
Submodules¶
igp2.planning.mcts module¶
- class igp2.planning.mcts.MCTS(scenario_map: Map, n_simulations: int = 30, max_depth: int = 5, reward: Reward | None = None, open_loop_rollout: bool = False, trajectory_agents: bool = True, fps: int = 10, env_fps: int = 20, store_results: str | None = None, **kwargs)[source]¶
Bases:
object
Class implementing single-threaded MCTS search over environment states with macro actions.
- search(agent_id: int, goal: Goal, frame: Dict[int, AgentState], meta: Dict[int, AgentMetadata], predictions: Dict[int, GoalsProbabilities], debug: bool = False) Tuple[List[MCTSAction], Tree] [source]¶
Run MCTS search for the given agent.
- Parameters:
agent_id – agent to plan for
goal – end goal of the vehicle
frame – current (observed) state of the environment
meta – metadata of agents present in frame
predictions – dictionary of goal predictions for agents in frame
debug – Whether to plot rollouts.
- Returns:
a tuple containing the list of macro actions that encodes the optimal plan for the ego agent, given the current goal predictions for the other agents, and the search tree built during the search.
- to_key(plan: List[MCTSAction] | None = None) Tuple[str, ...] [source]¶
Convert a list of MCTS actions to an MCTS key.
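The snippet below is a minimal usage sketch based only on the signatures documented above; the scenario map, frame, metadata, goal, and goal-prediction objects are assumed to be produced elsewhere with the wider igp2 API, and the function name is illustrative.

```python
from igp2.planning.mcts import MCTS

def plan_ego(scenario_map, frame, metadata, goal_predictions, ego_goal, ego_id=0):
    """Run one MCTS search for the ego agent. All arguments are assumed to be
    prepared elsewhere with the wider igp2 API (Map, AgentState frame,
    AgentMetadata, GoalsProbabilities and Goal objects)."""
    mcts = MCTS(scenario_map, n_simulations=30, max_depth=5)
    optimal_plan, search_tree = mcts.search(
        agent_id=ego_id,
        goal=ego_goal,
        frame=frame,                    # Dict[int, AgentState]
        meta=metadata,                  # Dict[int, AgentMetadata]
        predictions=goal_predictions,   # Dict[int, GoalsProbabilities]
    )
    # to_key() converts the plan into the hashable key used to index tree nodes.
    return optimal_plan, search_tree, mcts.to_key(optimal_plan)
```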
igp2.planning.mctsaction module¶
igp2.planning.node module¶
- class igp2.planning.node.Node(key: Tuple, state: Dict[int, AgentState], actions: List[MCTSAction])[source]¶
Bases:
object
Represents a search node in the MCTS tree. Stores all relevant information for computation of Q-values and action selection. Keys must be hashable.
During search, a Node must be expanded before it can be added to a Tree. Children of the node are stored in a dictionary, with the key being the state and the value being the child node itself.
- property action_visits: ndarray¶
Return the number of times each action has been selected in this node.
- property actions: List[MCTSAction]¶
Return possible actions in state of node.
- property actions_names: List[str]¶
Return the human-readable names of actions in the node.
- add_reward_result(key: Tuple[str], reward_results: Reward)[source]¶
Add a new reward outcome to the node if the search has ended here.
- property descendants¶
Return all descendants of this node.
- property is_leaf: bool¶
Return true if the node has no children.
- property key: Tuple¶
Unique hashable key identifying the node and the sequence of actions that lead to it.
- property q_values: ndarray¶
Return the Q-values corresponding to each action.
- property reward_results: Dict[str, List[Reward]]¶
Returns a dictionary of reward outcomes where the keys are all possible actions in the node.
- property state: Dict[int, AgentState]¶
Return the state corresponding to this node.
- property state_visits: int¶
Return the number of times this state has been selected.
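As an illustration of how these properties fit together, the following sketch picks the empirically best action at a node; it assumes the node has already been expanded and visited during search, and the helper name is hypothetical.

```python
import numpy as np

def best_visited_action(node):
    """Return the action with the highest Q-value among actions selected at
    least once, using only the Node properties documented above."""
    q = node.q_values            # ndarray of Q-values, one per action
    visits = node.action_visits  # ndarray of selection counts, one per action
    # Ignore actions that were never tried so an optimistic default Q cannot win.
    masked = np.where(visits > 0, q, -np.inf)
    return node.actions[int(np.argmax(masked))]
```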
igp2.planning.policy module¶
- class igp2.planning.policy.MaxPolicy[source]¶
Bases:
Policy
Policy selecting the action with highest Q-value at a node.
- class igp2.planning.policy.Policy[source]¶
Bases:
ABC
Abstract base class for implementing various selection policies.
- class igp2.planning.policy.UCB1(c: float = 1.4142135623730951)[source]¶
Bases:
Policy
Policy implementing the UCB1 selection policy. Ref: https://en.wikipedia.org/wiki/Monte_Carlo_tree_search#Exploration_and_exploitation
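UCB1 scores each action by its mean value plus an exploration bonus. The sketch below reproduces that formula from a node's statistics for reference; it is illustrative only, since the exact method exposed by the Policy interface is not documented on this page.

```python
import numpy as np

def ucb1_scores(q_values: np.ndarray, action_visits: np.ndarray,
                c: float = np.sqrt(2)) -> np.ndarray:
    """UCB1 score per action: Q + c * sqrt(ln(N) / n), with N the total visits
    of the node and n the visit count of each action."""
    total = max(int(action_visits.sum()), 1)
    # Unvisited actions get an infinite bonus so they are always tried first.
    bonus = np.where(
        action_visits > 0,
        c * np.sqrt(np.log(total) / np.maximum(action_visits, 1)),
        np.inf,
    )
    return q_values + bonus
```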
igp2.planning.reward module¶
- class igp2.planning.reward.Reward(time_discount: float = 0.99, factors: Dict[str, float] | None = None, default_rewards: Dict[str, float] | None = None)[source]¶
Bases:
object
Used to calculate rollout rewards in MCTS.
- property cost_components: Dict[str, float]¶
The trajectory cost components.
- property default_rewards: Dict[str, float]¶
Default rewards for rollout events e.g. collision, termination, and death.
- property factors: Dict[str, float]¶
Reward component factors.
- property reward: float¶
The last calculated reward.
- property reward_components: Dict[str, float]¶
The current reward components.
- property time_discount: float¶
Discounting factor for time-to-goal reward component.
- trajectory_reward(trajectory: StateTrajectory, goal: Goal) Dict[str, float] [source]¶
Calculate reward components for a given trajectory.
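A sketch of how a Reward object might be used to score a rollout; it sticks to the default factors and event rewards, since the valid keys of factors and default_rewards are not listed on this page, and the trajectory and goal are assumed to come from a finished rollout.

```python
from typing import Dict
from igp2.planning.reward import Reward

def evaluate_trajectory(trajectory, goal) -> Dict[str, float]:
    """Score a completed rollout trajectory against a goal (sketch only;
    uses the default time discount, factors, and event rewards)."""
    reward = Reward(time_discount=0.99)
    components = reward.trajectory_reward(trajectory, goal)
    # The documented properties reward.reward and reward.reward_components
    # expose the last calculated reward and its component breakdown.
    return components
```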
igp2.planning.rollout module¶
- class igp2.planning.rollout.Rollout(ego_id: int, initial_frame: Dict[int, AgentState], metadata: Dict[int, AgentMetadata], scenario_map: Map, fps: int = 10, open_loop_agents: bool = False, trajectory_agents: bool = False, t_max: int = 1000)[source]¶
Bases:
object
Lightweight environment simulator useful for rolling out scenarios in MCTS.
One agent is designated as the ego vehicle, while the other agents follow predefined trajectories calculated during goal recognition. Simulation is performed at a given frequency with collision checking.
- property ego_id: int¶
ID of the ego vehicle.
- property fps: int¶
Execution frame rate of the simulator.
- property initial_frame: Dict[int, AgentState]¶
Return the initial state of the environment.
- property metadata: Dict[int, AgentMetadata]¶
Metadata of agents in the current frame.
- plot(t: int, axis: Axes | None = None) Axes [source]¶
Plot the current agents and the road layout for visualisation purposes.
- Parameters:
axis – Axis to draw on
- run(start_frame: Dict[int, AgentState], plot_rollout: bool = False) Tuple[StateTrajectory, Dict[int, AgentState], bool, bool, List[Agent]] [source]¶
Execute current macro action of ego and forward the state of the environment with collision checking.
- Returns:
A 5-tuple giving the new state of the environment as a state trajectory, the final frame of the simulation, whether the ego has reached its goal, whether the ego is still alive, and the list of agents the ego has collided with (possibly more than one; empty if no collision occurred).
- update_ego_action(action: MacroAction, args: Dict, frame: Dict[int, AgentState]) MacroAction [source]¶
Update the current macro action of the ego vehicle.
- Parameters:
action – new macro action to execute
args – macro action initialisation arguments
frame – Current state of the environment
- Returns:
The currently executed macro action of the ego.
- update_ego_goal(goal: Goal)[source]¶
Update the final goal of the ego vehicle.
- Parameters:
goal – new goal to reach
- update_trajectory(agent_id: int, new_trajectory: Trajectory, new_plan: List[MacroAction])[source]¶
Update the predicted trajectory of a non-ego agent. Has no effect for the ego agent or if agent_id is not among the simulated agents.
- Parameters:
agent_id – ID of agent to update
new_trajectory – new trajectory for agent
new_plan – The macro action plan that generated new_trajectory
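The following sketch shows one simulation step with a Rollout instance: updating the ego's macro action and running the environment forward with collision checking. The macro action, its initialisation arguments, and the start frame are assumed to be supplied by the surrounding MCTS loop, and the function name is illustrative.

```python
from igp2.planning.rollout import Rollout

def simulate_macro_action(rollout: Rollout, macro_action, ma_args, frame):
    """Apply one ego macro action and roll the environment forward
    (a minimal sketch of the documented Rollout interface)."""
    rollout.update_ego_action(macro_action, ma_args, frame)
    trajectory, final_frame, goal_reached, alive, collisions = rollout.run(frame)
    # collisions lists the agents the ego collided with (empty if none).
    return trajectory, final_frame, goal_reached, alive, collisions
```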
igp2.planning.tree module¶
- class igp2.planning.tree.Tree(root: Node, action_policy: Policy | None = None, plan_policy: Policy | None = None)[source]¶
Bases:
object
Defines an abstract tree consisting of nodes.
A Tree is represented as a dictionary where each key corresponds to the key of a node and each value is the corresponding node object. A node is a leaf if it has no children. Node keys correspond to the action selection history that led to the node.
- add_child(parent: Node, child: Node)[source]¶
Add a new child to the tree and assign it under an existing parent node.
- backprop(r: float, final_key: Tuple, force_reward: bool = False)[source]¶
Back-propagate the reward through the search branches.
- Parameters:
r – reward at end of simulation
final_key – final node key including final action
force_reward – if true, then use the reward even for a non-terminal state (default: False)
- property max_depth: int¶
The maximal depth of the search tree.
- property plan_policy: Policy¶
Policy used to select the final plan from the tree. Defaults to argmax by Q-values.
- print(node: Node | None = None)[source]¶
Print the nodes of the tree recursively starting from the given node.
- select_action(node: Node) MCTSAction [source]¶
Select one of the actions in the node using the specified policy and update the node statistics.
- select_plan() Tuple[List, Tuple[str]] [source]¶
Return the best sequence of actions from the root according to the specified policy.
- property tree: Dict¶
The dictionary representing the tree itself.
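Finally, a sketch tying the classes in this package together: a Tree built from a root Node, action selection, reward back-propagation, and plan extraction. The concrete keys, states, and actions are placeholders, and the root node is assumed to be expanded before selection, as noted in the Node documentation.

```python
from igp2.planning.tree import Tree
from igp2.planning.node import Node
from igp2.planning.policy import UCB1, MaxPolicy

def tree_lifecycle(root_key, root_state, root_actions, final_key, reward):
    """Minimal sketch of a Tree lifecycle using only the methods documented
    above; inputs are assumed to be built by the surrounding MCTS search."""
    root = Node(root_key, root_state, root_actions)
    tree = Tree(root, action_policy=UCB1(), plan_policy=MaxPolicy())

    # During a simulation, the action policy picks the next macro action at a node.
    action = tree.select_action(root)

    # After a rollout terminates, the final reward is propagated back up the
    # branch identified by final_key (the node key including the final action).
    tree.backprop(reward, final_key)

    # Once all simulations are done, the plan policy extracts the best sequence
    # of actions from the root.
    best_plan, best_key = tree.select_plan()
    return action, best_plan, best_key
```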