import os
import json
import datetime
import pathlib
import time
import math
from collections import deque

import cv2
import carla
import torch
import numpy as np
from PIL import Image
from matplotlib import cm

from leaderboard.autoagents import autonomous_agent
from transfuser.model import TransFuser
from transfuser.config import GlobalConfig
from transfuser.data import scale_and_crop_image, lidar_to_histogram_features, transform_2d_points
from team_code.planner import RoutePlanner

SAVE_PATH = os.environ.get('SAVE_PATH', None)


def get_entry_point():
    return 'TransFuserAgent'


class TransFuserAgent(autonomous_agent.AutonomousAgent):
    def setup(self, path_to_conf_file):
        self.lidar_processed = list()
        self.track = autonomous_agent.Track.SENSORS
        self.config_path = path_to_conf_file
        self.step = -1
        self.wall_start = time.time()
        self.initialized = False

        # One deque per modality; filled during the first seq_len steps,
        # then used as a sliding window.
        self.input_buffer = {'rgb': deque(), 'rgb_left': deque(), 'rgb_right': deque(),
                             'rgb_rear': deque(), 'lidar': deque(), 'gps': deque(), 'thetas': deque()}

        self.config = GlobalConfig()
        self.net = TransFuser(self.config, 'cuda')
        self.net.load_state_dict(torch.load(os.path.join(path_to_conf_file, 'best_model.pth')))
        self.net.cuda()
        self.net.eval()

        self.save_path = None
        if SAVE_PATH is not None:
            now = datetime.datetime.now()
            string = pathlib.Path(os.environ['ROUTES']).stem + '_'
            string += '_'.join(map(lambda x: '%02d' % x, (now.month, now.day, now.hour, now.minute, now.second)))

            print(string)

            self.save_path = pathlib.Path(os.environ['SAVE_PATH']) / string
            self.save_path.mkdir(parents=True, exist_ok=False)

            (self.save_path / 'rgb').mkdir(parents=True, exist_ok=False)
            (self.save_path / 'lidar_0').mkdir(parents=True, exist_ok=False)
            (self.save_path / 'lidar_1').mkdir(parents=True, exist_ok=False)
            (self.save_path / 'meta').mkdir(parents=True, exist_ok=False)

    def _init(self):
        self._route_planner = RoutePlanner(4.0, 50.0)
        self._route_planner.set_route(self._global_plan, True)
        self.initialized = True

    def _get_position(self, tick_data):
        gps = tick_data['gps']
        gps = (gps - self._route_planner.mean) * self._route_planner.scale
        return gps

    def sensors(self):
        return [
            {
                'type': 'sensor.camera.rgb',
                'x': 1.3, 'y': 0.0, 'z': 2.3,
                'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                'width': 400, 'height': 300, 'fov': 100,
                'id': 'rgb'
            },
            {
                'type': 'sensor.camera.rgb',
                'x': 1.3, 'y': 0.0, 'z': 2.3,
                'roll': 0.0, 'pitch': 0.0, 'yaw': -60.0,
                'width': 400, 'height': 300, 'fov': 100,
                'id': 'rgb_left'
            },
            {
                'type': 'sensor.camera.rgb',
                'x': 1.3, 'y': 0.0, 'z': 2.3,
                'roll': 0.0, 'pitch': 0.0, 'yaw': 60.0,
                'width': 400, 'height': 300, 'fov': 100,
                'id': 'rgb_right'
            },
            {
                'type': 'sensor.camera.rgb',
                'x': -1.3, 'y': 0.0, 'z': 2.3,
                'roll': 0.0, 'pitch': 0.0, 'yaw': -180.0,
                'width': 400, 'height': 300, 'fov': 100,
                'id': 'rgb_rear'
            },
            {
                'type': 'sensor.lidar.ray_cast',
                'x': 1.3, 'y': 0.0, 'z': 2.5,
                'roll': 0.0, 'pitch': 0.0, 'yaw': -90.0,
                'id': 'lidar'
            },
            {
                'type': 'sensor.other.imu',
                'x': 0.0, 'y': 0.0, 'z': 0.0,
                'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                'sensor_tick': 0.05,
                'id': 'imu'
            },
            {
                'type': 'sensor.other.gnss',
                'x': 0.0, 'y': 0.0, 'z': 0.0,
                'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                'sensor_tick': 0.01,
                'id': 'gps'
            },
            {
                'type': 'sensor.speedometer',
                'reading_frequency': 20,
                'id': 'speed'
            }
        ]
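
    # Rig notes: sensor positions are in meters and rotations in degrees,
    # relative to the ego vehicle. The LiDAR is mounted with yaw=-90.0, which
    # is presumably why run_step() mirrors the y-axis of each sweep before
    # building the BEV histogram features.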
    def tick(self, input_data):
        self.step += 1

        # CARLA delivers BGRA images; drop alpha and convert to RGB.
        rgb = cv2.cvtColor(input_data['rgb'][1][:, :, :3], cv2.COLOR_BGR2RGB)
        rgb_left = cv2.cvtColor(input_data['rgb_left'][1][:, :, :3], cv2.COLOR_BGR2RGB)
        rgb_right = cv2.cvtColor(input_data['rgb_right'][1][:, :, :3], cv2.COLOR_BGR2RGB)
        rgb_rear = cv2.cvtColor(input_data['rgb_rear'][1][:, :, :3], cv2.COLOR_BGR2RGB)
        gps = input_data['gps'][1][:2]
        speed = input_data['speed'][1]['speed']
        compass = input_data['imu'][1][-1]
        if math.isnan(compass):  # it can happen that the compass sends NaN for a few frames
            compass = 0.0

        lidar = input_data['lidar'][1][:, :3]

        result = {
            'rgb': rgb,
            'rgb_left': rgb_left,
            'rgb_right': rgb_right,
            'rgb_rear': rgb_rear,
            'lidar': lidar,
            'gps': gps,
            'speed': speed,
            'compass': compass,
        }

        pos = self._get_position(result)
        result['gps'] = pos
        next_wp, next_cmd = self._route_planner.run_step(pos)
        result['next_command'] = next_cmd.value

        # Rotate the world-frame offset to the next waypoint into the ego frame.
        theta = compass + np.pi / 2
        R = np.array([
            [np.cos(theta), -np.sin(theta)],
            [np.sin(theta), np.cos(theta)]
        ])

        local_command_point = np.array([next_wp[0] - pos[0], next_wp[1] - pos[1]])
        local_command_point = R.T.dot(local_command_point)
        result['target_point'] = tuple(local_command_point)

        return result

    @torch.no_grad()
    def run_step(self, input_data, timestamp):
        if not self.initialized:
            self._init()

        tick_data = self.tick(input_data)

        if self.step < self.config.seq_len:
            # Warm-up: fill the buffers and emit a no-op control.
            rgb = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb']), crop=self.config.input_resolution)).unsqueeze(0)
            self.input_buffer['rgb'].append(rgb.to('cuda', dtype=torch.float32))

            if not self.config.ignore_sides:
                rgb_left = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_left']), crop=self.config.input_resolution)).unsqueeze(0)
                self.input_buffer['rgb_left'].append(rgb_left.to('cuda', dtype=torch.float32))

                rgb_right = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_right']), crop=self.config.input_resolution)).unsqueeze(0)
                self.input_buffer['rgb_right'].append(rgb_right.to('cuda', dtype=torch.float32))

            if not self.config.ignore_rear:
                rgb_rear = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_rear']), crop=self.config.input_resolution)).unsqueeze(0)
                self.input_buffer['rgb_rear'].append(rgb_rear.to('cuda', dtype=torch.float32))

            self.input_buffer['lidar'].append(tick_data['lidar'])
            self.input_buffer['gps'].append(tick_data['gps'])
            self.input_buffer['thetas'].append(tick_data['compass'])

            control = carla.VehicleControl()
            control.steer = 0.0
            control.throttle = 0.0
            control.brake = 0.0

            return control
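
        # Past warm-up: slide each buffer forward by one frame, warp the
        # buffered LiDAR sweeps into the current ego frame, predict waypoints
        # with TransFuser, and convert them to steering/throttle/brake via the
        # PID logic in control_pid().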
        gt_velocity = torch.FloatTensor([tick_data['speed']]).to('cuda', dtype=torch.float32)
        command = torch.FloatTensor([tick_data['next_command']]).to('cuda', dtype=torch.float32)

        tick_data['target_point'] = [torch.FloatTensor([tick_data['target_point'][0]]),
                                     torch.FloatTensor([tick_data['target_point'][1]])]
        target_point = torch.stack(tick_data['target_point'], dim=1).to('cuda', dtype=torch.float32)

        encoding = []
        rgb = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb']), crop=self.config.input_resolution)).unsqueeze(0)
        self.input_buffer['rgb'].popleft()
        self.input_buffer['rgb'].append(rgb.to('cuda', dtype=torch.float32))

        if not self.config.ignore_sides:
            rgb_left = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_left']), crop=self.config.input_resolution)).unsqueeze(0)
            self.input_buffer['rgb_left'].popleft()
            self.input_buffer['rgb_left'].append(rgb_left.to('cuda', dtype=torch.float32))

            rgb_right = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_right']), crop=self.config.input_resolution)).unsqueeze(0)
            self.input_buffer['rgb_right'].popleft()
            self.input_buffer['rgb_right'].append(rgb_right.to('cuda', dtype=torch.float32))

        if not self.config.ignore_rear:
            rgb_rear = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_rear']), crop=self.config.input_resolution)).unsqueeze(0)
            self.input_buffer['rgb_rear'].popleft()
            self.input_buffer['rgb_rear'].append(rgb_rear.to('cuda', dtype=torch.float32))

        self.input_buffer['lidar'].popleft()
        self.input_buffer['lidar'].append(tick_data['lidar'])

        self.input_buffer['gps'].popleft()
        self.input_buffer['gps'].append(tick_data['gps'])

        self.input_buffer['thetas'].popleft()
        self.input_buffer['thetas'].append(tick_data['compass'])

        # Transform the LiDAR point clouds to the local coordinate frame.
        ego_theta = self.input_buffer['thetas'][-1]
        ego_x, ego_y = self.input_buffer['gps'][-1]

        # Only predict every second step because we only get a LiDAR sweep every second frame.
        if self.step % 2 == 0 or self.step <= 4:
            self.lidar_processed = list()  # reset once per prediction so every buffered sweep is kept
            for i, lidar_point_cloud in enumerate(self.input_buffer['lidar']):
                curr_theta = self.input_buffer['thetas'][i]
                curr_x, curr_y = self.input_buffer['gps'][i]
                lidar_point_cloud[:, 1] *= -1  # flip the y-axis into the ego coordinate convention
                lidar_transformed = transform_2d_points(lidar_point_cloud,
                                                        np.pi / 2 - curr_theta, -curr_x, -curr_y,
                                                        np.pi / 2 - ego_theta, -ego_x, -ego_y)
                lidar_transformed = torch.from_numpy(lidar_to_histogram_features(lidar_transformed, crop=self.config.input_resolution)).unsqueeze(0)
                self.lidar_processed.append(lidar_transformed.to('cuda', dtype=torch.float32))

            # Deques support "+" since Python 3.5; this concatenates the image buffers.
            self.pred_wp = self.net(
                self.input_buffer['rgb'] + self.input_buffer['rgb_left']
                + self.input_buffer['rgb_right'] + self.input_buffer['rgb_rear'],
                self.lidar_processed, target_point, gt_velocity)

        steer, throttle, brake, metadata = self.net.control_pid(self.pred_wp, gt_velocity)
        self.pid_metadata = metadata

        if brake < 0.05:
            brake = 0.0
        if throttle > brake:
            brake = 0.0

        control = carla.VehicleControl()
        control.steer = float(steer)
        control.throttle = float(throttle)
        control.brake = float(brake)

        if SAVE_PATH is not None and self.step % 10 == 0:
            self.save(tick_data)

        return control

    def save(self, tick_data):
        frame = self.step // 10

        Image.fromarray(tick_data['rgb']).save(self.save_path / 'rgb' / ('%04d.png' % frame))

        Image.fromarray(cm.gist_earth(self.lidar_processed[0].cpu().numpy()[0, 0], bytes=True)).save(self.save_path / 'lidar_0' / ('%04d.png' % frame))
        Image.fromarray(cm.gist_earth(self.lidar_processed[0].cpu().numpy()[0, 1], bytes=True)).save(self.save_path / 'lidar_1' / ('%04d.png' % frame))

        with open(self.save_path / 'meta' / ('%04d.json' % frame), 'w') as outfile:
            json.dump(self.pid_metadata, outfile, indent=4)

    def destroy(self):
        del self.net
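

# Minimal sanity check of the ego-frame rotation used in tick(), with made-up
# numbers (illustrative only; assumes the module's heavy imports resolve).
# R maps ego-frame vectors into the world frame, so R.T brings the world-frame
# offset to the next waypoint back into the ego frame.
if __name__ == '__main__':
    compass = 0.3                    # hypothetical compass reading, radians
    pos = np.array([10.0, 5.0])      # hypothetical ego position
    next_wp = np.array([14.0, 7.0])  # hypothetical next waypoint

    theta = compass + np.pi / 2
    R = np.array([
        [np.cos(theta), -np.sin(theta)],
        [np.sin(theta), np.cos(theta)]
    ])
    local_command_point = R.T.dot(next_wp - pos)
    print('target point in ego frame:', local_command_point)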