Source code for igp2.core.results

from dataclasses import dataclass
import logging
import numpy as np
from typing import List, Dict, Tuple
from matplotlib import pyplot as plt
from gui.tracks_import import calculate_rotated_bboxes

from igp2.recognition.goalprobabilities import GoalWithType, GoalsProbabilities
from igp2.opendrive.map import Map
from igp2.opendrive.plot_map import plot_map
from igp2.core.trajectory import Trajectory, VelocityTrajectory
from igp2.core.agentstate import AgentState
from igp2.core.util import list_startswith


logger = logging.getLogger(__name__)


class AgentResult:
    """This class stores GoalsProbabilities objects containing the goal prediction results
    belonging to a specific agent."""

    def __init__(self, true_goal: int,
                 datum: Tuple[int, GoalsProbabilities, float, np.ndarray] = None):
        """Initialises the class, specifying the index associated with the true goal and
        optionally adding the first data point, in the form of the tuple
        (frame_id, GoalsProbabilities object, inference time, current position)."""
        if datum is not None:
            self.data = [datum]
        else:
            self.data = []
        self.true_goal = true_goal

    def add_data(self, datum: Tuple[int, GoalsProbabilities, float, np.ndarray]):
        """Adds a new data point, in the form of the tuple
        (frame_id, GoalsProbabilities object, inference time, current position)."""
        self.data.append(datum)

    @property
    def true_goal_probability(self) -> np.ndarray:
        """Returns the probability of the true goal for each data point."""
        arr = []
        for datum in self.data:
            true_goal_probability = list(datum[1].goals_probabilities.values())[self.true_goal]
            arr.append(true_goal_probability)
        return np.array(arr)

    @property
    def goal_accuracy(self) -> np.ndarray:
        """Returns True if the true goal is the most likely, False otherwise, for each data point."""
        arr = []
        for datum in self.data:
            goal_probs = np.nan_to_num(list(datum[1].goals_probabilities.values()),
                                       posinf=0., neginf=0.)
            goal_accuracy = (goal_probs[self.true_goal] == goal_probs.max() and
                             np.count_nonzero(goal_probs == goal_probs.max()) == 1)
            arr.append(goal_accuracy)
        return np.array(arr)

    @property
    def zero_probability(self) -> np.ndarray:
        """Returns True if the true goal has zero probability (considered unfeasible),
        False otherwise, for each data point."""
        arr = []
        for datum in self.data:
            goal_probs = np.nan_to_num(list(datum[1].goals_probabilities.values()),
                                       posinf=0., neginf=0.)
            arr.append(goal_probs[self.true_goal] == 0)
        return np.array(arr)

    @property
    def reward_difference(self) -> np.ndarray:
        """Returns the reward difference associated with the true goal, for each data point."""
        # Check if reward difference data is available.
        arr = []
        for datum in self.data:
            reward_difference = list(datum[1].reward_difference.values())[self.true_goal]
            if reward_difference is None:
                arr.append(np.NaN)
            else:
                arr.append(reward_difference)

        # If the data is unavailable, try to compute the difference manually from the
        # optimum and current rewards.
        if np.isnan(arr).all():
            arr = []
            for datum in self.data:
                optimum_reward = list(datum[1].optimum_reward.values())[self.true_goal]
                current_reward = list(datum[1].current_reward.values())[self.true_goal]
                if current_reward is None or optimum_reward is None:
                    arr.append(np.NaN)
                else:
                    arr.append(current_reward - optimum_reward)

        return np.array(arr)

    @property
    def inference_time(self) -> float:
        """Returns the mean inference time across all data points."""
        arr = np.array([datum[2] for datum in self.data])
        return arr.mean()

    @property
    def position(self) -> np.ndarray:
        """Returns the agent's current position for each data point."""
        return np.array([datum[3] for datum in self.data])
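
# A minimal usage sketch (not part of the original module): assuming `goal_probs_t100` and
# `goal_probs_t125` are GoalsProbabilities objects produced by goal recognition at two frames,
# and that goal index 2 is the agent's true goal, per-agent results could be accumulated as
# shown below. The frame ids, inference times and positions are illustrative values only.
def _example_agent_result(goal_probs_t100: GoalsProbabilities,
                          goal_probs_t125: GoalsProbabilities) -> AgentResult:
    result = AgentResult(true_goal=2)
    result.add_data((100, goal_probs_t100, 0.12, np.array([12.3, -4.1])))
    result.add_data((125, goal_probs_t125, 0.10, np.array([18.7, -3.9])))
    # result.true_goal_probability -> probability of goal 2 at each stored frame
    # result.goal_accuracy -> whether goal 2 was the unique most likely goal at each frame
    return result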

class EpisodeResult:
    """This class stores results for an entire episode, where each data point contains an
    AgentResult object."""

    def __init__(self, metadata: "EpisodeMetadata", id: int, cost_factors: Dict[str, float],
                 datum: Tuple[int, AgentResult] = None):
        """Initialises the class, storing the episode metadata and cost factors, and optionally
        adding a data point in the form of the tuple (agent_id, AgentResult object)."""
        self.cost_factors = cost_factors
        self.metadata = metadata
        self.id = id
        if datum is not None:
            self.data = [datum]
        else:
            self.data = []

    def add_data(self, datum: Tuple[int, AgentResult]):
        """Adds a new data point, in the form of the tuple (agent_id, AgentResult object)."""
        self.data.append(datum)

    @property
    def true_goal_probability(self) -> np.ndarray:
        """The mean true goal probability across the episode."""
        arr = np.array([datum[1].true_goal_probability for datum in self.data])
        arr = np.nan_to_num(arr, posinf=0., neginf=0.)
        return np.mean(arr, axis=0)

    @property
    def true_goal_ste(self) -> np.ndarray:
        """The standard error of the true goal probability across the episode."""
        arr = np.array([datum[1].true_goal_probability for datum in self.data])
        arr = np.nan_to_num(arr, posinf=0., neginf=0.)
        return np.std(arr, axis=0) / np.sqrt(len(self.data))

    @property
    def goal_accuracy(self) -> np.ndarray:
        """The accuracy across the episode, defined as the fraction of data points for which
        the true goal was the most likely goal."""
        arr = np.array([datum[1].goal_accuracy for datum in self.data])
        return np.mean(arr, axis=0)

    @property
    def goal_accuracy_ste(self) -> np.ndarray:
        """The standard error of the goal accuracy."""
        arr = np.array([datum[1].goal_accuracy for datum in self.data])
        return np.std(arr, axis=0) / np.sqrt(len(self.data))

    @property
    def zero_probability(self) -> np.ndarray:
        """The fraction of data points for which the true goal was considered unfeasible
        (assigned zero probability)."""
        arr = np.array([datum[1].zero_probability for datum in self.data])
        return np.mean(arr, axis=0)

    @property
    def reward_difference(self) -> np.ndarray:
        """The average reward difference across the episode."""
        arr = np.array([datum[1].reward_difference for datum in self.data])
        return np.nanmean(arr, axis=0)

    @property
    def reward_difference_std(self) -> np.ndarray:
        """The standard deviation of the reward difference."""
        arr = np.array([datum[1].reward_difference for datum in self.data])
        return np.nanstd(arr, axis=0)

    @property
    def reward_difference_median(self) -> np.ndarray:
        """The median reward difference."""
        arr = np.array([datum[1].reward_difference for datum in self.data])
        return np.nanmedian(arr, axis=0)

    @property
    def inference_time(self) -> float:
        """The average inference time across the episode."""
        arr = np.array([datum[1].inference_time for datum in self.data])
        return arr.mean()
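
# A minimal sketch of aggregating per-agent results into an episode (assumption:
# `episode_metadata` is the EpisodeMetadata of the recording and `agent_results` maps agent
# ids to AgentResult objects built as above; the cost factor values are placeholders).
def _example_episode_result(episode_metadata: "EpisodeMetadata",
                            agent_results: Dict[int, AgentResult]) -> EpisodeResult:
    episode = EpisodeResult(episode_metadata, id=0, cost_factors={"time": 1.0, "jerk": 0.1})
    for agent_id, agent_result in agent_results.items():
        episode.add_data((agent_id, agent_result))
    # episode.true_goal_probability and episode.goal_accuracy average over agents
    # at each data-point index.
    return episode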

class ExperimentResult:
    """This class stores results for an entire experiment run across multiple scenarios,
    where each data point contains an EpisodeResult object."""

    def __init__(self, datum: Tuple[int, EpisodeResult] = None):
        """Initialises the class and optionally adds a data point, in the form of the tuple
        (recording_id, EpisodeResult object)."""
        if datum is not None:
            self.data = [datum]
        else:
            self.data = []

    def add_data(self, datum: Tuple[int, EpisodeResult]):
        """Adds a data point, in the form of the tuple (recording_id, EpisodeResult object)."""
        self.data.append(datum)

    @property
    def true_goal_probability(self) -> np.ndarray:
        """Calculates the average true goal probability across all agents evaluated in the
        experiment, weighting each episode by its number of agents."""
        total_agents = 0
        arr = np.zeros(len(self.data[0][1].true_goal_probability))
        for datum in self.data:
            num_agents = len(datum[1].data)
            total_agents += num_agents
            arr += datum[1].true_goal_probability * num_agents
        return arr / total_agents

    @property
    def goal_accuracy(self) -> np.ndarray:
        """Calculates the average goal accuracy across all agents evaluated in the experiment."""
        total_agents = 0
        arr = np.zeros(len(self.data[0][1].goal_accuracy))
        for datum in self.data:
            num_agents = len(datum[1].data)
            total_agents += num_agents
            arr += datum[1].goal_accuracy * num_agents
        return arr / total_agents

    @property
    def zero_probability(self) -> np.ndarray:
        """Calculates the average fraction of data points in which the true goal was assigned
        zero probability, across all agents evaluated in the experiment."""
        total_agents = 0
        arr = np.zeros(len(self.data[0][1].zero_probability))
        for datum in self.data:
            num_agents = len(datum[1].data)
            total_agents += num_agents
            arr += datum[1].zero_probability * num_agents
        return arr / total_agents

    @property
    def reward_difference(self) -> np.ndarray:
        """Calculates the average reward difference across all agents evaluated in the experiment."""
        total_agents = 0
        arr = np.zeros(len(self.data[0][1].reward_difference))
        for datum in self.data:
            num_agents = len(datum[1].data)
            total_agents += num_agents
            arr += datum[1].reward_difference * num_agents
        return arr / total_agents

    @property
    def inference_time(self) -> float:
        """Calculates the average inference time across all agents evaluated in the experiment."""
        total_agents = 0
        t = 0.
        for datum in self.data:
            num_agents = len(datum[1].data)
            total_agents += num_agents
            t += datum[1].inference_time * num_agents
        return t / total_agents

    @property
    def inference_time_ste(self) -> float:
        """Calculates the standard error of the inference time across all agents evaluated
        in the experiment."""
        arr = []
        for ep_datum in self.data:
            for agent_datum in ep_datum[1].data:
                for frame_datum in agent_datum[1].data:
                    arr.append(frame_datum[2])
        arr = np.array(arr)
        return np.std(arr) / np.sqrt(len(arr))
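
# A minimal sketch of combining episodes into an experiment (assumption: `episode_results`
# is a list of (recording_id, EpisodeResult) tuples built as above). The aggregate properties
# weight each episode by the number of agents it contains.
def _example_experiment_result(episode_results: List[Tuple[int, EpisodeResult]]) -> ExperimentResult:
    experiment = ExperimentResult()
    for recording_id, episode_result in episode_results:
        experiment.add_data((recording_id, episode_result))
    # experiment.true_goal_probability -> agent-weighted average over all episodes
    # experiment.inference_time_ste   -> standard error over every stored inference time
    return experiment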

@dataclass
class RunResult:
    """ Class storing results of the simulated rollout in MCTS. """
    agents: Dict[int, "Agent"]
    ego_id: int
    ego_trajectory: Trajectory
    collided_agents_ids: List[int]
    goal_reached: bool
    selected_action: "MCTSAction"

    @property
    def ego_maneuvers(self) -> List[str]:
        """String representations of the maneuvers in the ego's current macro action."""
        maneuvers = self.agents[self.ego_id].current_macro.maneuvers
        return [man.__repr__() for man in maneuvers]

    @property
    def ego_maneuvers_trajectories(self) -> List[VelocityTrajectory]:
        """Returns the generated open-loop trajectories for each maneuver."""
        maneuvers = self.agents[self.ego_id].current_macro.maneuvers
        return [man.trajectory for man in maneuvers]

    def plot(self, t: int, scenario_map: Map, axis: plt.Axes = None) -> plt.Axes:
        """ Plot the current agents and the road layout at timestep t for visualisation purposes.
        Plots the open-loop trajectories that the agents will attempt to follow, alongside a
        colormap representing velocities.

        Args:
            t: timestep at which the plot should be generated
            scenario_map: the road layout
            axis: axis to draw on
        """
        if axis is None:
            fig, axis = plt.subplots()

        color_map_ego = plt.cm.get_cmap('Reds')
        color_map_non_ego = plt.cm.get_cmap('Blues')
        color_ego = 'r'
        color_non_ego = 'b'
        color_bar_non_ego = None

        plot_map(scenario_map, markings=True, ax=axis)
        for agent_id, agent in self.agents.items():
            # The ego agent follows macro actions, so its open-loop path and velocity are
            # concatenated from its maneuvers; non-ego agents carry a single trajectory.
            if hasattr(agent, "current_macro"):
                color = color_ego
                color_map = color_map_ego
                path = []
                velocity = []
                for man in agent.current_macro.maneuvers:
                    path.extend(man.trajectory.path)
                    velocity.extend(man.trajectory.velocity)
                path = np.array(path)
                velocity = np.array(velocity)
            elif hasattr(agent, "trajectory"):
                color = color_non_ego
                color_map = color_map_non_ego
                path = agent.trajectory.path
                velocity = agent.trajectory.velocity

            # Draw the agent's rotated bounding box at timestep t.
            vehicle = agent.vehicle
            bounding_box = calculate_rotated_bboxes(agent.trajectory_cl.path[t][0],
                                                    agent.trajectory_cl.path[t][1],
                                                    vehicle.length, vehicle.width,
                                                    agent.trajectory_cl.heading[t])
            pol = plt.Polygon(bounding_box[0], color=color)
            axis.add_patch(pol)

            agent_plot = axis.scatter(path[:, 0], path[:, 1], c=velocity, cmap=color_map,
                                      vmin=-4, vmax=20, s=8)

            if hasattr(agent, "current_macro"):
                plt.colorbar(agent_plot)
                plt.text(0, 0.1, 'Current Macro Action: ' + self.ego_macro_action,
                         horizontalalignment='left', verticalalignment='bottom',
                         transform=axis.transAxes)
                plt.text(0, 0.05, 'Maneuvers: ' + str(self.ego_maneuvers),
                         horizontalalignment='left', verticalalignment='bottom',
                         transform=axis.transAxes)
                if len(self.ego_maneuvers) > 1:
                    maneuver_end_idx = agent.maneuver_end_idx
                    if len(maneuver_end_idx) < len(self.ego_maneuvers):
                        maneuver_end_idx.append(len(agent.trajectory_cl.states))
                    current_maneuver_id = np.min(np.nonzero(np.array(maneuver_end_idx) > t))
                    plt.text(0, 0.0, 'Current Maneuver: ' + self.ego_maneuvers[current_maneuver_id],
                             horizontalalignment='left', verticalalignment='bottom',
                             transform=axis.transAxes)
            elif hasattr(agent, "trajectory") and color_bar_non_ego is None:
                color_bar_non_ego = plt.colorbar(agent_plot)

            plt.text(*agent.trajectory_cl.path[t], agent_id)

        return axis
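
# A minimal plotting sketch (assumption: `run_result` is a RunResult produced during an MCTS
# rollout and `scenario_map` is the Map of the current scenario; timestep 0 is chosen purely
# for illustration).
def _example_plot_run(run_result: RunResult, scenario_map: Map) -> plt.Axes:
    fig, axis = plt.subplots()
    run_result.plot(t=0, scenario_map=scenario_map, axis=axis)
    plt.show()
    return axis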

class MCTSResultTemplate:
    pass

class MCTSResult(MCTSResultTemplate):
    """ Store results for a single MCTS rollout. """

    def __init__(self, tree: "Tree" = None, samples: dict = None, trace: tuple = None):
        self.__tree = tree
        self.__samples = samples
        self.__trace = trace

    def __repr__(self) -> str:
        return str(self.__trace)

    @property
    def samples(self) -> Dict[int, Tuple[GoalWithType, VelocityTrajectory]]:
        """ Dictionary mapping agent IDs to their corresponding goal and trajectory sample
        in this rollout. """
        return self.__samples

    @property
    def tree(self) -> "Tree":
        """ The MCTS search tree at the completion of the rollout. """
        return self.__tree

    @tree.setter
    def tree(self, value):
        self.__tree = value

    @property
    def trace(self) -> Tuple[str, ...]:
        """ The final trace that was chosen for the ego in this rollout. """
        return self.__trace

    @property
    def leaf(self) -> "Node":
        """ Return the leaf node of this rollout. """
        return self.tree[self.trace[:-1]]

class AllMCTSResult(MCTSResultTemplate):
    """ Class to store results for all rollouts of an MCTS planning run. """

    def __init__(self, mcts_result: MCTSResult = None):
        if mcts_result is None:
            self.mcts_results = []
        else:
            self.mcts_results = [mcts_result]
        self.final_plan = None

    def __getitem__(self, item) -> MCTSResult:
        return self.mcts_results[item]

    def add_data(self, mcts_result: MCTSResult):
        """ Add a new rollout to the results collection. """
        if mcts_result.trace is None or mcts_result.tree is None or mcts_result.samples is None:
            logger.warning("Trying to save MCTSResult with missing fields.")
        self.mcts_results.append(mcts_result)

    def plot_q_values(self, key: Tuple, axis: plt.Axes = None) -> plt.Axes:
        """ Plot the evolution of the Q-values at the node identified by key across rollouts. """
        if axis is None:
            fig, axis = plt.subplots()

        node = self.mcts_results[-1].tree[key]
        all_q = np.empty((len(self.mcts_results), len(node.actions)), float)
        for i, rollout in enumerate(self.mcts_results):
            all_q[i, :] = rollout.tree[key].q_values

        for i in range(0, len(node.actions)):
            label = "Macro Action: " + node.actions_names[i]
            axis.plot(all_q[:, i], label=label)

        axis.set(ylabel="Q Value", xlabel="Run number")
        axis.legend()
        return axis

    @property
    def optimal_rollouts(self) -> List[MCTSResult]:
        """ Return a list of results that match the final plan of the ego. """
        opt_trace = self.optimal_trace
        return [rollout for rollout in self.mcts_results
                if list_startswith(rollout.trace, opt_trace)]

    @property
    def optimal_trace(self) -> tuple:
        """ A tuple of strings that gives the trace of the final selected optimal plan. """
        return ("Root", ) + tuple([action.__repr__() for action in self.final_plan])
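
# A minimal sketch of inspecting a completed planning run (assumption: `all_results` is an
# AllMCTSResult populated by the planner, with final_plan already set). The key passed to
# plot_q_values is a trace prefix into the search tree; ("Root",) is assumed here to select
# the root node, mirroring the "Root" prefix used in optimal_trace above.
def _example_inspect_mcts(all_results: AllMCTSResult) -> List[MCTSResult]:
    last_rollout = all_results[-1]
    _ = last_rollout.samples               # goal and trajectory sampled per agent in that rollout
    all_results.plot_q_values(("Root",))   # Q-value evolution of the root's macro actions
    return all_results.optimal_rollouts    # rollouts matching the finally selected plan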

class PlanningResult:
    """ Stores MCTS planning results, observed frames and goal probabilities for each timestep
    at which planning was run. """

    def __init__(self, scenario_map: Map, mcts_result: MCTSResultTemplate = None,
                 timestep: float = None, frame: Dict[int, AgentState] = None,
                 goals_probabilities: GoalsProbabilities = None):
        self.scenario_map = scenario_map
        if mcts_result is None:
            self.results = []
        else:
            self.results = [mcts_result]
        if timestep is None:
            self.timesteps = []
        else:
            self.timesteps = [timestep]
        if frame is None:
            self.frames = []
        else:
            self.frames = [frame]
        if goals_probabilities is None:
            self.goal_probabilities = []
        else:
            self.goal_probabilities = [goals_probabilities]

    def add_data(self, mcts_result: MCTSResultTemplate = None, timestep: float = None,
                 frame: Dict[int, AgentState] = None,
                 goals_probabilities: GoalsProbabilities = None):
        """ Adds a new data point, consisting of the MCTS result, the timestep, the observed
        frame and the goal probabilities at that timestep. """
        self.results.append(mcts_result)
        self.timesteps.append(timestep)
        self.frames.append(frame)
        self.goal_probabilities.append(goals_probabilities)
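
# A minimal sketch of logging planning data over time (assumption: `mcts_result`, `frame` and
# `goals_probabilities` come from one planning step of the ego agent, and `timestep` is the
# simulation time at which planning was triggered).
def _example_planning_log(scenario_map: Map, mcts_result: MCTSResultTemplate, timestep: float,
                          frame: Dict[int, AgentState],
                          goals_probabilities: GoalsProbabilities) -> PlanningResult:
    log = PlanningResult(scenario_map)
    log.add_data(mcts_result=mcts_result, timestep=timestep, frame=frame,
                 goals_probabilities=goals_probabilities)
    return log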