Source code for igp2.carlasim.carla_agent_wrapper
import numpy as np
import carla
from typing import Optional
from igp2.carlasim.local_planner import LocalPlanner, RoadOption
from igp2.core.vehicle import Observation
from igp2.core.agentstate import AgentState
from igp2.core.trajectory import VelocityTrajectory
from igp2.agents.agent import Agent
[docs]
class CarlaAgentWrapper:
""" Wrapper class that provides a simple way to retrieve control for the attached actor. """
def __init__(self, agent: Agent, actor: carla.Actor):
self.__agent = agent
self.__actor = actor
self.__name = self.__actor.attributes["role_name"]
self.__world = self.__actor.get_world()
self.__map = self.__world.get_map()
self.__local_planner = LocalPlanner(self.__actor, self.__world, self.__map,
dt=1.0 / self.__agent.fps)
self.__waypoints = [] # List of CARLA waypoints to follow
self.__current_ma = None
def __repr__(self):
return f"Actor {self.actor_id}; Agent {self.agent_id}"
[docs]
def next_control(self, observation: Observation) -> Optional[carla.VehicleControl]:
limited_observation = self.__apply_view_radius(observation)
action = self.__agent.next_action(limited_observation)
self.agent.vehicle.execute_action(action, observation.frame[self.agent_id])
if action is None or self.__agent.done(observation):
return None
if hasattr(self.agent, "current_macro"):
if self.__current_ma != self.agent.current_macro:
self.__current_ma = self.agent.current_macro
self.__trajectory_to_waypoints(self.__current_ma.get_trajectory())
self.__local_planner.set_global_plan(
self.__waypoints, stop_waypoint_creation=True, clean_queue=True)
target_speed = action.target_speed
self.__local_planner.set_speed(target_speed * 3.6)
return self.__local_planner.run_step()
[docs]
def done(self, observation: Observation) -> bool:
""" Returns whether the wrapped agent is done. """
return self.__agent.done(observation)
[docs]
def reset_waypoints(self):
self.__waypoints = []
def __apply_view_radius(self, observation: Observation):
if hasattr(self.agent, "view_radius"):
pos = observation.frame[self.agent_id].position
new_frame = {aid: state for aid, state in observation.frame.items()
if np.linalg.norm(pos - state.position) <= self.agent.view_radius}
return Observation(new_frame, observation.scenario_map)
return observation
def __trajectory_to_waypoints(self, trajectory: VelocityTrajectory):
self.__waypoints = []
for point in trajectory.path[:-1]:
wp = self.__map.get_waypoint(carla.Location(point[0], -point[1]))
wp = (wp, RoadOption.LANEFOLLOW)
assert wp is not None, f"Invalid waypoint found at {point}."
self.__waypoints.append(wp)
@property
def state(self) -> AgentState:
return self.agent.state
@property
def agent_id(self) -> int:
return self.__agent.agent_id
@property
def actor_id(self) -> int:
return self.__actor.id
@property
def actor(self) -> carla.Actor:
return self.__actor
@property
def agent(self) -> Agent:
return self.__agent
@property
def name(self):
""" The role name of the wrapped Actor. """
return self.__name