-
Aditya Prakash authoredAditya Prakash authored
aim_agent.py 8.14 KiB
import os
import json
import datetime
import pathlib
import time
import cv2
import carla
from collections import deque
import torch
import carla
import numpy as np
from PIL import Image
from leaderboard.autoagents import autonomous_agent
from aim.model import AIM
from aim.config import GlobalConfig
from aim.data import scale_and_crop_image
from team_code.planner import RoutePlanner
SAVE_PATH = os.environ.get('SAVE_PATH', None)
def get_entry_point():
return 'AIMAgent'
class AIMAgent(autonomous_agent.AutonomousAgent):
def setup(self, path_to_conf_file):
self.track = autonomous_agent.Track.SENSORS
self.config_path = path_to_conf_file
self.step = -1
self.wall_start = time.time()
self.initialized = False
self.input_buffer = {'rgb': deque(), 'rgb_left': deque(), 'rgb_right': deque(),
'rgb_rear': deque(), 'gps': deque(), 'thetas': deque()}
self.config = GlobalConfig()
self.net = AIM(self.config, 'cuda')
self.net.load_state_dict(torch.load(os.path.join(path_to_conf_file, 'best_model.pth')))
self.net.cuda()
self.net.eval()
self.save_path = None
if SAVE_PATH is not None:
now = datetime.datetime.now()
string = pathlib.Path(os.environ['ROUTES']).stem + '_'
string += '_'.join(map(lambda x: '%02d' % x, (now.month, now.day, now.hour, now.minute, now.second)))
print (string)
self.save_path = pathlib.Path(os.environ['SAVE_PATH']) / string
self.save_path.mkdir(parents=True, exist_ok=False)
(self.save_path / 'rgb').mkdir()
(self.save_path / 'meta').mkdir()
def _init(self):
self._route_planner = RoutePlanner(4.0, 50.0)
self._route_planner.set_route(self._global_plan, True)
self.initialized = True
def _get_position(self, tick_data):
gps = tick_data['gps']
gps = (gps - self._route_planner.mean) * self._route_planner.scale
return gps
def sensors(self):
return [
{
'type': 'sensor.camera.rgb',
'x': 1.3, 'y': 0.0, 'z':2.3,
'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
'width': 400, 'height': 300, 'fov': 100,
'id': 'rgb'
},
{
'type': 'sensor.camera.rgb',
'x': 1.3, 'y': 0.0, 'z':2.3,
'roll': 0.0, 'pitch': 0.0, 'yaw': -60.0,
'width': 400, 'height': 300, 'fov': 100,
'id': 'rgb_left'
},
{
'type': 'sensor.camera.rgb',
'x': 1.3, 'y': 0.0, 'z':2.3,
'roll': 0.0, 'pitch': 0.0, 'yaw': 60.0,
'width': 400, 'height': 300, 'fov': 100,
'id': 'rgb_right'
},
{
'type': 'sensor.camera.rgb',
'x': -1.3, 'y': 0.0, 'z':2.3,
'roll': 0.0, 'pitch': 0.0, 'yaw': -180.0,
'width': 400, 'height': 300, 'fov': 100,
'id': 'rgb_rear'
},
{
'type': 'sensor.other.imu',
'x': 0.0, 'y': 0.0, 'z': 0.0,
'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
'sensor_tick': 0.05,
'id': 'imu'
},
{
'type': 'sensor.other.gnss',
'x': 0.0, 'y': 0.0, 'z': 0.0,
'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
'sensor_tick': 0.01,
'id': 'gps'
},
{
'type': 'sensor.speedometer',
'reading_frequency': 20,
'id': 'speed'
}
]
def tick(self, input_data):
self.step += 1
rgb = cv2.cvtColor(input_data['rgb'][1][:, :, :3], cv2.COLOR_BGR2RGB)
rgb_left = cv2.cvtColor(input_data['rgb_left'][1][:, :, :3], cv2.COLOR_BGR2RGB)
rgb_right = cv2.cvtColor(input_data['rgb_right'][1][:, :, :3], cv2.COLOR_BGR2RGB)
rgb_rear = cv2.cvtColor(input_data['rgb_rear'][1][:, :, :3], cv2.COLOR_BGR2RGB)
gps = input_data['gps'][1][:2]
speed = input_data['speed'][1]['speed']
compass = input_data['imu'][1][-1]
result = {
'rgb': rgb,
'rgb_left': rgb_left,
'rgb_right': rgb_right,
'rgb_rear': rgb_rear,
'gps': gps,
'speed': speed,
'compass': compass,
}
pos = self._get_position(result)
result['gps'] = pos
next_wp, next_cmd = self._route_planner.run_step(pos)
result['next_command'] = next_cmd.value
theta = compass + np.pi/2
R = np.array([
[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]
])
local_command_point = np.array([next_wp[0]-pos[0], next_wp[1]-pos[1]])
local_command_point = R.T.dot(local_command_point)
result['target_point'] = tuple(local_command_point)
return result
@torch.no_grad()
def run_step(self, input_data, timestamp):
if not self.initialized:
self._init()
tick_data = self.tick(input_data)
if self.step < self.config.seq_len:
rgb = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb'].append(rgb.to('cuda', dtype=torch.float32))
if not self.config.ignore_sides:
rgb_left = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_left']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb_left'].append(rgb_left.to('cuda', dtype=torch.float32))
rgb_right = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_right']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb_right'].append(rgb_right.to('cuda', dtype=torch.float32))
if not self.config.ignore_rear:
rgb_rear = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_rear']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb_rear'].append(rgb_rear.to('cuda', dtype=torch.float32))
control = carla.VehicleControl()
control.steer = 0.0
control.throttle = 0.0
control.brake = 0.0
return control
gt_velocity = torch.FloatTensor([tick_data['speed']]).to('cuda', dtype=torch.float32)
command = torch.FloatTensor([tick_data['next_command']]).to('cuda', dtype=torch.float32)
tick_data['target_point'] = [torch.FloatTensor([tick_data['target_point'][0]]),
torch.FloatTensor([tick_data['target_point'][1]])]
target_point = torch.stack(tick_data['target_point'], dim=1).to('cuda', dtype=torch.float32)
encoding = []
rgb = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb'].popleft()
self.input_buffer['rgb'].append(rgb.to('cuda', dtype=torch.float32))
encoding.append(self.net.image_encoder(list(self.input_buffer['rgb'])))
if not self.config.ignore_sides:
rgb_left = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_left']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb_left'].popleft()
self.input_buffer['rgb_left'].append(rgb_left.to('cuda', dtype=torch.float32))
encoding.append(self.net.image_encoder(list(self.input_buffer['rgb_left'])))
rgb_right = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_right']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb_right'].popleft()
self.input_buffer['rgb_right'].append(rgb_right.to('cuda', dtype=torch.float32))
encoding.append(self.net.image_encoder(list(self.input_buffer['rgb_right'])))
if not self.config.ignore_rear:
rgb_rear = torch.from_numpy(scale_and_crop_image(Image.fromarray(tick_data['rgb_rear']), scale=self.config.scale, crop=self.config.input_resolution)).unsqueeze(0)
self.input_buffer['rgb_rear'].popleft()
self.input_buffer['rgb_rear'].append(rgb_rear.to('cuda', dtype=torch.float32))
encoding.append(self.net.image_encoder(list(self.input_buffer['rgb_rear'])))
pred_wp = self.net(encoding, target_point)
steer, throttle, brake, metadata = self.net.control_pid(pred_wp, gt_velocity)
self.pid_metadata = metadata
if brake < 0.05: brake = 0.0
if throttle > brake: brake = 0.0
control = carla.VehicleControl()
control.steer = float(steer)
control.throttle = float(throttle)
control.brake = float(brake)
if SAVE_PATH is not None and self.step % 10 == 0:
self.save(tick_data)
return control
def save(self, tick_data):
frame = self.step // 10
Image.fromarray(tick_data['rgb']).save(self.save_path / 'rgb' / ('%04d.png' % frame))
outfile = open(self.save_path / 'meta' / ('%04d.json' % frame), 'w')
json.dump(self.pid_metadata, outfile, indent=4)
outfile.close()
def destroy(self):
del self.net