import time
from typing import List, Dict, Optional, Union
import os
from datetime import datetime
from pathlib import Path
import numpy as np
import carla
import subprocess
import psutil
import platform
import logging
from carla import Transform, Location, Rotation, Vector3D
from igp2.opendrive import Map
from igp2.agents.agent import Agent
from igp2.carlasim.traffic_manager import TrafficManager
from igp2.carlasim.carla_agent_wrapper import CarlaAgentWrapper
from igp2.core.vehicle import Observation
from igp2.core.agentstate import AgentState
logger = logging.getLogger(__name__)
[docs]
class CarlaSim:
""" An interface to the CARLA simulator """
TIMEOUT = 20.0
def __init__(self,
fps: int = 20,
xodr: Union[str, Map] = None,
map_name: str = None,
server: str = "localhost",
port: int = 2000,
launch_process: bool = False,
carla_path: str = None,
record: bool = False,
rendering: bool = True):
""" Launch the CARLA simulator and define a CarlaSim object, which keeps the connection to the CARLA
server, manages agents and advances the environment.
Note:
The first agent of type MCTSAgent to be added to the simulation will be treated as the ego vehicle.
Args:
fps: number of frames simulated per second
xodr: path to a .xodr OpenDrive file defining the road layout
port: port to use for communication with the CARLA simulator
launch_process: whether to launch a new CARLA instance
carla_path: path to the root directory of the CARLA simulator
rendering: controls whether graphics are rendered
"""
logger.info(f"Launching CARLA simulation (server={server}, port={port}).")
self.__launch_process = launch_process
self.__carla_process = None
if self.__launch_process:
sys_name = platform.system()
if sys_name == "Windows":
if carla_path is None:
carla_path = r"C:\\Carla"
if "CarlaUE4.exe" not in [p.name() for p in psutil.process_iter()]:
args = [os.path.join(carla_path, 'CarlaUE4.exe'), '-quality-level=Low',
f'-carla-rpc-port={port}', '-dx11']
self.__carla_process = subprocess.Popen(args)
elif sys_name == "Linux":
if carla_path is None:
carla_path = "/opt/carla-simulator"
if "CarlaUE4.sh" not in [p.name() for p in psutil.process_iter()]:
args = [os.path.join(carla_path, 'CarlaUE4.sh'), '-quality-level=Low', f'-carla-rpc-port={port}']
self.__carla_process = subprocess.Popen(args)
else:
raise RuntimeError("Unsupported system!")
self.__record = record
self.__port = port
self.__client = carla.Client(server, port)
self.__client.set_timeout(self.TIMEOUT) # seconds
self.__wait_for_server()
self.__scenario_map = None
if isinstance(xodr, Map):
self.__scenario_map = xodr
if map_name is not None:
if self.__scenario_map is None:
self.__scenario_map = Map.parse_from_opendrive(f"scenarios/maps/{map_name}.xodr")
if not self.__client.get_world().get_map().name.endswith(map_name):
self.__client.load_world(map_name)
elif xodr is not None:
if self.__scenario_map is None:
self.__scenario_map = Map.parse_from_opendrive(xodr)
self.load_opendrive_world(self.__scenario_map.xodr_path)
else:
raise RuntimeError("Cannot load a map with the given parameters!")
self.__fps = fps
self.__timestep = 0
self.__world = self.__client.get_world()
self.__map = self.__world.get_map()
self.__original_settings = self.__world.get_settings()
settings = self.__world.get_settings()
settings.fixed_delta_seconds = 1 / fps
settings.synchronous_mode = True
settings.no_rendering_mode = not rendering
self.__world.apply_settings(settings)
self.__record_path = None
if self.__record:
now = datetime.now()
log_name = now.strftime("%d-%m-%Y_%H-%M-%S") + ".log"
dir_path = os.path.dirname(os.path.realpath(__file__))
path = Path(dir_path)
repo_path = str(path.parent.parent)
self.__record_path = os.path.join(repo_path, "scripts", "experiments", "data", "carla_recordings", log_name)
logger.info(f"Recording simulation under path: {self.__client.start_recorder(self.__record_path, True)}")
self.__agents = {}
self.__spectator = self.__world.get_spectator()
self.__spectator_parent = None
self.__spectator_transform = None
self.__traffic_manager = TrafficManager(self.__scenario_map)
self.__world.tick()
def __del__(self):
self.__clear_agents()
self.__world.apply_settings(self.__original_settings)
if self.__record:
self.__client.stop_recorder()
self.__world.tick()
if self.__launch_process:
if self.__carla_process is not None:
for child in psutil.Process(self.__carla_process.pid).children(recursive=True):
child.kill()
[docs]
def run(self, steps=400):
""" Run the simulation for a number of time steps """
for i in range(steps):
self.step()
time.sleep(1 / self.__fps)
[docs]
def step(self, tick: bool = True):
""" Advance the simulation by one time step.
Returns:
Current observation of the environment before taking actions this step, and the actions that will be taken.
"""
logger.debug(f"CARLA step {self.__timestep}.")
if tick:
self.__world.tick()
self.__timestep += 1
observation = self.__get_current_observation()
actions = self.__take_actions(observation)
self.__traffic_manager.update(self, observation)
self.__update_spectator()
return observation, actions
[docs]
def add_agent(self,
agent: Agent,
rolename: str = None,
blueprint: carla.ActorBlueprint = None):
""" Add a vehicle to the simulation. Defaults to an Audi A2 for blueprints if not explicitly given.
Args:
agent: Agent to add.
rolename: Unique name for the actor to spawn.
blueprint: Optional blueprint defining the properties of the actor.
Returns:
The newly added actor.
"""
if blueprint is None:
blueprint_library = self.__world.get_blueprint_library()
blueprint = blueprint_library.find('vehicle.audi.a2')
if rolename is not None:
blueprint.set_attribute('role_name', rolename)
state = agent.state
yaw = np.rad2deg(-state.heading)
transform = Transform(Location(x=state.position[0], y=-state.position[1], z=0.1),
Rotation(yaw=yaw, roll=0.0, pitch=0.0))
actor = self.__world.spawn_actor(blueprint, transform)
actor.set_target_velocity(Vector3D(state.velocity[0], -state.velocity[1], 0.))
carla_agent = CarlaAgentWrapper(agent, actor)
self.agents[carla_agent.agent_id] = carla_agent
self.__world.tick()
logger.info(f"Added agent {carla_agent.agent_id} (actor {carla_agent.actor_id}).")
[docs]
def remove_agent(self, agent_id: int):
""" Remove the given agent from the simulation.
Args:
agent_id: The ID of the agent to remove
"""
logger.debug(f"Removing Agent {agent_id} with Actor {self.agents[agent_id].actor}")
actor = self.agents[agent_id].actor
actor.destroy()
self.agents[agent_id].agent.alive = False
self.agents[agent_id] = None
[docs]
def get_traffic_manager(self) -> "TrafficManager":
""" Enables and returns the internal traffic manager of the simulation."""
self.__traffic_manager.enabled = True
spawn_points = []
for p in self.__world.get_map().get_spawn_points():
distances = [np.isclose((q.location - p.location).x, 0.0) and
np.isclose((q.location - p.location).y, 0.0) for q in spawn_points]
if not any(distances) and len(self.scenario_map.roads_at((p.location.x, -p.location.y), drivable=True)) > 0:
spawn_points.append(p)
self.__traffic_manager.spawns = spawn_points
return self.__traffic_manager
[docs]
def load_opendrive_world(self, xodr: str):
with open(xodr, 'r') as f:
opendrive = f.read()
self.__client.set_timeout(60.0)
self.__client.generate_opendrive_world(opendrive)
self.__client.set_timeout(self.TIMEOUT)
[docs]
def attach_camera(self,
actor: carla.Actor,
transform: carla.Transform = carla.Transform(carla.Location(x=1.6, z=1.7))):
""" Attach a camera to the back of the given actor in third-person view.
Args:
actor: The actor to follow
transform: Optional transform to set the position of the camera
"""
self.__spectator_parent = actor
self.__spectator_transform = transform
[docs]
def get_ego(self, ego_name: str = "ego") -> Optional[CarlaAgentWrapper]:
""" Returns the ego agent if it exists. """
for agent in self.agents.values():
if agent is not None and agent.name == ego_name:
return agent
return None
def __update_spectator(self):
if self.__spectator_parent is not None and self.__spectator_transform is not None:
actor_transform = self.__spectator_parent.get_transform()
actor_transform.location += self.__spectator_transform.location
# actor_transform.rotation += self.__spectator_transform.rotation
self.__spectator.set_transform(actor_transform)
def __take_actions(self, observation: Observation):
commands = []
controls = {}
for agent_id, agent in self.agents.items():
if agent is None:
continue
control = agent.next_control(observation)
if control is None:
self.remove_agent(agent.agent_id)
continue
controls[agent_id] = control
command = carla.command.ApplyVehicleControl(agent.actor, control)
commands.append(command)
self.__client.apply_batch_sync(commands)
return controls
def __get_current_observation(self) -> Observation:
actor_list = self.__world.get_actors()
vehicle_list = actor_list.filter("*vehicle*")
agent_id_lookup = dict([(a.actor_id, a.agent_id) for a in self.agents.values() if a is not None])
frame = {}
for vehicle in vehicle_list:
transform = vehicle.get_transform()
heading = np.deg2rad(-transform.rotation.yaw)
velocity = vehicle.get_velocity()
acceleration = vehicle.get_acceleration()
state = AgentState(time=self.__timestep,
position=np.array([transform.location.x, -transform.location.y]),
velocity=np.array([velocity.x, -velocity.y]),
acceleration=np.array([acceleration.x, -acceleration.y]),
heading=heading)
if vehicle.id in agent_id_lookup:
agent_id = agent_id_lookup[vehicle.id]
else:
agent_id = vehicle.id
frame[agent_id] = state
return Observation(frame, self.scenario_map)
def __wait_for_server(self):
for i in range(10):
try:
self.__client = carla.Client('localhost', self.__port)
self.__client.set_timeout(self.TIMEOUT) # seconds
self.__client.get_world()
return
except RuntimeError:
pass
self.__client.get_world()
def __clear_agents(self):
for agent_id, agent in self.agents.items():
if agent is not None:
self.remove_agent(agent_id)
@property
def client(self) -> carla.Client:
""" The CARLA client to the server. """
return self.__client
@property
def world(self) -> carla.World:
"""The current CARLA world"""
return self.__world
@property
def map(self) -> carla.Map:
""" The CARLA map. """
return self.__map
@property
def scenario_map(self) -> Map:
"""The current road layout. """
return self.__scenario_map
@property
def agents(self) -> Dict[int, CarlaAgentWrapper]:
""" All IGP2 agents that are present or were present during the simulation."""
return self.__agents
@property
def spectator(self) -> carla.Actor:
""" The spectator camera of the world. """
return self.__spectator
@property
def fps(self) -> int:
""" Execution frequency of the simulation. """
return self.__fps
@property
def timestep(self) -> int:
""" Current timestep of the simulation"""
return self.__timestep
@property
def dead_ids(self) -> List[int]:
""" List of Agent IDs that have been used previously during the simulation"""
return self.__dead_agent_ids
@property
def recording(self) -> bool:
""" Whether we are recording the simulation. """
return self.__record
@property
def recording_path(self) -> str:
""" The save path of the recording. """
return self.__record_path