Source code for igp2.simplesim.simulation
from collections import defaultdict
import logging
import numpy as np
from typing import Dict, List
from igp2.opendrive.map import Map
from igp2.agents.agent import Agent
from igp2.core.vehicle import Action, Observation
from igp2.core.agentstate import AgentState
logger = logging.getLogger(__name__)
[docs]
class Simulation:
""" A lightweight simulator for IGP2 to perform rapid testing. """
def __init__(self,
scenario_map: Map,
fps: int = 20,
open_loop: bool = False):
""" Initialise new simulation.
Args:
scenario_map: The current road layout.
fps: Execution frame-rate.
open_loop: If true then no physical controller will be applied.
"""
self.__scenario_map = scenario_map
self.__fps = fps
self.__open_loop = open_loop
self.__t = 0
self.__state = {}
self.__agents = {}
self.__actions = defaultdict(list)
[docs]
def add_agent(self, new_agent: Agent, rolename: str = None, override: bool = False):
""" Add a new agent to the simulation.
Args:
new_agent: Agent to add.
rolename: Currently unused. Optional string to describe role of the vehicle.
override: If true, then allow replacing existing agents with same ID.
"""
verb = "Added"
if new_agent.agent_id in self.__agents \
and self.__agents[new_agent.agent_id] is not None:
if not override:
raise ValueError(f"Agent with ID {new_agent.agent_id} already exists.")
verb = "Overrode"
self.__agents[new_agent.agent_id] = new_agent
self.__state[new_agent.agent_id] = new_agent.vehicle.get_state(0)
logger.debug(f"{verb} Agent {new_agent.agent_id}")
[docs]
def remove_agent(self, agent_id: int):
""" Remove an agent from the simulation.
Args:
agent_id: Agent ID to remove.
"""
self.__agents[agent_id].alive = False
self.__agents[agent_id] = None
self.__state[agent_id] = None
self.get_observations(0) # Get observations for ego, updating things.
logger.debug(f"Removed Agent {agent_id}")
[docs]
def reset(self):
""" Remove all agents and reset internal state of simulation. """
self.__t = 0
self.__agents = {}
self.__state = {}
[docs]
def step(self) -> bool:
""" Advance simulation by one time step.
Returns:
True if any agent is still alive else False.
"""
if 0 in self.__state:
logger.info(f"Simulation step {self.__t} - "
f"Pos: {np.round(self.__state[0].position, 2)} - "
f"Vel: {np.round(self.__state[0].speed, 2)} - "
f"Mcr: {self.__state[0].macro_action} - "
f"Man: {self.__state[0].maneuver}")
else:
logger.info(f"Simulation step {self.__t}")
logger.info(f"N agents: {len(self.__agents)}")
alive, _ = self.take_actions()
return alive
[docs]
def take_actions(self, actions: dict[int, Action] = None) -> bool:
""" Take actions for all agents in the simulation.
Args:
actions: Optional actions to apply to each agent.
"""
new_frame = {}
colliding_agents = defaultdict(list)
for agent_id, agent in self.__agents.items():
if agent is None:
continue
observation = self.get_observations(agent_id)
if not agent.alive:
self.remove_agent(agent_id)
continue
if self.__t > 0 and agent.done(observation):
agent.alive = False
continue
if actions is not None and agent_id in actions:
if isinstance(actions[agent_id], tuple):
action, macro, maneuver = actions[agent_id]
else:
action, macro, maneuver = actions[agent_id], None, None
agent.vehicle.execute_action(action, self.__state[0])
new_state = agent.vehicle.get_state(observation.frame[agent_id].time + 1)
new_state.macro_action = str(macro)
new_state.maneuver = str(maneuver)
if hasattr(agent, "update_observations"):
agent.update_observations(observation)
else:
new_state, action = agent.next_state(observation, return_action=True)
agent.trajectory_cl.add_state(new_state, reload_path=False)
self.__actions[agent_id].append(action)
new_frame[agent_id] = new_state
on_road = len(self.__scenario_map.roads_at(new_state.position)) > 0
if not on_road:
logger.debug(f"Agent {agent_id} went off-road.")
for aid, ag in self.agents.items():
if aid == agent_id or ag is None or not ag.alive:
continue
if ag.vehicle.overlaps(agent.vehicle):
colliding_agents[agent_id].append(ag)
colliding_agents[aid].append(agent)
collision = any(colliding_agents[agent_id])
if collision:
logger.debug(f"Agent {agent_id} collided with agent(s) {colliding_agents[agent_id]}")
for colliding_agent in colliding_agents[agent_id]:
if self.agents[colliding_agent.agent_id] is not None:
self.agents[colliding_agent.agent_id].alive = False
agent.alive = on_road and not collision
self.__state = new_frame
self.__t += 1
return any(agent.alive if agent is not None else False for agent in self.__agents.values()), colliding_agents
[docs]
def get_observations(self, agent_id: int = 0):
""" Get observations for the given agent. Can be overridden to add occlusions to the environment for example.
Args:
agent_id: The ID of the agent for which to retrieve observations.
"""
state = {aid: state for aid, state in self.__state.items() if state is not None}
return Observation(state, self.__scenario_map)
@property
def scenario_map(self) -> Map:
""" The road layout of the simulation. """
return self.__scenario_map
@property
def agents(self) -> Dict[int, Agent]:
""" Agents in the simulation, mapping agent IDs to agents. """
return self.__agents
@property
def actions(self) -> Dict[int, List[Action]]:
""" List of actions (acceleration and steering) taken by every vehicle. """
return self.__actions
@property
def t(self) -> int:
""" The current time step of the simulation. """
return self.__t
@property
def state(self) -> Dict[int, AgentState]:
""" Current joint state of the simulation. """
return self.__state