# Source code for igp2.simplesim.simulation

from collections import defaultdict
import logging
import numpy as np
from typing import Dict, List

from igp2.opendrive.map import Map
from igp2.agents.agent import Agent
from igp2.core.vehicle import Action, Observation
from igp2.core.agentstate import AgentState

logger = logging.getLogger(__name__)


class Simulation:
    """ A lightweight simulator for IGP2 to perform rapid testing.

    The simulation keeps a dictionary of agents keyed by agent ID, the joint state of the
    most recent frame, and the list of actions each agent has taken so far.
    Removed agents keep their key in the agents dictionary, but the value is set to None.
    """

    def __init__(self,
                 scenario_map: Map,
                 fps: int = 20,
                 open_loop: bool = False):
        """ Initialise new simulation.

        Args:
            scenario_map: The current road layout.
            fps: Execution frame-rate.
            open_loop: If true then no physical controller will be applied.
        """
        self.__scenario_map = scenario_map
        # NOTE(review): fps and open_loop are stored but not read anywhere in this class.
        self.__fps = fps
        self.__open_loop = open_loop

        self.__t = 0  # Index of the current time step
        self.__state = {}  # Agent ID -> state of that agent in the latest frame
        self.__agents = {}  # Agent ID -> agent, or None once the agent was removed
        self.__actions = defaultdict(list)  # Agent ID -> actions taken so far

    def add_agent(self, new_agent: Agent, rolename: str = None):
        """ Add a new agent to the simulation.

        The ID of a previously removed agent may be reused.

        Args:
            new_agent: Agent to add.
            rolename: Currently unused. Optional string to describe role of the vehicle.

        Raises:
            ValueError: If an agent with the same ID is already in the simulation.
        """
        agent_id = new_agent.agent_id
        # A None entry marks a removed agent, so only a live entry blocks the ID.
        if self.__agents.get(agent_id) is not None:
            raise ValueError(f"Agent with ID {new_agent.agent_id} already exists.")

        self.__agents[agent_id] = new_agent
        self.__state[agent_id] = new_agent.vehicle.get_state(0)
        logger.debug(f"Added Agent {new_agent.agent_id}")

    def remove_agent(self, agent_id: int):
        """ Remove an agent from the simulation.

        The agent is marked as not alive and its entry in the agents dictionary is set to None.
        Removing an agent that has already been removed does nothing.

        Args:
            agent_id: Agent ID to remove.

        Raises:
            KeyError: If no agent with the given ID was ever added to the simulation.
        """
        if agent_id not in self.__agents:
            raise KeyError(f"Agent with ID {agent_id} does not exist.")

        agent = self.__agents[agent_id]
        if agent is None:
            # Already removed earlier; there is no agent object left to update.
            logger.debug(f"Agent {agent_id} was already removed")
            return

        agent.alive = False
        self.__agents[agent_id] = None
        logger.debug(f"Removed Agent {agent_id}")

    def reset(self):
        """ Remove all agents and reset internal state of simulation. """
        self.__t = 0
        self.__agents = {}
        self.__state = {}
        # Drop recorded actions as well so that a new run does not start with stale history.
        self.__actions = defaultdict(list)

    def step(self) -> bool:
        """ Advance simulation by one time step.

        Returns:
            True if any agent is still alive else False.
        """
        # Detailed logging is only produced for the agent with ID 0, if it has a state.
        if 0 in self.__state:
            logger.info(f"Simulation step {self.__t} - "
                        f"Pos: {np.round(self.__state[0].position, 2)} - "
                        f"Vel: {np.round(self.__state[0].speed, 2)} - "
                        f"Mcr: {self.__state[0].macro_action} - "
                        f"Man: {self.__state[0].maneuver}")
        else:
            logger.info(f"Simulation step {self.__t}")

        alive = self.take_actions()
        self.__t += 1
        return alive

    def take_actions(self) -> bool:
        """ Query every agent for its next state and build the next frame of the simulation.

        Agents that are not alive, or that report being done (checked only after the first
        time step), are removed instead of being stepped. Each remaining agent is marked as
        not alive if its new position is on no road or its bounding box overlaps that of
        another agent; it is then removed on the following call.

        Returns:
            True if any agent is still alive else False.
        """
        new_frame = {}

        # remove_agent() only overwrites existing values with None, so the dictionary
        # does not change size while it is being iterated.
        for agent_id, agent in self.__agents.items():
            if agent is None:
                continue

            observation = self.get_observations(agent_id)
            if not agent.alive or (self.__t > 0 and agent.done(observation)):
                self.remove_agent(agent_id)
                continue

            new_state, action = agent.next_state(observation, return_action=True)
            agent.trajectory_cl.add_state(new_state, reload_path=False)
            self.__actions[agent_id].append(action)
            new_frame[agent_id] = new_state

            on_road = len(self.__scenario_map.roads_at(new_state.position)) > 0
            if not on_road:
                logger.debug(f"Agent {agent_id} went off-road.")

            collision = any(agent.bounding_box.overlaps(other.bounding_box)
                            for other_id, other in self.__agents.items()
                            if other_id != agent_id and other is not None)
            if collision:
                logger.debug(f"Agent {agent_id} collided with another agent(s)")

            agent.alive = on_road and not collision

        # Agents that were removed or skipped above are absent from the new joint state.
        self.__state = new_frame
        return any(agent is not None and agent.alive for agent in self.__agents.values())

    def get_observations(self, agent_id: int = 0) -> Observation:
        """ Get observations for the given agent. Can be overridden to add occlusions
        to the environment for example.

        This implementation ignores the agent ID and returns the full joint state
        together with the road layout.

        Args:
            agent_id: The ID of the agent for which to retrieve observations.
        """
        return Observation(self.__state, self.__scenario_map)

    @property
    def scenario_map(self) -> Map:
        """ The road layout of the simulation. """
        return self.__scenario_map

    @property
    def agents(self) -> Dict[int, Agent]:
        """ Agents in the simulation, mapping agent IDs to agents.
        Removed agents are mapped to None. """
        return self.__agents

    @property
    def actions(self) -> Dict[int, List[Action]]:
        """ List of actions (acceleration and steering) taken by every vehicle. """
        return self.__actions

    @property
    def t(self) -> int:
        """ The current time step of the simulation. """
        return self.__t

    @property
    def state(self) -> Dict[int, AgentState]:
        """ Current joint state of the simulation. """
        return self.__state