import json
import abc
import logging
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict
from igp2.data.episode import EpisodeConfig, EpisodeLoader, Episode
from igp2.opendrive.map import Map
logger = logging.getLogger(__name__)
class ScenarioConfig:
    """Metadata about a scenario used for goal recognition.

    Thin read-only wrapper around the dictionary parsed from a scenario JSON file.
    Every property looks up a single key with ``dict.get``, so a key that is absent
    from the file yields ``None`` unless a different default is stated on the property.
    """

    def __init__(self, config_dict):
        """Wrap an already-parsed configuration dictionary.

        Args:
            config_dict (dict): scenario metadata keyed by setting name
        """
        self.config_dict = config_dict

    @classmethod
    def load(cls, file_path):
        """Load the scenario metadata from a JSON file.

        Args:
            file_path (str): path to the file to load

        Returns:
            ScenarioConfig: metadata about the scenario
        """
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(file_path, encoding="utf-8") as f:
            scenario_meta_dict = json.load(f)
        return cls(scenario_meta_dict)

    @property
    def goals(self) -> List[Tuple[int, int]]:
        """Possible goals for agents in this scenario."""
        return self.config_dict.get('goals')

    @property
    def goals_priors(self) -> List[float]:
        """Priors for goals in this scenario."""
        return self.config_dict.get('goals_priors')

    @property
    def name(self) -> str:
        """Name of the scenario."""
        return self.config_dict.get('name')

    @property
    def goal_types(self) -> List[List[str]]:
        """Goal types stored under the ``goal_types`` key.

        NOTE(review): presumably one list of type names per entry of ``goals`` - confirm.
        """
        return self.config_dict.get('goal_types')

    @property
    def opendrive_file(self) -> str:
        """Path to the *.xodr file specifying the OpenDrive map."""
        return self.config_dict.get('opendrive_file')

    @property
    def lat_origin(self) -> float:
        """Latitude of the origin."""
        return self.config_dict.get('lat_origin')

    @property
    def lon_origin(self) -> float:
        """Longitude of the origin."""
        return self.config_dict.get('lon_origin')

    @property
    def data_format(self) -> str:
        """Format in which the data is stored."""
        return self.config_dict.get('data_format')

    @property
    def data_root(self) -> str:
        """Path to directory in which the data is stored."""
        return self.config_dict.get('data_root')

    @property
    def episodes(self) -> "List[EpisodeConfig]":
        """Configuration for all episodes of this scenario.

        Each raw entry under the ``episodes`` key is wrapped in an EpisodeConfig.
        Returns an empty list when the key is missing or null instead of failing.
        """
        # The annotation is quoted so it is not evaluated while the class body executes.
        return [EpisodeConfig(c) for c in self.config_dict.get('episodes') or []]

    @property
    def background_image(self) -> str:
        """Path to background image."""
        return self.config_dict.get('background_image')

    @property
    def background_px_to_meter(self) -> float:
        """Pixels per meter in background image."""
        return self.config_dict.get('background_px_to_meter')

    @property
    def scale_down_factor(self) -> int:
        """Scale down factor for visualisation."""
        return self.config_dict.get('scale_down_factor')

    @property
    def check_lanes(self) -> bool:
        """True if Lane data should be checked when loading frames for agents. Defaults to False."""
        return self.config_dict.get("check_lanes", False)

    @property
    def check_oncoming(self) -> bool:
        """True if ChangeLane macro action should check for other agents in the lane before switching.

        Defaults to True.
        """
        return self.config_dict.get("check_oncoming", True)

    @property
    def reachable_pairs(self) -> List[List[List[float]]]:
        """Pairs of points, where the second point should be reachable from the first.

        Can be used for validating maps.
        """
        return self.config_dict.get('reachable_pairs')

    @property
    def dataset_split(self) -> Dict[str, List[int]]:
        """Mapping from data split name to the episode indices belonging to that split."""
        return self.config_dict.get('dataset_split', None)

    @property
    def agent_types(self) -> List[str]:
        """Which types of agents to keep from the data set."""
        return self.config_dict.get("agent_types", None)

    @property
    def goal_threshold(self) -> float:
        """Threshold for checking goal completion of agents' trajectories."""
        return self.config_dict.get("goal_threshold", None)

    @property
    def scaling_factor(self) -> float:
        """Constant factor to account for mismatch in the scale of the recordings and the size of the map."""
        return self.config_dict.get("scaling_factor", None)

    @property
    def target_switch_length(self) -> float:
        """Target length for lane switch maneuver."""
        return self.config_dict.get("target_switch_length")

    @property
    def cost_factors(self) -> Dict[str, float]:
        """Default cost weights."""
        return self.config_dict.get("cost_factors")

    @property
    def buildings(self) -> List[List[List[float]]]:
        """Vertices of the buildings in the map."""
        return self.config_dict.get("buildings")
class Scenario(abc.ABC):
    """ Represents an arbitrary driving scenario with interactions broken to episodes. """

    def __init__(self, config: ScenarioConfig):
        """ Initialize new Scenario based on the given ScenarioConfig and read map data from config.

        Args:
            config: The configuration describing the scenario

        Raises:
            ValueError: If the config does not specify an OpenDrive file (raised by load_map)
        """
        self.config = config
        self._episodes = None  # Stays None until a subclass loads episodes
        self._opendrive_map = None
        self._loader = EpisodeLoader.get_loader(self.config)
        self.load_map()

    def load_map(self):
        """ Parse the OpenDrive file named in the config and store the resulting Map.

        Raises:
            ValueError: If the config has no (or an empty) opendrive_file entry
        """
        if not self.config.opendrive_file:
            raise ValueError("OpenDrive map was not specified!")
        self._opendrive_map = Map.parse_from_opendrive(self.config.opendrive_file)

    @property
    def opendrive_map(self) -> Map:
        """ Return the OpenDrive Map of the Scenario. """
        return self._opendrive_map

    @property
    def episodes(self) -> List[Episode]:
        """ Retrieve a list of loaded Episodes. None if no episodes have been loaded yet. """
        return self._episodes

    @property
    def loader(self) -> EpisodeLoader:
        """ The EpisodeLoader of the Scenario. """
        return self._loader

    @classmethod
    def load(cls, file_path: str, split: List[str] = None):
        """ Initialise a new Scenario from the given config file.

        Must be overridden by subclasses; this base implementation always raises.

        Args:
            file_path: Path to the file defining the scenario
            split: The data set splits to load. If None, load all.

        Returns:
            A new Scenario instance
        """
        raise NotImplementedError

    def plot_goals(self, axes, scale=1, flipy=False):
        """ Draw every goal of the scenario onto the given axes.

        Each goal is labelled 'G<index>' in white. A red circle with radius
        goal_threshold / scale is drawn around it as well; when no goal_threshold is
        configured there is no radius available, so only the label is drawn.

        Args:
            axes: The axes object to draw on; must provide add_artist() and annotate()
            scale: Divisor applied to goal coordinates and to the goal radius
            flipy: If True, negate the y coordinate of every goal
        """
        threshold = self.config.goal_threshold
        y_sign = 1 - 2 * int(flipy)  # -1 when flipping, +1 otherwise
        # A config without goals simply has nothing to plot.
        for idx, g in enumerate(self.config.goals or []):
            x = g[0] / scale
            y = g[1] / scale * y_sign
            if threshold is not None:
                circle = plt.Circle((x, y), threshold / scale, color='r')
                axes.add_artist(circle)
            label = 'G{}'.format(idx)
            axes.annotate(label, (x, y), color='white')
class InDScenario(Scenario):
    """ Scenario that loads its episodes through the configured EpisodeLoader and marks agents
    which never reach one of the configured goals as dead. """

    @classmethod
    def load(cls, file_path: str, split: List[str] = None):
        """ Initialise a new InDScenario from the given config file, load its episodes and filter agents.

        Args:
            file_path: Path to the JSON file defining the scenario
            split: Names of the data set splits to load. If None, load all episodes.

        Returns:
            A new InDScenario instance with episodes loaded and goal filtering applied
        """
        config = ScenarioConfig.load(file_path)
        scenario = cls(config)
        scenario.load_episodes(split)
        scenario.filter_by_goal_completion()
        return scenario

    def load_episodes(self, split: List[str] = None) -> List[Episode]:
        """ Load all/the specified Episodes as given in the ScenarioConfig. Store episodes in field episode.

        Args:
            split: Names of data set splits; each name is looked up in config.dataset_split to obtain
                episode indices. Indices refer to positions in the episode list sorted by recording_id.
                If None, every episode is loaded.

        Returns:
            The list of loaded episodes, ordered by recording_id
        """
        indices = None
        if split is not None:
            # A set gives O(1) membership tests while filtering below.
            indices = set()
            for s in split:
                indices.update(self.config.dataset_split[s])

        to_load = sorted(self.config.episodes, key=lambda x: x.recording_id)
        if indices is not None:
            to_load = [conf for i, conf in enumerate(to_load) if i in indices]

        total = len(to_load)
        logger.info("Loading %d episode(s).", total)

        # The map is only handed to the loader when lane checking is enabled.
        road_map = self._opendrive_map if self.config.check_lanes else None
        episodes = []
        for number, episode_config in enumerate(to_load, start=1):
            logger.info("Loading Episode %d/%d", number, total)
            episode = self._loader.load(episode_config,
                                        road_map,
                                        agent_types=self.config.agent_types,
                                        scale=self.config.scaling_factor)
            episodes.append(episode)

        self._episodes = episodes
        return episodes

    def load_episode(self, episode_id) -> Episode:
        """ Load specific Episode with the given ID. Does not append episode to member field episode.

        Args:
            episode_id: Index into config.episodes, in the order given in the config (not sorted)
        """
        # NOTE(review): unlike load_episodes, this passes neither the map, agent_types nor the
        # scaling factor to the loader and indexes the unsorted list - confirm this is intended.
        return self._loader.load(self.config.episodes[episode_id])

    def filter_by_goal_completion(self):
        """ Filter out all agents which do not arrive at a specified goal.

        An agent is marked dead (goal_reached set to False, id added to dead_ids of every frame it
        appears in) when its first and last path points coincide within 0.1, or when no point of its
        path comes closer than goal_threshold to any goal. Does nothing if no goal_threshold is
        configured or if no episodes have been loaded.
        """
        threshold = self.config.goal_threshold
        if threshold is None or not self.episodes:
            return

        possible_goals = np.array(self.config.goals)
        for episode in self.episodes:
            dead_agents = set()
            for agent_id, agent in episode.agents.items():
                path = agent.trajectory.path

                # Agents that end where they started are treated as not having reached a goal.
                if np.allclose(path[0], path[-1], atol=0.1):
                    agent.goal_reached = False
                    dead_agents.add(agent_id)
                    continue

                reached_any_goal = any(
                    np.any(np.linalg.norm(path - goal, axis=1) < threshold)
                    for goal in possible_goals
                )
                if not reached_any_goal:
                    agent.goal_reached = False
                    dead_agents.add(agent_id)

            for frame in episode.frames:
                for agent_id, _ in frame.all_agents.items():
                    if agent_id in dead_agents:
                        frame.dead_ids.add(agent_id)