igp2.planning package

Submodules

igp2.planning.mcts module

class igp2.planning.mcts.MCTS(scenario_map: Map, n_simulations: int = 30, max_depth: int = 5, reward: Reward | None = None, open_loop_rollout: bool = False, trajectory_agents: bool = True, fps: int = 10, store_results: str | None = None, tree_type: type | None = None, node_type: type | None = None, action_type: type | None = None)[source]

Bases: object

Class implementing single-threaded MCTS search over environment states with macro actions.

create_node(key: Tuple, agent_id: int, frame: Dict[int, AgentState], goal: Goal) → Node[source]

Create a new node and expand it.

Parameters:
  • key – Key to assign to the node

  • agent_id – Agent we are searching for

  • frame – Current state of the environment

  • goal – Goal of the agent with agent_id

reset_results()[source]

Reset the stored results in the MCTS instance.

search(agent_id: int, goal: Goal, frame: Dict[int, AgentState], meta: Dict[int, AgentMetadata], predictions: Dict[int, GoalsProbabilities], debug: bool = False) → List[MCTSAction][source]

Run MCTS search for the given agent.

Parameters:
  • agent_id – agent to plan for

  • goal – end goal of the vehicle

  • frame – current (observed) state of the environment

  • meta – metadata of agents present in frame

  • predictions – dictionary of goal predictions for agents in frame

  • debug – Whether to plot rollouts.

Returns:

a list of macro actions encoding the optimal plan for the ego agent given the current goal predictions for other agents

static to_key(plan: List[MCTSAction] | None = None) → Tuple[str, ...][source]

Convert a list of MCTS actions to an MCTS key.
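
A minimal usage sketch, assuming the map, frames, metadata, and goal predictions have already been obtained elsewhere (e.g. from the scenario definition and the goal-recognition module); all such inputs, and the ego agent ID, are placeholders:

    from igp2.planning.mcts import MCTS

    scenario_map = ...   # igp2 Map loaded from an OpenDRIVE file (placeholder)
    frame = ...          # Dict[int, AgentState]: current observed state of the environment
    meta = ...           # Dict[int, AgentMetadata]: metadata of agents in frame
    predictions = ...    # Dict[int, GoalsProbabilities] from goal recognition
    ego_goal = ...       # Goal of the ego agent

    mcts = MCTS(scenario_map, n_simulations=30, max_depth=5)
    plan = mcts.search(agent_id=0,          # 0 is a placeholder ego ID
                       goal=ego_goal, frame=frame,
                       meta=meta, predictions=predictions)

    # The returned plan is a List[MCTSAction]; to_key converts it to a tree key.
    key = MCTS.to_key(plan)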

igp2.planning.mctsaction module

class igp2.planning.mctsaction.MCTSAction(macro_action_type: ABCMeta, ma_args: dict)[source]

Bases: object

Wrapper class for handling actions in MCTS.
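
A hedged construction sketch; the macro-action class and its arguments are placeholders, since in normal use these objects are created during search and returned by MCTS.search:

    from igp2.planning.mctsaction import MCTSAction

    SomeMacroAction = ...   # a macro-action class from the planning library (placeholder)
    ma_args = {}            # initialisation arguments for that macro action (placeholder)
    action = MCTSAction(macro_action_type=SomeMacroAction, ma_args=ma_args)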

igp2.planning.node module

class igp2.planning.node.Node(key: Tuple, state: Dict[int, AgentState], actions: List[MCTSAction])[source]

Bases: object

Represents a search node in the MCTS tree. Stores all relevant information for computation of Q-values and action selection. Keys must be hashable.

During search, a Node must be expanded before it can be added to a Tree. Children of the node are stored in a dictionary whose keys are the children's own keys and whose values are the child nodes themselves.

property action_visits: ndarray

Return the number of times each action has been selected in this node.

property actions: List[MCTSAction]

Return the possible actions in the state of this node.

property actions_names: List[str]

Return the human-readable names of actions in the node.

add_child(child: Node)[source]

Add a new child to the dictionary of children.

add_reward_result(key: Tuple[str], reward_results: Reward)[source]

Add a new reward outcome to the node if the search has ended here.

property children: Dict[Tuple, Node]

Return the dictionary of children.

property descendants

Return all descendants of this node.

expand()[source]

property is_leaf: bool

Return true if the node has no children.

property key: Tuple

Unique hashable key identifying the node and the sequence of actions that lead to it.

property q_values: ndarray

Return the Q-values corresponding to each action.

property reward_results: Dict[str, List[Reward]]

Returns a dictionary of reward outcomes where the keys are all possible actions in the node.

property run_result: RunResult

Return the simulated run results for this node.

property state: Dict[int, AgentState]

Return the state corresponding to this node.

property state_visits: int

Return the number of times this state has been selected.

store_q_values()[source]

Save the current q_values into the last element of run_results.
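
A hedged sketch of how a node is created, expanded, and inspected; the key convention, state, and actions shown are placeholders:

    from igp2.planning.node import Node

    key = ("Root",)    # any hashable tuple; the exact key convention is managed by MCTS
    state = ...        # Dict[int, AgentState]: environment state at this node (placeholder)
    actions = ...      # List[MCTSAction]: actions available in this state (placeholder)

    node = Node(key=key, state=state, actions=actions)
    node.expand()                  # must be expanded before being added to a Tree

    print(node.actions_names)      # human-readable action names
    print(node.q_values)           # one Q-value per action
    print(node.action_visits)      # per-action selection counts
    print(node.is_leaf)            # True while the node has no children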

igp2.planning.policy module

class igp2.planning.policy.MaxPolicy[source]

Bases: Policy

Policy selecting the action with the highest Q-value at a node.

select(node: Node)[source]

Select an action from the node’s list of actions using its Q-values.

Returns:

the action and its index in the list of actions of the node

class igp2.planning.policy.Policy[source]

Bases: ABC

Abstract class for implementing various selection policies.

select(node: Node) → Tuple[Any, int][source]

Select an action from the node’s list of actions using its Q-values.

Returns:

the action and its index in the list of actions of the node

class igp2.planning.policy.UCB1(c: float = 1.4142135623730951)[source]

Bases: Policy

Policy implementing the UCB1 selection policy. Ref: https://en.wikipedia.org/wiki/Monte_Carlo_tree_search#Exploration_and_exploitation
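
For reference, the standard UCB1 criterion from the linked article selects the action i maximising

    Q_i + c * sqrt(ln(N) / n_i)

where Q_i is the action's current Q-value, n_i the number of times that action has been selected at the node, N the total number of visits to the node, and c the exploration constant (the default value 1.4142… corresponds to sqrt(2)).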

select(node: Node)[source]

Select an action from the node’s list of actions using its Q-values.

Returns:

the action and its index in the list of actions of the node

igp2.planning.reward module

class igp2.planning.reward.Reward(time_discount: float = 0.99, factors: Dict[str, float] | None = None, default_rewards: Dict[str, float] | None = None)[source]

Bases: object

Used to calculate rollout rewards in MCTS.

property cost_components: Dict[str, float]

property default_rewards: Dict[str, float]

Default rewards for rollout events, e.g. collision, termination, and death.

property factors: Dict[str, float]

Reward component factors.

reset()[source]

Reset rewards to initialisation values.

property reward: float

The last calculated reward.

property reward_components: Dict[str, float]

The current reward components.

property time_discount: float

Discounting factor for time-to-goal reward component.

trajectory_reward(trajectory: StateTrajectory, goal: Goal) → Dict[str, float][source]

Calculate reward components for a given trajectory.
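
A hedged usage sketch; the factor names shown are illustrative placeholders rather than the library's actual component keys, and the trajectory and goal are placeholders:

    from igp2.planning.reward import Reward

    reward = Reward(time_discount=0.99,
                    factors={"time": 1.0, "jerk": -0.1})   # placeholder factor keys

    trajectory = ...   # StateTrajectory of a completed rollout (placeholder)
    goal = ...         # Goal of the ego agent (placeholder)
    components = reward.trajectory_reward(trajectory, goal)   # Dict[str, float]

    reward.reset()     # reset to initialisation values before the next rollout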

igp2.planning.rollout module

class igp2.planning.rollout.Rollout(ego_id: int, initial_frame: Dict[int, AgentState], metadata: Dict[int, AgentMetadata], scenario_map: Map, fps: int = 10, open_loop_agents: bool = False, trajectory_agents: bool = False, t_max: int = 300)[source]

Bases: object

Lightweight environment simulator useful for rolling out scenarios in MCTS.

One agent is designated as the ego vehicle, while the other agents follow predefined trajectories calculated during goal recognition. Simulation is performed at a given frequency with collision checking.

property agents: Dict[int, Agent]

Return the current agents of the environment.

property ego_id: int

ID of the ego vehicle.

property fps: int

Execution frame rate of the simulator.

property initial_frame: Dict[int, AgentState]

Return the initial state of the environment.

property metadata: Dict[int, AgentMetadata]

Metadata of the agents in the current frame.

plot(t: int, axis: Axes | None = None) → Axes[source]

Plot the current agents and the road layout for visualisation purposes.

Parameters:

axis – Axis to draw on

reset()[source]

Reset the internal states of the environment to initialisation defaults.

run(start_frame: Dict[int, AgentState], plot_rollout: bool = False) → Tuple[StateTrajectory, Dict[int, AgentState], bool, bool, List[Agent]][source]

Execute the current macro action of the ego and advance the state of the environment with collision checking.

Returns:

A 5-tuple giving the new state trajectory of the environment, the final frame of the simulation, whether the ego has reached its goal, whether the ego is still alive, and the list of agents (possibly multiple) that the ego collided with, if any.

update_ego_action(action: MacroAction, args: Dict, frame: Dict[int, AgentState]) → MacroAction[source]

Update the current macro action of the ego vehicle.

Parameters:
  • action – new macro action to execute

  • args – MA initialisation arguments

  • frame – Current state of the environment

Returns:

The macro action currently being executed by the ego.

update_ego_goal(goal: Goal)[source]

Update the final goal of the ego vehicle.

Parameters:

goal – new goal to reach

update_trajectory(agent_id: int, new_trajectory: Trajectory, new_plan: List[MacroAction])[source]

Update the predicted trajectory of a non-ego agent. Has no effect for the ego or if agent_id is not among the agents.

Parameters:
  • agent_id – ID of agent to update

  • new_trajectory – new trajectory for agent

  • new_plan – The macro action plan that generated new_trajectory
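
A hedged end-to-end sketch of one rollout step; the frame, metadata, map, goal, and macro action are placeholders that MCTS would supply during search:

    from igp2.planning.rollout import Rollout

    frame = ...          # Dict[int, AgentState]: observed state (placeholder)
    meta = ...           # Dict[int, AgentMetadata] (placeholder)
    scenario_map = ...   # igp2 Map (placeholder)
    ego_goal = ...       # Goal of the ego (placeholder)

    rollout = Rollout(ego_id=0, initial_frame=frame, metadata=meta,
                      scenario_map=scenario_map, fps=10)
    rollout.update_ego_goal(ego_goal)
    rollout.update_ego_action(..., {}, frame)   # macro-action class and args (placeholders)

    trajectory, final_frame, goal_reached, alive, collisions = rollout.run(frame)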

igp2.planning.tree module

class igp2.planning.tree.Tree(root: Node, action_policy: Policy | None = None, plan_policy: Policy | None = None, predictions: Dict[int, GoalsProbabilities] | None = None)[source]

Bases: object

Defines an abstract tree consisting of nodes.

A Tree is represented as a dictionary mapping each node's key to the corresponding Node object. Keys correspond to the action-selection history that led to the node, and a node is a leaf if it has no children.

add_child(parent: Node, child: Node)[source]

Add a new child to the tree and assign it under an existing parent node.

backprop(r: float, final_key: Tuple)[source]

Back-propagate the reward through the search branches.

Parameters:
  • r – reward at end of simulation

  • final_key – final node key including final action

property max_depth: int

The maximal depth of the search tree.

on_finish()[source]

Function called when MCTS finishes execution.

property predictions: Dict[int, GoalsProbabilities]

Predictions associated with this tree.

print(node: Node | None = None)[source]

Print the nodes of the tree recursively starting from the given node.

property root: Node

Return the root node of the tree.

select_action(node: Node) → MCTSAction[source]

Select one of the actions in the node using the specified policy and update the node statistics.

select_plan() → List[source]

Return the best sequence of actions from the root according to the specified policy.

set_samples(samples: Dict[int, Tuple[GoalWithType, VelocityTrajectory]])[source]

Overwrite the currently stored samples in the tree.

property tree: Dict

The dictionary representing the tree itself.
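
A hedged sketch of how the tree combines selection, simulation, and back-propagation within one MCTS iteration; the root node, reward, and final key are placeholders:

    from igp2.planning.tree import Tree
    from igp2.planning.policy import UCB1, MaxPolicy

    root = ...          # an expanded Node for the initial state (placeholder)
    tree = Tree(root, action_policy=UCB1(), plan_policy=MaxPolicy())

    action = tree.select_action(root)   # policy pick; also updates node statistics
    # ... roll the environment forward for `action` using Rollout ...
    r = ...             # float reward at the end of the simulation (placeholder)
    final_key = ...     # key of the node where the simulation ended (placeholder)
    tree.backprop(r, final_key)

    best_plan = tree.select_plan()      # best action sequence from the root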

Module contents