from collections import deque import numpy as np import torch from torch import nn import torch.nn.functional as F from torchvision import models class ImageCNN(nn.Module): """ Encoder network for image input list. Args: c_dim (int): output dimension of the latent embedding normalize (bool): whether the input images should be normalized """ def __init__(self, c_dim, normalize=True): super().__init__() self.normalize = normalize self.features = models.resnet34(pretrained=True) self.features.fc = nn.Sequential() def forward(self, inputs): c = 0 for x in inputs: if self.normalize: x = normalize_imagenet(x) c += self.features(x) return c def normalize_imagenet(x): """ Normalize input images according to ImageNet standards. Args: x (tensor): input images """ x = x.clone() x[:, 0] = (x[:, 0] - 0.485) / 0.229 x[:, 1] = (x[:, 1] - 0.456) / 0.224 x[:, 2] = (x[:, 2] - 0.406) / 0.225 return x class PIDController(object): def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20): self._K_P = K_P self._K_I = K_I self._K_D = K_D self._window = deque([0 for _ in range(n)], maxlen=n) self._max = 0.0 self._min = 0.0 def step(self, error): self._window.append(error) self._max = max(self._max, abs(error)) self._min = -abs(self._max) if len(self._window) >= 2: integral = np.mean(self._window) derivative = (self._window[-1] - self._window[-2]) else: integral = 0.0 derivative = 0.0 return self._K_P * error + self._K_I * integral + self._K_D * derivative class AIM(nn.Module): ''' Image encoder with waypoint output and pid controller ''' def __init__(self, config, device): super().__init__() self.pred_len = config.pred_len self.config = config self.device = device self.turn_controller = PIDController(K_P=config.turn_KP, K_I=config.turn_KI, K_D=config.turn_KD, n=config.turn_n) self.speed_controller = PIDController(K_P=config.speed_KP, K_I=config.speed_KI, K_D=config.speed_KD, n=config.speed_n) self.image_encoder = ImageCNN(512, normalize=True).to(self.device) self.join = nn.Sequential( nn.Linear(512, 256), nn.ReLU(inplace=True), nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Linear(128, 64), nn.ReLU(inplace=True), ).to(self.device) self.decoder = nn.GRUCell(input_size=4, hidden_size=64).to(self.device) self.output = nn.Linear(64, 2).to(self.device) def forward(self, feature_emb, target_point): ''' Predicts future waypoints from image features and target point (goal location) Args: feature_emb (list): list of feature tensors target_point (tensor): goal location registered to ego-frame ''' feature_emb = sum(feature_emb) z = self.join(feature_emb) output_wp = list() # initial input variable to GRU x = torch.zeros(size=(z.shape[0], 2), dtype=z.dtype).to(self.device) # autoregressive generation of output waypoints for _ in range(self.pred_len): x_in = torch.cat([x, target_point], dim=1) z = self.decoder(x_in, z) dx = self.output(z) x = dx + x output_wp.append(x) pred_wp = torch.stack(output_wp, dim=1) return pred_wp def control_pid(self, waypoints, velocity): ''' Predicts vehicle control with a PID controller. Args: waypoints (tensor): predicted waypoints velocity (tensor): speedometer input ''' assert(waypoints.size(0)==1) waypoints = waypoints[0].data.cpu().numpy() # flip y is (forward is negative in our waypoints) waypoints[:,1] *= -1 speed = velocity[0].data.cpu().numpy() aim = (waypoints[1] + waypoints[0]) / 2.0 angle = np.degrees(np.pi / 2 - np.arctan2(aim[1], aim[0])) / 90 steer = self.turn_controller.step(angle) steer = np.clip(steer, -1.0, 1.0) desired_speed = np.linalg.norm(waypoints[0] - waypoints[1]) * 2.0 brake = desired_speed < self.config.brake_speed or (speed / desired_speed) > self.config.brake_ratio delta = np.clip(desired_speed - speed, 0.0, self.config.clip_delta) throttle = self.speed_controller.step(delta) throttle = np.clip(throttle, 0.0, self.config.max_throttle) throttle = throttle if not brake else 0.0 metadata = { 'speed': float(speed.astype(np.float64)), 'steer': float(steer), 'throttle': float(throttle), 'brake': float(brake), 'wp_2': tuple(waypoints[1].astype(np.float64)), 'wp_1': tuple(waypoints[0].astype(np.float64)), 'desired_speed': float(desired_speed.astype(np.float64)), 'angle': float(angle.astype(np.float64)), 'aim': tuple(aim.astype(np.float64)), 'delta': float(delta.astype(np.float64)), } return steer, throttle, brake, metadata