Source code for igp2.simplesim.simulation
from collections import defaultdict
import logging
from typing import Dict, List
from igp2.opendrive.map import Map
from igp2.agents.agent import Agent
from igp2.core.vehicle import Action, Observation
logger = logging.getLogger(__name__)
[docs]class Simulation:
""" A lightweight simulator for IGP2 to perform rapid testing. """
def __init__(self,
scenario_map: Map,
fps: int = 20,
open_loop: bool = False):
""" Initialise new simulation.
Args:
scenario_map: The current road layout.
fps: Execution frame-rate.
open_loop: If true then no physical controller will be applied.
"""
self.__scenario_map = scenario_map
self.__fps = fps
self.__open_loop = open_loop
self.__t = 0
self.__state = {}
self.__agents = {}
self.__actions = defaultdict(list)
[docs] def add_agent(self, new_agent: Agent, rolename: str = None):
""" Add a new agent to the simulation.
Args:
new_agent: Agent to add.
rolename: Currently unused. Optional string to describe role of the vehicle.
"""
if new_agent.agent_id in self.__agents \
and self.__agents[new_agent.agent_id] is not None:
raise ValueError(f"Agent with ID {new_agent.agent_id} already exists.")
self.__agents[new_agent.agent_id] = new_agent
self.__state[new_agent.agent_id] = new_agent.vehicle.get_state(0)
logger.debug(f"Added Agent {new_agent.agent_id}")
[docs] def remove_agent(self, agent_id: int):
""" Remove an agent from the simulation.
Args:
agent_id: Agent ID to remove.
"""
self.__agents[agent_id].alive = False
self.__agents[agent_id] = None
logger.debug(f"Removed Agent {agent_id}")
[docs] def reset(self):
""" Remove all agents and reset internal state of simulation. """
self.__t = 0
self.__agents = {}
self.__state = {}
[docs] def step(self):
""" Advance simulation by one time step. """
logger.debug(f"Simulation step {self.__t}")
self.__take_actions()
self.__t += 1
def __take_actions(self):
new_frame = {}
observation = Observation(self.__state, self.__scenario_map)
for agent_id, agent in self.__agents.items():
if agent is None or not agent.alive:
continue
if not agent.alive or self.__t > 0 and agent.done(observation):
self.remove_agent(agent_id)
continue
new_state, action = agent.next_state(observation, return_action=True)
agent.trajectory_cl.add_state(new_state, reload_path=False)
self.__actions[agent_id].append(action)
new_frame[agent_id] = new_state
agent.alive = len(self.__scenario_map.roads_at(new_state.position)) > 0
self.__state = new_frame
@property
def scenario_map(self) -> Map:
""" The road layout of the simulation. """
return self.__scenario_map
@property
def agents(self) -> Dict[int, Agent]:
""" Agents in the simulation, mapping agent IDs to agents. """
return self.__agents
@property
def actions(self) -> Dict[int, List[Action]]:
""" List of actions (acceleration and steering) taken by every vehicle. """
return self.__actions
@property
def t(self) -> int:
""" The current time step of the simulation. """
return self.__t