import random
from copy import copy
from operator import itemgetter
from typing import List, Dict, Tuple
import matplotlib.pyplot as plt
from igp2.core.goal import Goal
from igp2.opendrive.map import Map
from igp2.opendrive.plot_map import plot_map
from igp2.core.trajectory import VelocityTrajectory
from igp2.core.cost import Cost
from igp2.planlibrary.macro_action import MacroAction
class GoalWithType:
"""Tuple of a Goal object and a goal_type string defining an action the vehicle has to perform to
reach the goal (e.g. turn left). In the current implementation, goal_types are not defined."""
    def __init__(self):
        # Never called: __new__ returns a plain tuple rather than a GoalWithType
        # instance, so Python never invokes __init__.
        pass
def __new__(cls, goal: Goal = None, goal_type: str = None):
return goal, goal_type
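
# Note: because __new__ returns a plain tuple, constructing a GoalWithType acts as
# a factory call. A hypothetical example, assuming an existing Goal instance g:
#     gwt = GoalWithType(g, "turn left")
#     assert gwt == (g, "turn left")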
class GoalsProbabilities:
"""Class used to store and update goal probabilities, as well as store useful results such as the priors,
likelihoods, generated trajectories and rewards """
def __init__(self, goals: List[Goal] = None, goal_types: List[List[str]] = None, priors: List[float] = None):
"""Creates a new GoalsProbabilities object.
Args:
            goals: a list of Goal objects representing the scenario's goals.
goal_types: optionally, refine the goals with goal types.
priors: optionally, a list of goal priors measured from the dataset.
If unused, the priors will be set to a uniform distribution.
"""
self._goals_and_types = []
if goal_types is None:
for goal in goals:
goal_and_type = GoalWithType(goal, None)
self._goals_and_types.append(goal_and_type)
else:
for goal, goal_type_arr in zip(goals, goal_types):
for goal_type in goal_type_arr:
goal_and_type = GoalWithType(goal, goal_type)
self._goals_and_types.append(goal_and_type)
if priors is None:
self._goals_priors = dict.fromkeys(self._goals_and_types, self.uniform_distribution())
else:
self._goals_priors = dict(zip(self._goals_and_types, priors))
# Actual normalised goal and trajectories probabilities
self._goals_probabilities = copy(self._goals_priors)
        # Use a comprehension so each goal gets its own list; dict.fromkeys with a
        # mutable default would share a single list object across all keys.
        self._trajectories_probabilities = {key: [] for key in self._goals_and_types}
# To store trajectory data
self._optimum_trajectory = dict.fromkeys(self._goals_and_types, None)
self._current_trajectory = copy(self._optimum_trajectory)
self._all_trajectories = {key: [] for key in self._goals_and_types}
# To store the plans that generated the trajectories
self._optimum_plan = dict.fromkeys(self._goals_and_types, None)
self._all_plans = {key: [] for key in self._goals_and_types}
# Reward data
self._optimum_reward = copy(self._optimum_trajectory)
self._current_reward = copy(self._optimum_trajectory)
self._all_rewards = {key: [] for key in self._goals_and_types}
# Reward difference data
self._reward_difference = copy(self._optimum_trajectory)
self._all_reward_differences = {key: [] for key in self._goals_and_types}
# The goal likelihoods
self._likelihood = copy(self._optimum_trajectory)
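
    # __init__ above calls self.uniform_distribution(), which does not appear in this
    # section; the following is a minimal sketch consistent with that call, assuming
    # a uniform prior over all goal-type pairs.
    def uniform_distribution(self) -> float:
        """Generates a uniform distribution over the goals."""
        return 1.0 / len(self._goals_and_types)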
    def sample_goals(self, k: int = 1) -> List[GoalWithType]:
        """Randomly sample k goals according to the goal probability distribution."""
goals = list(self.goals_probabilities.keys())
weights = self.goals_probabilities.values()
return random.choices(goals, weights=weights, k=k)
    def sample_trajectories_to_goal(self, goal: GoalWithType, k: int = 1) \
            -> Tuple[List[VelocityTrajectory], List[List[MacroAction]]]:
        """Randomly sample up to k trajectories to the given goal from all_trajectories,
        using the trajectory probability distributions."""
assert goal in self.trajectories_probabilities, f"Goal {goal} not in trajectories_probabilities!"
assert goal in self.all_trajectories, f"Goal {goal} not in all_trajectories!"
trajectories = self._all_trajectories[goal]
if trajectories:
weights = self._trajectories_probabilities[goal]
trajectories = random.choices(trajectories, weights=weights, k=k)
plans = [self.trajectory_to_plan(goal, traj) for traj in trajectories]
return trajectories, plans
    def trajectory_to_plan(self, goal: GoalWithType, trajectory: VelocityTrajectory) -> List[MacroAction]:
""" Return the plan that generated the trajectory. Not used for optimal trajectories. """
idx = self.all_trajectories[goal].index(trajectory)
return self.all_plans[goal][idx]
    def map_prediction(self) -> Tuple[GoalWithType, VelocityTrajectory]:
        """Return the maximum a posteriori (MAP) goal and trajectory prediction for the agent."""
goal = max(self.goals_probabilities, key=self.goals_probabilities.get)
        trajectory, _ = max(zip(self.all_trajectories[goal],
                                self.trajectories_probabilities[goal]),
                            key=itemgetter(1))
return goal, trajectory
    def add_smoothing(self, alpha: float = 1., uniform_goals: bool = False):
        """Perform add-alpha smoothing on the probability distributions in place.

        Args:
            alpha: Additive factor for smoothing.
            uniform_goals: Whether to reset the probabilities of reachable goals to a
                uniform distribution instead of smoothing them.
        """
        n_reachable = sum(map(lambda x: len(x) > 0, self.trajectories_probabilities.values()))
        for goal, trajectory_prob in self.trajectories_probabilities.items():
            trajectory_len = len(trajectory_prob)
            if trajectory_len > 0:
                # Either reset reachable goals to a uniform distribution, or apply
                # add-alpha smoothing to the goal probabilities as well.
                if uniform_goals:
                    self.goals_probabilities[goal] = 1 / n_reachable
                else:
                    self.goals_probabilities[goal] = \
                        (self.goals_probabilities[goal] + alpha) / (1 + n_reachable * alpha)
                self.trajectories_probabilities[goal] = \
                    [(prob + alpha) / (1 + trajectory_len * alpha) for prob in trajectory_prob]
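
    # Worked example with hypothetical numbers: for alpha=1 and trajectory
    # probabilities [0.9, 0.1], smoothing gives [(0.9 + 1) / 3, (0.1 + 1) / 3]
    # = [0.63..., 0.36...]; the distribution moves towards uniform while still
    # summing to one.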
    def plot(self,
             scenario_map: Map = None,
             max_n_trajectories: int = 1,
             cost: Cost = None) -> plt.Axes:
        """Plot the optimal and predicted trajectories.

        Args:
            scenario_map: Optional road layout to be plotted as background.
            max_n_trajectories: The maximum number of trajectories to plot for each goal, if they exist.
            cost: If given, re-calculate cost factors for plotting.
        """
def plot_trajectory(traj, ax_, cmap, goal_, title=""):
plot_map(scenario_map, markings=True, ax=ax_)
path, vel = traj.path, traj.velocity
ax_.scatter(path[:, 0], path[:, 1], c=vel, cmap=cmap, vmin=-4, vmax=20, s=8)
if cost is not None:
cost.trajectory_cost(traj, goal_)
plt.rc('axes', titlesize=8)
t = str(cost.cost_components)
t = t[:len(t) // 2] + "\n" + t[len(t) // 2:]
ax_.set_title(t)
else:
ax_.set_title(title)
color_map_optimal = plt.cm.get_cmap('Reds')
color_map = plt.cm.get_cmap('Blues')
valid_goals = [g for g, ts in self._all_trajectories.items() if len(ts) > 0]
        # squeeze=False keeps axes 2-D even when there is a single reachable goal.
        fig, axes = plt.subplots(len(valid_goals), 2, figsize=(12, 9), squeeze=False)
        for gid, goal in enumerate(valid_goals):
            plot_trajectory(self._optimum_trajectory[goal], axes[gid, 0], color_map_optimal, goal[0])
            for trajectory in self._all_trajectories[goal][:max_n_trajectories]:
                plot_trajectory(trajectory, axes[gid, 1], color_map, goal[0])
        return axes
@property
def goals_probabilities(self) -> Dict[GoalWithType, float]:
"""Returns the current goals probabilities."""
return self._goals_probabilities
@property
def goals_priors(self) -> Dict[GoalWithType, float]:
"""Return the goals priors."""
return self._goals_priors
@property
def trajectories_probabilities(self) -> Dict[GoalWithType, List[float]]:
""" Return the trajectories probability distribution to each goal"""
return self._trajectories_probabilities
@property
def optimum_trajectory(self) -> Dict[GoalWithType, VelocityTrajectory]:
"""Returns the trajectory from initial vehicle position generated to each goal to calculate the likelihood."""
return self._optimum_trajectory
@property
def optimum_plan(self) -> Dict[GoalWithType, List[MacroAction]]:
""" Returns the plan from initial vehicle position generated to each goal."""
return self._optimum_plan
@property
def current_trajectory(self) -> Dict[GoalWithType, VelocityTrajectory]:
"""Returns the real vehicle trajectory, extended by the trajectory
from current vehicle position that was generated to each goal to calculate the likelihood."""
return self._current_trajectory
@property
def all_trajectories(self) -> Dict[GoalWithType, List[VelocityTrajectory]]:
""" Returns the real vehicle trajectory, extended by all possible generated paths to a given goal."""
return self._all_trajectories
@property
def all_plans(self) -> Dict[GoalWithType, List[List[MacroAction]]]:
""" Returns all plans from the most recent vehicle position generated to each goal."""
return self._all_plans
@property
def optimum_reward(self) -> Dict[GoalWithType, float]:
"""Returns the reward generated by the optimum_trajectory for each goal"""
return self._optimum_reward
@property
def current_reward(self) -> Dict[GoalWithType, float]:
"""Returns the reward generated by the current_trajectory for each goal"""
return self._current_reward
@property
def all_rewards(self) -> Dict[GoalWithType, List[float]]:
"""Returns a list of rewards generated by all_trajectories for each goal"""
return self._all_rewards
@property
def reward_difference(self) -> Dict[GoalWithType, float]:
"""Returns the reward generated by the optimum_trajectory for each goal,
if we are not using the reward_as_difference toggle."""
return self._reward_difference
@property
def all_reward_differences(self) -> Dict[GoalWithType, List[float]]:
"""Returns the rewards generated by all_trajectories for each goal,
if we are using the reward_as_difference toggle."""
return self._all_reward_differences
@property
def likelihood(self) -> Dict[GoalWithType, float]:
"""Returns the computed likelihoods for each goal"""
return self._likelihood
@property
def goals_and_types(self) -> List[GoalWithType]:
""" Return each goal and the possible corresponding type """
return self._goals_and_types
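

# Minimal usage sketch (hypothetical values; PointGoal is igp2's point-based Goal
# subclass, and the trajectory/probability fields are normally populated by a goal
# recogniser, which this module does not include):
#
#     import numpy as np
#     from igp2.core.goal import PointGoal
#
#     goals = [PointGoal(np.array([10.0, 0.0]), threshold=1.0),
#              PointGoal(np.array([0.0, 10.0]), threshold=1.0)]
#     goal_probs = GoalsProbabilities(goals)
#     sampled = goal_probs.sample_goals(k=5)  # draws from the (uniform) priors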