import json
import abc
import logging
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict
from import EpisodeConfig, EpisodeLoader, Episode
from import Map
logger = logging.getLogger(__name__)
class ScenarioConfig:
    """Metadata about a scenario used for goal recognition.

    Thin read-only view over the dictionary parsed from a scenario JSON file.
    Each configuration entry is exposed as a property. Entries that are missing
    from the dictionary evaluate to ``None`` unless the property states another
    default.
    """

    def __init__(self, config_dict):
        """Wrap an already parsed configuration dictionary.

        Args:
            config_dict (dict): scenario metadata keyed by option name
        """
        self.config_dict = config_dict

    @classmethod
    def load(cls, file_path):
        """Loads the scenario metadata from a json file.

        Args:
            file_path (str): path to the file to load

        Returns:
            ScenarioConfig: metadata about the scenario
        """
        # JSON is UTF-8 by specification; do not depend on the platform default.
        with open(file_path, encoding="utf-8") as f:
            scenario_meta_dict = json.load(f)
        return cls(scenario_meta_dict)

    @property
    def goals(self) -> List[Tuple[int, int]]:
        """Possible goals for agents in this scenario"""
        return self.config_dict.get('goals')

    @property
    def goals_priors(self) -> List[float]:
        """Priors for goals in this scenario"""
        return self.config_dict.get('goals_priors')

    @property
    def name(self) -> str:
        """Name of the scenario"""
        return self.config_dict.get('name')

    @property
    def goal_types(self) -> List[List[str]]:
        """Goal types configured for this scenario (the 'goal_types' entry)"""
        return self.config_dict.get('goal_types')

    @property
    def opendrive_file(self) -> str:
        """Path to the *.xodr file specifying the OpenDrive map"""
        return self.config_dict.get('opendrive_file')

    @property
    def lat_origin(self) -> float:
        """Latitude of the origin"""
        return self.config_dict.get('lat_origin')

    @property
    def lon_origin(self) -> float:
        """Longitude of the origin"""
        return self.config_dict.get('lon_origin')

    @property
    def data_format(self) -> str:
        """Format in which the data is stored"""
        return self.config_dict.get('data_format')

    @property
    def data_root(self) -> str:
        """Path to directory in which the data is stored"""
        return self.config_dict.get('data_root')

    @property
    def episodes(self) -> List["EpisodeConfig"]:
        """Configuration for all episodes of this scenario, one EpisodeConfig per entry.

        The 'episodes' entry must be present in the configuration; a new list
        of EpisodeConfig objects is built on every access.
        """
        return [EpisodeConfig(c) for c in self.config_dict.get('episodes')]

    @property
    def background_image(self) -> str:
        """Path to background image"""
        return self.config_dict.get('background_image')

    @property
    def background_px_to_meter(self) -> float:
        """Pixels per meter in background image"""
        return self.config_dict.get('background_px_to_meter')

    @property
    def scale_down_factor(self) -> int:
        """Scale down factor for visualisation"""
        return self.config_dict.get('scale_down_factor')

    @property
    def check_lanes(self) -> bool:
        """True if Lane data should be checked when loading frames for agents. Defaults to False."""
        return self.config_dict.get("check_lanes", False)

    @property
    def check_oncoming(self) -> bool:
        """True if ChangeLane macro action should check for other agents in the lane before switching.

        Defaults to True.
        """
        return self.config_dict.get("check_oncoming", True)

    @property
    def reachable_pairs(self) -> List[List[List[float]]]:
        """Pairs of points, where the second point should be reachable from the first.

        Can be used for validating maps.
        """
        return self.config_dict.get('reachable_pairs')

    @property
    def dataset_split(self) -> Dict[str, List[int]]:
        """Mapping from data split name to the episode indices belonging to that split"""
        return self.config_dict.get('dataset_split', None)

    @property
    def agent_types(self) -> List[str]:
        """Types of agents to keep from the data set"""
        return self.config_dict.get("agent_types", None)

    @property
    def goal_threshold(self) -> float:
        """Threshold for checking goal completion of agents' trajectories"""
        return self.config_dict.get("goal_threshold", None)

    @property
    def scaling_factor(self) -> float:
        """Constant factor to account for mismatch in the scale of the recordings and the size of the map"""
        return self.config_dict.get("scaling_factor", None)

    @property
    def target_switch_length(self) -> float:
        """Target length for lane switch maneuver."""
        return self.config_dict.get("target_switch_length")

    @property
    def cost_factors(self) -> Dict[str, float]:
        """Default cost weights."""
        return self.config_dict.get("cost_factors")

    @property
    def buildings(self) -> List[List[List[float]]]:
        """Return the vertices of the buildings in the map."""
        return self.config_dict.get("buildings")
class Scenario(abc.ABC):
    """Represents an arbitrary driving scenario with interactions broken to episodes."""

    def __init__(self, config: "ScenarioConfig"):
        """Initialise a new Scenario based on the given ScenarioConfig.

        No map or episode data is read here; only the episode loader is
        obtained through EpisodeLoader.get_loader.

        Args:
            config: metadata describing the scenario
        """
        self.config = config
        self._episodes = None
        self._opendrive_map = None
        self._loader = EpisodeLoader.get_loader(self.config)

    def load_map(self):
        """Parse the OpenDrive map named in the config and store it on the scenario.

        Raises:
            ValueError: if the config does not specify an OpenDrive file
        """
        if self.config.opendrive_file:
            self._opendrive_map = Map.parse_from_opendrive(self.config.opendrive_file)
        else:
            # Only a missing map file is an error; a successful parse must not raise.
            raise ValueError("OpenDrive map was not specified!")

    @property
    def opendrive_map(self) -> "Map":
        """Return the OpenDrive Map of the Scenario (None until load_map() has run)."""
        return self._opendrive_map

    @property
    def episodes(self) -> List["Episode"]:
        """Retrieve the list of loaded Episodes (None until episodes have been stored)."""
        return self._episodes

    @property
    def loader(self) -> "EpisodeLoader":
        """The EpisodeLoader of the Scenario."""
        return self._loader

    @classmethod
    def load(cls, file_path: str, split: List[str] = None):
        """Initialise a new Scenario from the given config file.

        Args:
            file_path: Path to the file defining the scenario
            split: The data set splits to load as given by indices. If None, load all.

        Returns:
            A new Scenario instance

        Raises:
            NotImplementedError: always; subclasses provide the implementation
        """
        raise NotImplementedError

    def plot_goals(self, axes, scale=1, flipy=False):
        """Draw every configured goal on the given matplotlib axes.

        Each goal is drawn as a red circle with radius ``goal_threshold / scale``
        and labelled ``G<index>`` in white. If no goal threshold is configured
        only the label is drawn.

        Args:
            axes: matplotlib axes to draw on
            scale: divisor applied to goal coordinates and to the circle radius
            flipy: if True, the y coordinate of every goal is negated
        """
        goal_locations = self.config.goals or []
        threshold = self.config.goal_threshold
        y_sign = 1 - 2 * int(flipy)  # -1 when flipping the y axis, +1 otherwise
        for idx, g in enumerate(goal_locations):
            x = g[0] / scale
            y = g[1] / scale * y_sign
            if threshold is not None:
                circle = plt.Circle((x, y), threshold / scale, color='r')
                # The patch is only rendered once it is attached to the axes.
                axes.add_patch(circle)
            label = 'G{}'.format(idx)
            axes.annotate(label, (x, y), color='white')
class InDScenario(Scenario):
@classmethod
def load(cls, file_path: str, split: List[str] = None):
    """Initialise a new scenario from the given config file and load its data.

    Args:
        file_path: Path to the file defining the scenario
        split: Names of the data set splits whose episodes should be loaded.
            If None, all episodes are loaded.

    Returns:
        A new scenario instance with its map and episodes loaded

    Raises:
        ValueError: raised by load_map() if the config names no OpenDrive file
    """
    config = ScenarioConfig.load(file_path)
    scenario = cls(config)
    # The map goes first: load_episodes() hands it to the loader when
    # lane checking is enabled in the config.
    scenario.load_map()
    # Forward the requested splits; they were previously accepted but ignored.
    scenario.load_episodes(split)
    return scenario
def load_episodes(self, split: List[str] = None) -> List[Episode]:
    """Load all/the specified Episodes as given in the ScenarioConfig.

    Episode configs are ordered by their recording_id. The loaded episodes
    are stored on the scenario and also returned.

    Args:
        split: Names of the data set splits to load, looked up in the
            config's dataset_split mapping. If None, every episode is loaded.

    Returns:
        The list of loaded episodes

    Raises:
        ValueError: if a split is requested but the config has no dataset_split
        KeyError: if a requested split name is not in dataset_split
    """
    ordered = sorted(self.config.episodes, key=lambda x: x.recording_id)

    if split is not None:
        dataset_split = self.config.dataset_split
        if dataset_split is None:
            raise ValueError("Cannot select a split: the scenario config has no dataset_split.")
        # Gather the episode positions of every requested split.
        indices = set()
        for s in split:
            indices.update(dataset_split[s])
        to_load = [conf for i, conf in enumerate(ordered) if i in indices]
    else:
        to_load = ordered

    logger.info("Loading %d episode(s).", len(to_load))
    # The map is only handed to the loader when lane checking is enabled.
    road_map = self._opendrive_map if self.config.check_lanes else None
    episodes = []
    for idx, config in enumerate(to_load):
        logger.info("Loading Episode %d/%d", idx + 1, len(to_load))
        # NOTE(review): the original call was cut off after its second argument;
        # further arguments may have been passed here (possibly agent type or
        # scaling options) - confirm against EpisodeLoader.load.
        episode = self._loader.load(config, road_map)
        episodes.append(episode)

    self._episodes = episodes
    return episodes
def load_episode(self, episode_id) -> Episode:
    """Load the single Episode whose config sits at position ``episode_id``.

    The episode is only returned; the scenario's stored episode list is not
    modified.
    """
    loader = self._loader
    episode_config = self.config.episodes[episode_id]
    return loader.load(episode_config)
def filter_by_goal_completion(self):
""" Filter out all agents which do not arrive at a specified goal """
threshold = self.config.goal_threshold
if threshold is None:
possible_goals = np.array(self.config.goals)
for episode in self.episodes:
dead_agents = set()
for agent_id, agent in episode.agents.items():
if np.allclose(agent.trajectory.path[0], agent.trajectory.path[-1], atol=0.1):
agent.goal_reached = False
for goal in possible_goals:
distances = np.linalg.norm(agent.trajectory.path - goal, axis=1)
if np.any(distances < threshold):
agent.goal_reached = False
for frame in episode.frames:
for agent_id, agent in frame.all_agents.items():
if agent_id in dead_agents: