Source code for igp2.carlasim.visualisation

#!/usr/bin/env python

# Adapted from the file manual_control.py
# Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de
# Barcelona (UAB).
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.

# Allows controlling a vehicle with a keyboard. For a simpler and more
# documented example, please take a look at tutorial.py.


"""
Welcome to CARLA manual control.

    L            : toggle next light type
    SHIFT + L    : toggle high beam
    Z/X          : toggle right/left blinker
    I            : toggle interior light

    TAB          : change sensor position
    ` or N       : next sensor
    [1-9]        : change to sensor [1-9]
    G            : toggle radar visualization
    C            : change weather (Shift+C reverse)
    Backspace    : change vehicle

    V            : Select next map layer (Shift+V reverse)
    B            : Load current selected map layer (Shift+B to unload)

    R            : toggle recording images to disk
    T            : toggle vehicle's telemetry

    CTRL + R     : toggle recording of simulation (replacing any previous)
    CTRL + P     : start replaying last recorded simulation
    CTRL + +     : increments the start time of the replay by 1 second (+SHIFT = 10 seconds)
    CTRL + -     : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds)

    F1           : toggle HUD
    H/?          : toggle help
    ESC          : quit
"""

import os
from typing import Dict

os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"

import sys

import carla
from carla import ColorConverter as cc

import collections
import datetime
import logging
import math
import weakref
import pygame
from pygame.locals import KMOD_CTRL
from pygame.locals import KMOD_SHIFT
from pygame.locals import K_0
from pygame.locals import K_9
from pygame.locals import K_BACKQUOTE
from pygame.locals import K_ESCAPE
from pygame.locals import K_F1
from pygame.locals import K_SLASH
from pygame.locals import K_TAB
from pygame.locals import K_b
from pygame.locals import K_c
from pygame.locals import K_g
from pygame.locals import K_h
from pygame.locals import K_i
from pygame.locals import K_l
from pygame.locals import K_n
from pygame.locals import K_p
from pygame.locals import K_q
from pygame.locals import K_r
from pygame.locals import K_t
from pygame.locals import K_v
from pygame.locals import K_x
from pygame.locals import K_z
from pygame.locals import K_MINUS
from pygame.locals import K_EQUALS
import numpy as np

from igp2.carlasim import CarlaSim, find_weather_presets, get_actor_display_name
from igp2.carlasim.carla_agent_wrapper import CarlaAgentWrapper

logger = logging.getLogger(__name__)


# ==============================================================================
# -- World ---------------------------------------------------------------------
# ==============================================================================


[docs]class World(object): """ Wrapper to manage the CARLA world and its rendering to pygame. """ def __init__(self, carla_world: carla.World, ego: CarlaAgentWrapper, hud: "HUD", igp2_hud: "Igp2HUD", gamma: float): """ Initialise a new World. Args: carla_world: The current CARLA world. ego: The ego agent wrapper. hud: Heads-up display to render onto the pygame display. igp2_hud: HUD to display ego predictions gamma: Display gamma value. """ self.world = carla_world self.sync = True try: self.map = self.world.get_map() except RuntimeError as error: print('RuntimeError: {}'.format(error)) print(' The server could not send the OpenDRIVE (.xodr) file:') print(' Make sure it exists, has the same name of your town, and is correct.') sys.exit(1) self.hud = hud self.igp2_hud = igp2_hud self.ego = ego self.player = ego.actor self.collision_sensor = None self.lane_invasion_sensor = None self.gnss_sensor = None self.imu_sensor = None self.radar_sensor = None self.camera_manager = None self._weather_presets = find_weather_presets() self._weather_index = 0 self._gamma = gamma self.restart() self.world.on_tick(hud.on_world_tick) self.recording_enabled = False self.recording_start = 0 self.show_vehicle_telemetry = False self.current_map_layer = 0 self.map_layer_names = [ carla.MapLayer.NONE, carla.MapLayer.Buildings, carla.MapLayer.Decals, carla.MapLayer.Foliage, carla.MapLayer.Ground, carla.MapLayer.ParkedVehicles, carla.MapLayer.Particles, carla.MapLayer.Props, carla.MapLayer.StreetLights, carla.MapLayer.Walls, carla.MapLayer.All ]
[docs] def restart(self): # Keep same camera config if the camera manager exists. cam_index = self.camera_manager.index if self.camera_manager is not None else 0 cam_pos_index = self.camera_manager.transform_index if self.camera_manager is not None else 0 # Set up the sensors. self.collision_sensor = CollisionSensor(self.player, self.hud) self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud) self.gnss_sensor = GnssSensor(self.player) self.imu_sensor = IMUSensor(self.player) self.camera_manager = CameraManager(self.player, self.hud, self._gamma) self.camera_manager.transform_index = cam_pos_index self.camera_manager.set_sensor(cam_index, notify=False) actor_type = get_actor_display_name(self.player) self.hud.notification(actor_type) self.world.tick()
[docs] def next_weather(self, reverse=False): self._weather_index += -1 if reverse else 1 self._weather_index %= len(self._weather_presets) preset = self._weather_presets[self._weather_index] self.hud.notification('Weather: %s' % preset[1]) self.player.get_world().set_weather(preset[0])
[docs] def next_map_layer(self, reverse=False): self.current_map_layer += -1 if reverse else 1 self.current_map_layer %= len(self.map_layer_names) selected = self.map_layer_names[self.current_map_layer] self.hud.notification('LayerMap selected: %s' % selected)
[docs] def load_map_layer(self, unload=False): selected = self.map_layer_names[self.current_map_layer] if unload: self.hud.notification('Unloading map layer: %s' % selected) self.world.unload_map_layer(selected) else: self.hud.notification('Loading map layer: %s' % selected) self.world.load_map_layer(selected)
[docs] def toggle_radar(self): if self.radar_sensor is None: self.radar_sensor = RadarSensor(self.player) elif self.radar_sensor.sensor is not None: self.radar_sensor.sensor.destroy() self.radar_sensor = None
[docs] def tick(self, clock): self.hud.tick(self, clock) self.igp2_hud.tick(self, clock)
[docs] def render(self, display): self.camera_manager.render(display) self.hud.render(display) self.igp2_hud.render(display)
[docs] def destroy_sensors(self): self.camera_manager.sensor.destroy() self.camera_manager.sensor = None self.camera_manager.index = None
[docs] def destroy(self): if self.radar_sensor is not None: self.toggle_radar() sensors = [ self.camera_manager.sensor, self.collision_sensor.sensor, self.lane_invasion_sensor.sensor, self.gnss_sensor.sensor, self.imu_sensor.sensor] for sensor in sensors: if sensor is not None: sensor.stop() sensor.destroy()
# ============================================================================== # -- KeyboardControl ----------------------------------------------------------- # ==============================================================================
[docs]class KeyboardControl(object): """Class that handles keyboard input.""" def __init__(self, world): if isinstance(world.player, carla.Vehicle): self._control = carla.VehicleControl() self._lights = carla.VehicleLightState.NONE world.player.set_autopilot(False) world.player.set_light_state(self._lights) elif isinstance(world.player, carla.Walker): self._control = carla.WalkerControl() self._autopilot_enabled = False self._rotation = world.player.get_transform().rotation else: raise NotImplementedError("Actor type not supported") self._steer_cache = 0.0 world.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
[docs] def parse_events(self, client, world): if isinstance(self._control, carla.VehicleControl): current_lights = self._lights for event in pygame.event.get(): if event.type == pygame.QUIT: return True elif event.type == pygame.KEYUP: if self._is_quit_shortcut(event.key): return True elif event.key == K_F1: world.hud.toggle_info() elif event.key == K_v and pygame.key.get_mods() & KMOD_SHIFT: world.next_map_layer(reverse=True) elif event.key == K_v: world.next_map_layer() elif event.key == K_b and pygame.key.get_mods() & KMOD_SHIFT: world.load_map_layer(unload=True) elif event.key == K_b: world.load_map_layer() elif event.key == K_h or (event.key == K_SLASH and pygame.key.get_mods() & KMOD_SHIFT): world.hud.help.toggle() elif event.key == K_TAB: world.camera_manager.toggle_camera() elif event.key == K_c and pygame.key.get_mods() & KMOD_SHIFT: world.next_weather(reverse=True) elif event.key == K_c: world.next_weather() elif event.key == K_g: world.toggle_radar() elif event.key == K_BACKQUOTE: world.camera_manager.next_sensor() elif event.key == K_n: world.camera_manager.next_sensor() elif event.key == K_t: if world.show_vehicle_telemetry: world.player.show_debug_telemetry(False) world.show_vehicle_telemetry = False world.hud.notification("Disabled Vehicle Telemetry") else: try: world.player.show_debug_telemetry(True) world.show_vehicle_telemetry = True world.hud.notification("Enabled Vehicle Telemetry") except Exception: pass elif K_0 < event.key <= K_9: index_ctrl = 0 if pygame.key.get_mods() & KMOD_CTRL: index_ctrl = 9 world.camera_manager.set_sensor(event.key - 1 - K_0 + index_ctrl) elif event.key == K_r and not (pygame.key.get_mods() & KMOD_CTRL): world.camera_manager.toggle_recording() elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL): if world.recording_enabled: client.stop_recorder() world.recording_enabled = False world.hud.notification("Recorder is OFF") else: client.start_recorder("manual_recording.rec") world.recording_enabled = True world.hud.notification("Recorder is ON") elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL): # stop recorder client.stop_recorder() world.recording_enabled = False # work around to fix camera at start of replaying current_index = world.camera_manager.index world.destroy_sensors() # disable autopilot self._autopilot_enabled = False world.player.set_autopilot(self._autopilot_enabled) world.hud.notification("Replaying file 'manual_recording.rec'") # replayer client.replay_file("manual_recording.rec", world.recording_start, 0, 0) world.camera_manager.set_sensor(current_index) elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL): if pygame.key.get_mods() & KMOD_SHIFT: world.recording_start -= 10 else: world.recording_start -= 1 world.hud.notification("Recording start time is %d" % (world.recording_start)) elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL): if pygame.key.get_mods() & KMOD_SHIFT: world.recording_start += 10 else: world.recording_start += 1 world.hud.notification("Recording start time is %d" % (world.recording_start)) if isinstance(self._control, carla.VehicleControl): if event.key == K_l and pygame.key.get_mods() & KMOD_CTRL: current_lights ^= carla.VehicleLightState.Special1 elif event.key == K_l and pygame.key.get_mods() & KMOD_SHIFT: current_lights ^= carla.VehicleLightState.HighBeam elif event.key == K_l: # Use 'L' key to switch between lights: # closed -> position -> low beam -> fog if not self._lights & carla.VehicleLightState.Position: world.hud.notification("Position lights") current_lights |= carla.VehicleLightState.Position else: world.hud.notification("Low beam lights") current_lights |= carla.VehicleLightState.LowBeam if self._lights & carla.VehicleLightState.LowBeam: world.hud.notification("Fog lights") current_lights |= carla.VehicleLightState.Fog if self._lights & carla.VehicleLightState.Fog: world.hud.notification("Lights off") current_lights ^= carla.VehicleLightState.Position current_lights ^= carla.VehicleLightState.LowBeam current_lights ^= carla.VehicleLightState.Fog elif event.key == K_i: current_lights ^= carla.VehicleLightState.Interior elif event.key == K_z: current_lights ^= carla.VehicleLightState.LeftBlinker elif event.key == K_x: current_lights ^= carla.VehicleLightState.RightBlinker
@staticmethod def _is_quit_shortcut(key): return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL)
# ============================================================================== # -- HUD ----------------------------------------------------------------------- # ==============================================================================
[docs]class HUD(object): def __init__(self, width, height): self.dim = (width, height) font = pygame.font.Font(pygame.font.get_default_font(), 20) font_name = 'courier' if os.name == 'nt' else 'mono' fonts = [x for x in pygame.font.get_fonts() if font_name in x] default_font = 'ubuntumono' mono = default_font if default_font in fonts else fonts[0] mono = pygame.font.match_font(mono) self._font_mono = pygame.font.Font(mono, 12 if os.name == 'nt' else 14) self._notifications = FadingText(font, (width, 40), (0, height - 40)) self.help = HelpText(pygame.font.Font(mono, 16), width, height) self.server_fps = 0 self.frame = 0 self.simulation_time = 0 self._show_info = True self._info_text = [] self._server_clock = pygame.time.Clock()
[docs] def on_world_tick(self, timestamp): self._server_clock.tick() self.server_fps = self._server_clock.get_fps() self.frame = timestamp.frame self.simulation_time = timestamp.elapsed_seconds
[docs] def tick(self, world, clock): self._notifications.tick(world, clock) if not self._show_info: return t = world.player.get_transform() v = world.player.get_velocity() c = world.player.get_control() compass = world.imu_sensor.compass heading = 'N' if compass > 270.5 or compass < 89.5 else '' heading += 'S' if 90.5 < compass < 269.5 else '' heading += 'E' if 0.5 < compass < 179.5 else '' heading += 'W' if 180.5 < compass < 359.5 else '' colhist = world.collision_sensor.get_collision_history() collision = [colhist[x + self.frame - 200] for x in range(0, 200)] max_col = max(1.0, max(collision)) collision = [x / max_col for x in collision] vehicles = world.world.get_actors().filter('vehicle.*') self._info_text = [ 'Server: % 16.0f FPS' % self.server_fps, 'Client: % 16.0f FPS' % clock.get_fps(), '', 'Vehicle: % 20s' % get_actor_display_name(world.player, truncate=20), 'Map: % 20s' % world.map.name.split('/')[-1], 'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)), '', 'Speed: % 15.0f km/h' % (3.6 * math.sqrt(v.x ** 2 + v.y ** 2 + v.z ** 2)), u'Compass:% 17.0f\N{DEGREE SIGN} % 2s' % (compass, heading), 'Accelero: (%5.1f,%5.1f,%5.1f)' % (world.imu_sensor.accelerometer), 'Gyroscop: (%5.1f,%5.1f,%5.1f)' % (world.imu_sensor.gyroscope), 'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (t.location.x, t.location.y)), 'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)), 'Height: % 18.0f m' % t.location.z, ''] if isinstance(c, carla.VehicleControl): self._info_text += [ ('Throttle:', c.throttle, 0.0, 1.0), ('Steer:', c.steer, -1.0, 1.0), ('Brake:', c.brake, 0.0, 1.0), ('Reverse:', c.reverse), ('Hand brake:', c.hand_brake), ('Manual:', c.manual_gear_shift), 'Gear: %s' % {-1: 'R', 0: 'N'}.get(c.gear, c.gear)] elif isinstance(c, carla.WalkerControl): self._info_text += [ ('Speed:', c.speed, 0.0, 5.556), ('Jump:', c.jump)] self._info_text += [ '', 'Collision:', collision, '', 'Number of vehicles: % 8d' % len(vehicles)] if len(vehicles) > 1: self._info_text += ['Nearby vehicles:'] distance = lambda l: math.sqrt( (l.x - t.location.x) ** 2 + (l.y - t.location.y) ** 2 + (l.z - t.location.z) ** 2) vehicles = [(distance(x.get_location()), x) for x in vehicles if x.id != world.player.id] for d, vehicle in sorted(vehicles, key=lambda vehicles: vehicles[0]): if d > world.ego.agent.view_radius: break vehicle_type = get_actor_display_name(vehicle, truncate=22) self._info_text.append('% 4dm %s' % (d, vehicle_type))
[docs] def toggle_info(self): self._show_info = not self._show_info
[docs] def notification(self, text, seconds=2.0): self._notifications.set_text(text, seconds=seconds)
[docs] def error(self, text): self._notifications.set_text('Error: %s' % text, (255, 0, 0))
[docs] def render(self, display): if self._show_info: info_surface = pygame.Surface((220, self.dim[1])) info_surface.set_alpha(100) display.blit(info_surface, (0, 0)) v_offset = 4 bar_h_offset = 100 bar_width = 106 for item in self._info_text: if v_offset + 18 > self.dim[1]: break if isinstance(item, list): if len(item) > 1: points = [(x + 8, v_offset + 8 + (1.0 - y) * 30) for x, y in enumerate(item)] pygame.draw.lines(display, (255, 136, 0), False, points, 2) item = None v_offset += 18 elif isinstance(item, tuple): if isinstance(item[1], bool): rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6)) pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1) else: rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6)) pygame.draw.rect(display, (255, 255, 255), rect_border, 1) f = (item[1] - item[2]) / (item[3] - item[2]) if item[2] < 0.0: rect = pygame.Rect((bar_h_offset + f * (bar_width - 6), v_offset + 8), (6, 6)) else: rect = pygame.Rect((bar_h_offset, v_offset + 8), (f * bar_width, 6)) pygame.draw.rect(display, (255, 255, 255), rect) item = item[0] if item: # At this point has to be a str. surface = self._font_mono.render(item, True, (255, 255, 255)) display.blit(surface, (8, v_offset)) v_offset += 18 self._notifications.render(display) self.help.render(display)
# ============================================================================== # -- Igp2HUD ----------------------------------------------------------------------- # ==============================================================================
[docs]class Igp2HUD(object): def __init__(self, width: int, height: int, agents: Dict[int, CarlaAgentWrapper]): """ Initialise a HUD showing ego goal predictions. Args: width: Width of the visible screen height: Height of the visible screen agents: Dictionary of agent wrappers from an instance of CarlaSim """ self.dim = (width, height) self.agents = agents font_name = 'courier' if os.name == 'nt' else 'mono' fonts = [x for x in pygame.font.get_fonts() if font_name in x] default_font = 'ubuntumono' mono = default_font if default_font in fonts else fonts[0] mono = pygame.font.match_font(mono) self._font_mono = pygame.font.Font(mono, 12 if os.name == 'nt' else 14) self.frame = 0 self._show_info = True self._info_text = [] self._server_clock = pygame.time.Clock()
[docs] def on_world_tick(self, timestamp): self._server_clock.tick() self.frame = timestamp.frame
[docs] def tick(self, world, clock): if not self._show_info: return t = world.player.get_transform() self._info_text = [] self._info_text.append(f"Macro: {world.ego.agent.current_macro}") self._info_text.append(f"Manoeuvre: {world.ego.agent.current_macro.current_maneuver}") self._info_text.append("") self._info_text.append("Visible Goals:") for gid, goal in enumerate(world.ego.agent.possible_goals): self._info_text.append(f" {gid}: {np.round(goal.center, 2)}") self._info_text.append("") self._info_text.append("Goal Predictions:") if world.ego.agent.goal_probabilities is not None: for agent_id, predictions in world.ego.agent.goal_probabilities.items(): if self.agents[agent_id] is None: continue vehicle = self.agents[agent_id].actor vehicle_type = get_actor_display_name(vehicle, truncate=22) self._info_text.append(f"Agent {agent_id} - {vehicle_type}:") for i, (goal_with_type, prob) in enumerate(predictions.goals_probabilities.items()): if not np.isclose(prob, 0.0): self._info_text.append(f" {i}: {np.round(prob, 2)}")
[docs] def toggle_info(self): self._show_info = not self._show_info
[docs] def render(self, display): if not self._show_info: return width = 220 info_surface = pygame.Surface((width, self.dim[1])) info_surface.set_alpha(100) v_offset = 4 h_offset = self.dim[0] - width display.blit(info_surface, (h_offset, 0)) for item in self._info_text: if v_offset + 18 > self.dim[1]: break surface = self._font_mono.render(item, True, (255, 255, 255)) display.blit(surface, (h_offset, v_offset)) v_offset += 18
# ============================================================================== # -- FadingText ---------------------------------------------------------------- # ==============================================================================
[docs]class FadingText(object): def __init__(self, font, dim, pos): self.font = font self.dim = dim self.pos = pos self.seconds_left = 0 self.surface = pygame.Surface(self.dim)
[docs] def set_text(self, text, color=(255, 255, 255), seconds=2.0): text_texture = self.font.render(text, True, color) self.surface = pygame.Surface(self.dim) self.seconds_left = seconds self.surface.fill((0, 0, 0, 0)) self.surface.blit(text_texture, (10, 11))
[docs] def tick(self, _, clock): delta_seconds = 1e-3 * clock.get_time() self.seconds_left = max(0.0, self.seconds_left - delta_seconds) self.surface.set_alpha(500.0 * self.seconds_left)
[docs] def render(self, display): display.blit(self.surface, self.pos)
# ============================================================================== # -- HelpText ------------------------------------------------------------------ # ==============================================================================
[docs]class HelpText(object): """Helper class to handle text output using pygame""" def __init__(self, font, width, height): lines = __doc__.split('\n') self.font = font self.line_space = 18 self.dim = (780, len(lines) * self.line_space + 12) self.pos = (0.5 * width - 0.5 * self.dim[0], 0.5 * height - 0.5 * self.dim[1]) self.seconds_left = 0 self.surface = pygame.Surface(self.dim) self.surface.fill((0, 0, 0, 0)) for n, line in enumerate(lines): text_texture = self.font.render(line, True, (255, 255, 255)) self.surface.blit(text_texture, (22, n * self.line_space)) self._render = False self.surface.set_alpha(220)
[docs] def toggle(self): self._render = not self._render
[docs] def render(self, display): if self._render: display.blit(self.surface, self.pos)
# ============================================================================== # -- CollisionSensor ----------------------------------------------------------- # ==============================================================================
[docs]class CollisionSensor(object): def __init__(self, parent_actor, hud): self.sensor = None self.history = [] self._parent = parent_actor self.hud = hud world = self._parent.get_world() bp = world.get_blueprint_library().find('sensor.other.collision') self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) # We need to pass the lambda a weak reference to self to avoid circular # reference. weak_self = weakref.ref(self) self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
[docs] def get_collision_history(self): history = collections.defaultdict(int) for frame, intensity in self.history: history[frame] += intensity return history
@staticmethod def _on_collision(weak_self, event): self = weak_self() if not self: return actor_type = get_actor_display_name(event.other_actor) self.hud.notification('Collision with %r' % actor_type) impulse = event.normal_impulse intensity = math.sqrt(impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2) self.history.append((event.frame, intensity)) if len(self.history) > 4000: self.history.pop(0)
# ============================================================================== # -- LaneInvasionSensor -------------------------------------------------------- # ==============================================================================
[docs]class LaneInvasionSensor(object): def __init__(self, parent_actor, hud): self.sensor = None # If the spawn object is not a vehicle, we cannot use the Lane Invasion Sensor if parent_actor.type_id.startswith("vehicle."): self._parent = parent_actor self.hud = hud world = self._parent.get_world() bp = world.get_blueprint_library().find('sensor.other.lane_invasion') self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) # We need to pass the lambda a weak reference to self to avoid circular # reference. weak_self = weakref.ref(self) self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event)) @staticmethod def _on_invasion(weak_self, event): self = weak_self() if not self: return lane_types = set(x.type for x in event.crossed_lane_markings) text = ['%r' % str(x).split()[-1] for x in lane_types] self.hud.notification('Crossed line %s' % ' and '.join(text))
# ============================================================================== # -- GnssSensor ---------------------------------------------------------------- # ==============================================================================
[docs]class GnssSensor(object): def __init__(self, parent_actor): self.sensor = None self._parent = parent_actor self.lat = 0.0 self.lon = 0.0 world = self._parent.get_world() bp = world.get_blueprint_library().find('sensor.other.gnss') self.sensor = world.spawn_actor(bp, carla.Transform(carla.Location(x=1.0, z=2.8)), attach_to=self._parent) # We need to pass the lambda a weak reference to self to avoid circular # reference. weak_self = weakref.ref(self) self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event)) @staticmethod def _on_gnss_event(weak_self, event): self = weak_self() if not self: return self.lat = event.latitude self.lon = event.longitude
# ============================================================================== # -- IMUSensor ----------------------------------------------------------------- # ==============================================================================
[docs]class IMUSensor(object): def __init__(self, parent_actor): self.sensor = None self._parent = parent_actor self.accelerometer = (0.0, 0.0, 0.0) self.gyroscope = (0.0, 0.0, 0.0) self.compass = 0.0 world = self._parent.get_world() bp = world.get_blueprint_library().find('sensor.other.imu') self.sensor = world.spawn_actor( bp, carla.Transform(), attach_to=self._parent) # We need to pass the lambda a weak reference to self to avoid circular # reference. weak_self = weakref.ref(self) self.sensor.listen( lambda sensor_data: IMUSensor._IMU_callback(weak_self, sensor_data)) @staticmethod def _IMU_callback(weak_self, sensor_data): self = weak_self() if not self: return limits = (-99.9, 99.9) self.accelerometer = ( max(limits[0], min(limits[1], sensor_data.accelerometer.x)), max(limits[0], min(limits[1], sensor_data.accelerometer.y)), max(limits[0], min(limits[1], sensor_data.accelerometer.z))) self.gyroscope = ( max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.x))), max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.y))), max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.z)))) self.compass = math.degrees(sensor_data.compass)
# ============================================================================== # -- RadarSensor --------------------------------------------------------------- # ==============================================================================
[docs]class RadarSensor(object): def __init__(self, parent_actor): self.sensor = None self._parent = parent_actor bound_x = 0.5 + self._parent.bounding_box.extent.x bound_y = 0.5 + self._parent.bounding_box.extent.y bound_z = 0.5 + self._parent.bounding_box.extent.z self.velocity_range = 7.5 # m/s world = self._parent.get_world() self.debug = world.debug bp = world.get_blueprint_library().find('sensor.other.radar') bp.set_attribute('horizontal_fov', str(35)) bp.set_attribute('vertical_fov', str(20)) self.sensor = world.spawn_actor( bp, carla.Transform( carla.Location(x=bound_x + 0.05, z=bound_z + 0.05), carla.Rotation(pitch=5)), attach_to=self._parent) # We need a weak reference to self to avoid circular reference. weak_self = weakref.ref(self) self.sensor.listen( lambda radar_data: RadarSensor._radar_callback(weak_self, radar_data)) @staticmethod def _radar_callback(weak_self, radar_data): self = weak_self() if not self: return # To get a numpy [[vel, altitude, azimuth, depth],...[,,,]]: # points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4')) # points = np.reshape(points, (len(radar_data), 4)) current_rot = radar_data.transform.rotation for detect in radar_data: azi = math.degrees(detect.azimuth) alt = math.degrees(detect.altitude) # The 0.25 adjusts a bit the distance so the dots can # be properly seen fw_vec = carla.Vector3D(x=detect.depth - 0.25) carla.Transform( carla.Location(), carla.Rotation( pitch=current_rot.pitch + alt, yaw=current_rot.yaw + azi, roll=current_rot.roll)).transform(fw_vec) def clamp(min_v, max_v, value): return max(min_v, min(value, max_v)) norm_velocity = detect.velocity / self.velocity_range # range [-1, 1] r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0) g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0) b = int(abs(clamp(- 1.0, 0.0, - 1.0 - norm_velocity)) * 255.0) self.debug.draw_point( radar_data.transform.location + fw_vec, size=0.075, life_time=0.06, persistent_lines=False, color=carla.Color(r, g, b))
# ============================================================================== # -- CameraManager ------------------------------------------------------------- # ==============================================================================
[docs]class CameraManager(object): def __init__(self, parent_actor, hud, gamma_correction): self.sensor = None self.surface = None self._parent = parent_actor self.hud = hud self.recording = False bound_x = 0.5 + self._parent.bounding_box.extent.x bound_y = 0.5 + self._parent.bounding_box.extent.y bound_z = 0.5 + self._parent.bounding_box.extent.z Attachment = carla.AttachmentType if not self._parent.type_id.startswith("walker.pedestrian"): self._camera_transforms = [ (carla.Transform(carla.Location(x=-2.0 * bound_x, y=+0.0 * bound_y, z=2.0 * bound_z), carla.Rotation(pitch=8.0)), Attachment.SpringArm), ( carla.Transform(carla.Location(x=+0.8 * bound_x, y=+0.0 * bound_y, z=1.3 * bound_z)), Attachment.Rigid), (carla.Transform(carla.Location(x=+1.9 * bound_x, y=+1.0 * bound_y, z=1.2 * bound_z)), Attachment.SpringArm), (carla.Transform(carla.Location(x=-2.8 * bound_x, y=+0.0 * bound_y, z=4.6 * bound_z), carla.Rotation(pitch=6.0)), Attachment.SpringArm), (carla.Transform(carla.Location(x=-1.0, y=-1.0 * bound_y, z=0.4 * bound_z)), Attachment.Rigid), (carla.Transform(carla.Location(x=-0.15, y=-0.4, z=1.2), carla.Rotation()), Attachment.Rigid) ] else: self._camera_transforms = [ (carla.Transform(carla.Location(x=-2.5, z=0.0), carla.Rotation(pitch=-8.0)), Attachment.SpringArm), (carla.Transform(carla.Location(x=1.6, z=1.7)), Attachment.Rigid), ( carla.Transform(carla.Location(x=2.5, y=0.5, z=0.0), carla.Rotation(pitch=-8.0)), Attachment.SpringArm), (carla.Transform(carla.Location(x=-4.0, z=2.0), carla.Rotation(pitch=6.0)), Attachment.SpringArm), (carla.Transform(carla.Location(x=0, y=-2.5, z=-0.0), carla.Rotation(yaw=90.0)), Attachment.Rigid)] self.transform_index = 1 self.sensors = [ ['sensor.camera.rgb', cc.Raw, 'Camera RGB', {}], ['sensor.camera.depth', cc.Raw, 'Camera Depth (Raw)', {}], ['sensor.camera.depth', cc.Depth, 'Camera Depth (Gray Scale)', {}], ['sensor.camera.depth', cc.LogarithmicDepth, 'Camera Depth (Logarithmic Gray Scale)', {}], ['sensor.camera.semantic_segmentation', cc.Raw, 'Camera Semantic Segmentation (Raw)', {}], ['sensor.camera.semantic_segmentation', cc.CityScapesPalette, 'Camera Semantic Segmentation (CityScapes Palette)', {}], ['sensor.lidar.ray_cast', None, 'Lidar (Ray-Cast)', {'range': '50'}], ['sensor.camera.dvs', cc.Raw, 'Dynamic Vision Sensor', {}], ['sensor.camera.rgb', cc.Raw, 'Camera RGB Distorted', {'lens_circle_multiplier': '3.0', 'lens_circle_falloff': '3.0', 'chromatic_aberration_intensity': '0.5', 'chromatic_aberration_offset': '0'}], ['sensor.camera.optical_flow', cc.Raw, 'Optical Flow', {}], ] world = self._parent.get_world() bp_library = world.get_blueprint_library() for item in self.sensors: bp = bp_library.find(item[0]) if item[0].startswith('sensor.camera'): bp.set_attribute('image_size_x', str(hud.dim[0])) bp.set_attribute('image_size_y', str(hud.dim[1])) if bp.has_attribute('gamma'): bp.set_attribute('gamma', str(gamma_correction)) for attr_name, attr_value in item[3].items(): bp.set_attribute(attr_name, attr_value) elif item[0].startswith('sensor.lidar'): self.lidar_range = 50 for attr_name, attr_value in item[3].items(): bp.set_attribute(attr_name, attr_value) if attr_name == 'range': self.lidar_range = float(attr_value) item.append(bp) self.index = None
[docs] def toggle_camera(self): self.transform_index = (self.transform_index + 1) % len(self._camera_transforms) self.set_sensor(self.index, notify=False, force_respawn=True)
[docs] def set_sensor(self, index, notify=True, force_respawn=False): index = index % len(self.sensors) needs_respawn = True if self.index is None else \ (force_respawn or (self.sensors[index][2] != self.sensors[self.index][2])) if needs_respawn: if self.sensor is not None: self.sensor.destroy() self.surface = None self.sensor = self._parent.get_world().spawn_actor( self.sensors[index][-1], self._camera_transforms[self.transform_index][0], attach_to=self._parent, attachment_type=self._camera_transforms[self.transform_index][1]) # We need to pass the lambda a weak reference to self to avoid # circular reference. weak_self = weakref.ref(self) self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image)) if notify: self.hud.notification(self.sensors[index][2]) self.index = index
[docs] def next_sensor(self): self.set_sensor(self.index + 1)
[docs] def toggle_recording(self): self.recording = not self.recording self.hud.notification('Recording %s' % ('On' if self.recording else 'Off'))
[docs] def render(self, display): if self.surface is not None: display.blit(self.surface, (0, 0))
@staticmethod def _parse_image(weak_self, image): self = weak_self() if not self: return if self.sensors[self.index][0].startswith('sensor.lidar'): points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) points = np.reshape(points, (int(points.shape[0] / 4), 4)) lidar_data = np.array(points[:, :2]) lidar_data *= min(self.hud.dim) / (2.0 * self.lidar_range) lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1]) lidar_data = np.fabs(lidar_data) # pylint: disable=E1111 lidar_data = lidar_data.astype(np.int32) lidar_data = np.reshape(lidar_data, (-1, 2)) lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3) lidar_img = np.zeros((lidar_img_size), dtype=np.uint8) lidar_img[tuple(lidar_data.T)] = (255, 255, 255) self.surface = pygame.surfarray.make_surface(lidar_img) elif self.sensors[self.index][0].startswith('sensor.camera.dvs'): # Example of converting the raw_data from a carla.DVSEventArray # sensor into a NumPy array and using it as an image dvs_events = np.frombuffer(image.raw_data, dtype=np.dtype([ ('x', np.uint16), ('y', np.uint16), ('t', np.int64), ('pol', np.bool)])) dvs_img = np.zeros((image.height, image.width, 3), dtype=np.uint8) # Blue is positive, red is negative dvs_img[dvs_events[:]['y'], dvs_events[:]['x'], dvs_events[:]['pol'] * 2] = 255 self.surface = pygame.surfarray.make_surface(dvs_img.swapaxes(0, 1)) elif self.sensors[self.index][0].startswith('sensor.camera.optical_flow'): image = image.get_color_coded_flow() array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) array = np.reshape(array, (image.height, image.width, 4)) array = array[:, :, :3] array = array[:, :, ::-1] self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) else: image.convert(self.sensors[self.index][1]) array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) array = np.reshape(array, (image.height, image.width, 4)) array = array[:, :, :3] array = array[:, :, ::-1] self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) if self.recording: image.save_to_disk('_out/%08d' % image.frame)
# ============================================================================== # -- Visualiser --------------------------------------------------------------- # ==============================================================================
[docs]class Visualiser(object): """ Create a pygame-based visualisation with HUD to show the evolution of scenarios. """ def __init__(self, carla_simulation: CarlaSim, ego_role_name: str = "ego", res_width: int = 1920, res_height: int = 1080, gamma: float = 2.2): """ Initialise a new visualiser. Args: carla_simulation: The simulation to visualise. ego_role_name: The role name of the ego vehicle. res_width: The width of the screen in pixels. res_height: The height of the screen in pixels. gamma: Gamma correction value """ self.carla_sim = carla_simulation self.ego_rolename = ego_role_name self.width = res_width self.height = res_height self.gamma = gamma
[docs] def initialize(self): """ Start pygame window and create subcomponents for running simulation. """ pygame.init() pygame.font.init() sim_world = self.carla_sim.world display = pygame.display.set_mode( (self.width, self.height), pygame.HWSURFACE | pygame.DOUBLEBUF) display.fill((0, 0, 0)) pygame.display.flip() hud = HUD(self.width, self.height) pred_hud = Igp2HUD(self.width, self.height, self.carla_sim.agents) ego = self.carla_sim.get_ego(self.ego_rolename) if ego is None: raise ValueError("Ego not found in simulation.") world = World(sim_world, ego, hud, pred_hud, self.gamma) controller = KeyboardControl(world) clock = pygame.time.Clock() return clock, world, display, controller
[docs] def run(self, steps: int = None): """ This method will initiate the simulation and begin its visualisation. Args: steps: The number of execution steps. If None, then execute indefinitely. """ world = None try: clock, world, display, controller = self.initialize() if steps is None: done = False while not done: done = self.step(clock, world, display, controller) else: for i in range(steps): self.step(clock, world, display, controller) finally: if world is not None: world.destroy() del world pygame.quit()
[docs] def step(self, clock: pygame.time.Clock, world: World, display: pygame.Surface, controller: KeyboardControl) -> bool: """ Take one step of the simulation. Internally calls the step method of the corresponding CarlaSim. """ self.carla_sim.world.tick() clock.tick_busy_loop(self.carla_sim.fps) self.carla_sim.step(tick=False) if self.carla_sim.get_ego(self.ego_rolename) is None: return True if controller.parse_events(self.carla_sim.client, world): return True world.tick(clock) world.render(display) pygame.display.flip() return False