diff --git a/controls/trajectory_adapter.py b/controls/trajectory_adapter.py new file mode 100644 index 0000000000000000000000000000000000000000..244d1cc15ecde21d608f7003223678b7e2abab8c --- /dev/null +++ b/controls/trajectory_adapter.py @@ -0,0 +1,33 @@ + + +class TrajectoryFollowingController(object): + + def __init__(self, controller, update_controller_fcn): + """ + + :param controller: low-level controller + :param update_controller_fcn: updates controllers' setpoints, + given the parameters at the current time z(t) + """ + self.controller = controller + self.update_controller_fcn = update_controller_fcn + self.z_spline = None + self.t_max = 0 + + def set_setpoint(self, z_spline, t_max=0): + """ + Set trajectory to follow + :param z_spline: t -> z(t) | parameters given to controller + as a function of time + :param t_max: final time for which spline is parameterized + :return: + """ + self.z_spline = z_spline + self.t_max = t_max + + def __call__(self, x, t, **kwargs): + t_spline = self.t_max if 0 < self.t_max < t else t + z_t = self.z_spline(t_spline) + self.update_controller_fcn(self.controller, z_t) + u = self.controller(x, t, **kwargs) + return u diff --git a/examples/display_prm_pathing.py b/examples/display_prm_pathing.py new file mode 100644 index 0000000000000000000000000000000000000000..bda16100aa43f000ca54b63aa256f1b1953ac4ea --- /dev/null +++ b/examples/display_prm_pathing.py @@ -0,0 +1,52 @@ +import numpy as np +from planners import prm +from scene_elements.scene import Scene +from scene_elements.sphere import Sphere +from scene_elements.water import Water +from scene_elements.markers import Markers +from time import time + + +def main(): + seed = np.random.randint(1000) + # seed = 82 + np.random.seed(seed) + print('seed: {}'.format(seed)) + + n_obstacles = 20 + obstacle_xy = np.random.rand(n_obstacles, 2) * np.array([10, 8]) + \ + np.array([-5., 2]) + x_0 = np.array([0, 0.]) + x_goal = np.array([0, 12]) + t_0 = time() + path_vertices = prm.prm( + x_0, x_goal, + obstacle_xy, np.ones((n_obstacles,)), + sample_kwargs=dict(n_samples=8, mu_r=1.6, sig_r=.2), + ) + print('planning time elapsed: {:.3f}s'.format(time() - t_0)) + + colors = ['r', 'y', 'blue', 'black', 'green'] + scene = Scene( + [Sphere(obstacle_xy[i], color=np.random.choice(colors)) + for i in range(obstacle_xy.shape[0])] + + [Water()] + + [Markers(path_vertices)] + ) + + import matplotlib.pyplot as plt + + fig, ax = plt.subplots() + ax.set_xticks(np.mgrid[-50:50:1]) + ax.set_yticks(np.mgrid[-50:50:1]) + ax.set_xlim([-7, 7]) + ax.set_ylim([-1, 13]) + ax.set_aspect('equal') + plt.grid(True) + ax.set_axisbelow(True) + scene.draw_top_view(ax) + plt.show() + + +if __name__ == '__main__': + main() diff --git a/examples/display_prm_trajectory_following.py b/examples/display_prm_trajectory_following.py new file mode 100644 index 0000000000000000000000000000000000000000..f67b174112a4dd442b7ea5de3bada858344430ad --- /dev/null +++ b/examples/display_prm_trajectory_following.py @@ -0,0 +1,143 @@ +import numpy as np +from planners import prm +from misc import interpolation as itp +from scipy.interpolate import InterpolatedUnivariateSpline +from controls import sdre +from controls import trajectory_adapter + +from rigid_body_models import boat +from state_space_models import empirical_drag +from controls import stepping +from vis import planar_thruster_model + +from scene_elements.scene import Scene +from scene_elements.sphere import Sphere +from scene_elements.water import Water +from scene_elements.markers import Markers +from scene_elements.camera import Camera + + +def build_camera(): + cam_f = 800. + cam_rows = 480 + cam_cols = 640 + cam_K = np.array([ + [cam_f, 0, cam_rows/2, 0], + [0, cam_f, cam_cols/2, 0], + [0, 0, 1., 0], + ]) + cam_Rt = np.array([ + [0, 0, 1, -.2], + [0, -1, 0, 0], + [1, 0, 0, 0], + [0, 0, 0, 1], + ]) + camera = Camera(K=cam_K, Rt=cam_Rt, cam_cols=cam_cols, cam_rows=cam_rows) + return camera + + +def make_obstacles(n): + xy = np.random.rand(n, 2) * np.array([10, 8]) + np.array([-5., 2]) + return xy + + +def build_parameter_spline( + path_vertices, x1, Q0_diag, Q1_diag, + cumulative_dist, v_nominal): + """ + Of course include shortest path found by prm as time-dependent parameters, + but also modify state-weights Q over time. + In this case we care less about the particular orientation or speed at + all but the terminal state. + - for slightly more realistic pointing constraints, + set target orientations to that of next position + """ + n_state = Q0_diag.size + z_t_points = np.zeros((path_vertices.shape[0], n_state + n_state + 1)) + z_t_points[:, :2] = path_vertices + z_t_points[-1, 2:n_state] = x1[2:] + z_t_points[:-1, n_state:-1] = Q0_diag + z_t_points[-1, n_state:-1] = Q1_diag + z_t_points[1:, -1] = cumulative_dist / v_nominal + np.arctan2( + z_t_points[1:, 1] - z_t_points[:-1, 1], + z_t_points[1:, 0] - z_t_points[:-1, 0], + out=z_t_points[:-1, 4], + ) + z_t_points[:, 4] = itp.order_circular_1d(z_t_points[:, 4], 2*np.pi) + z_spline = itp.build_nd_interpolator( + z_t_points[:, :-1], z_t_points[:, -1], + InterpolatedUnivariateSpline, k=1, + ) + t_max = z_t_points[-1, -1] + return z_spline, t_max + + +def main(): + seed = np.random.randint(1000) + # seed = 82 + np.random.seed(seed) + print('seed: {}'.format(seed)) + + n_obstacles = 20 + obstacle_xy = make_obstacles(n_obstacles) + x0 = np.array([0, 0, 0, 0, np.pi / 2, 0]) + x1 = np.array([0, 12, 0, 0, np.pi / 2, 0]) + n_state = x0.size + path_vertices = prm.prm( + x0[:2], x1[:2], + obstacle_xy, np.ones((n_obstacles,)), + sample_kwargs=dict(n_samples=8, mu_r=1.6, sig_r=.2), + ) + cumulative_dist = np.linalg.norm( + path_vertices[1:] - path_vertices[:-1], axis=-1).cumsum() + v_nominal = 1. + + eps = 1e-2 + Q0_diag = np.array([1., 1, eps, eps, 2, 1]) * 1. + Q1_diag = np.array([1., 1, 1, 1, 1, 2]) * 1. + z_spline, t_max = build_parameter_spline( + path_vertices, x1, Q0_diag, Q1_diag, cumulative_dist, v_nominal) + R = np.diag([1., 1, 1, 1]) * 1. + sdre_controller = sdre.SdreController(np.diag(Q0_diag), R) + + def controller_update_fcn(controller, z_t): + x_goal_t = z_t[:n_state] + Qt_diag = z_t[n_state:] + controller.set_setpoint(x_goal_t) + controller.Q = np.diag(Qt_diag) + + traj_controller = trajectory_adapter.TrajectoryFollowingController( + sdre_controller, controller_update_fcn) + traj_controller.set_setpoint(z_spline, t_max=t_max) + + boat_model = boat.DiamondMountedThrusters() + cts_sim_model = empirical_drag.EmpiricalDragCts( + boat_model, + np.array([.1, .5]) * 1., + np.array([.5, .1]) * 1., + ) + DT = 0.1 + T = t_max + 1 + x, u = stepping.solve_cts_eqns( + cts_sim_model, x0, traj_controller, DT, T, + cts_sim_model=cts_sim_model, + ) + np.set_printoptions(suppress=True) + print(x.T.round(2)) + print(u.T.round(2)) + + colors = ['r', 'y', 'blue', 'black', 'green'] + scene = Scene( + [Sphere(obstacle_xy[i], color=np.random.choice(colors)) + for i in range(obstacle_xy.shape[0])] + + [Water()] + + [Markers(path_vertices)] + ) + camera = build_camera() + planar_thruster_model.loop_camera_sim( + boat_model, x.T[:, [0, 1]], x.T[:, 4], u.T, DT, scene, camera) + + +if __name__ == '__main__': + main() diff --git a/misc/interpolation.py b/misc/interpolation.py index 4d54218850f2afae3d7b22044ce94f1dcf48d064..effd2ea35f7bf1e20c52461c77e5bc94830d4e98 100644 --- a/misc/interpolation.py +++ b/misc/interpolation.py @@ -15,3 +15,41 @@ def build_nd_interpolator(x, t, interp_builder_fcn, **kwargs): def spl(t_): return np.array([spl_i(t_) for spl_i in spl_1d_list]).T return spl + + +def order_circular_1d(x, c): + """ + Process sequence of circular values so that euclidean space + interpolation will perform the "shortest path" interpolation + eg: x = [359 1 5 1 359] + -> y = [359 360+1 360+5 359] + :param x: n, | values in circular space (eg S^1) to interpolate + :param c: period of circular space [0, c) (eg 2pi) + :return: + """ + y = x % c # ensure y[0] \in [0, c) + # y = x.copy() # preserve first value in sequence + b = np.array([-c, 0, c]) + for i in range(1, y.size): + y[i:] = y[i:] % c + c * (y[i-1] // c) + ind = np.argmin(np.abs(y[i] + b - y[i-1])) + y[i] += b[ind] + return y + + +def _example_order_circular_1d(): + x = np.array([359, 1, 5, 1, 359]) + # [359 361 365 361 359] + print(order_circular_1d(x, 360)) + + x = np.array([359, 1, 179, 182, 1]) + # [359 361 539 542 721] + print(order_circular_1d(x, 360)) + + x = np.array([-1-360, 1, 5, 1, 359]) + # [-361 -359 -355 -359 -361] + print(order_circular_1d(x, 360)) + + +if __name__ == '__main__': + _example_order_circular_1d() diff --git a/misc/projection.py b/misc/projection.py new file mode 100644 index 0000000000000000000000000000000000000000..b1b7d4be7a1f74434b7bd2f867c04516a2dd74c6 --- /dev/null +++ b/misc/projection.py @@ -0,0 +1,22 @@ +import numpy as np + + +def pt2line_min_dists(a, b0, b1): + """ + Find distance of each point to each line segment + :param a: n, d | n points in \reals^d + :param b0: k, d | start points of k lines + :param b1: k, d | end points of k lines + :return: dists: n, k | [i, j] = distance of a[i] to line b0[j]-b1[j] + """ + line_vectors = b1 - b0 # k, d + pt2start_vectors = a - b0[:, np.newaxis] # k, n, d + dots = (line_vectors[:, np.newaxis] * pt2start_vectors).sum(axis=2) # k, n + norms = (line_vectors ** 2).sum(axis=1) # k, + norms[norms < 1e-8] = 1e-8 + t = dots.T/norms # n, k + np.clip(t, 0, 1, out=t) + proj = b0.T + (t[:, np.newaxis] * line_vectors.T) # n, 2, k + rej = a[:, :, np.newaxis] - proj # n, 2, k + sq_dists = (rej ** 2).sum(axis=1) # n, k + return np.sqrt(sq_dists) diff --git a/planners/__init__.py b/planners/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/planners/prm.py b/planners/prm.py new file mode 100644 index 0000000000000000000000000000000000000000..95ecb39611f9d0c6250261eaed7e3a4daab5efe4 --- /dev/null +++ b/planners/prm.py @@ -0,0 +1,117 @@ +""" +Implements Probabilistic Road Map algorithm for global planning in euclidean space. + +This basically does: +0) sample attainable configurations in the world +1) make a weighted graph describing cost of moving between the configurations +2) apply shortest path algorithm to find lowest weight path between given graph vertices + +The version here does not maintain state, and cannot be iteratively updated. +""" +import numpy as np +from misc import projection as prj +import scipy.sparse as sp + + +def sample_near_obstacles_radially(obs, n_samples=4, mu_r=1., sig_r=.2): + """ + Around each obstacle sample n_sample positions \in \reals^d. + Set each sample to be at a distance ~ N(mu_r, sig_r^2) from center. + - avoid dividing by zero (though, this chance is a.s. zero) + :param obs: n, d | n obstacle centers in d dimensions + :param n_samples: + :param mu_r: + :param sig_r: + :return: x: n_samples*n, d | sampled positions + """ + n, d = obs.shape + x = np.random.randn(n_samples, n, d) + norms = np.linalg.norm(x, axis=-1, keepdims=True) + zero_mask = norms < 1e-8 + norms[zero_mask] = 1. + x /= norms + unit_vec = np.zeros((d,)) + unit_vec[0] = 1. + x[zero_mask[..., 0]] = unit_vec + r = np.random.randn(*norms.shape) * sig_r + mu_r + x *= r + x += obs + return x.reshape(n_samples * n, d) + + +def make_complete_graph(x): + """ + Connect all points with edges with weight given by + euclidean distance. + :param x: n, d | n points \in \reals^d + :return: g: n, n | adjacency matrix for undirected weighted graph + - [i, j] = distance(x[i], x[j]) + """ + dif = x[np.newaxis, ...] - x[:, np.newaxis] + g = np.linalg.norm(dif, axis=-1) + return g + + +def remove_colliding_edges(g, vertices, obs, obs_r): + """ + Set edges in given graph to zero for edges that collide with obstacles. + :param g: n, n | adjacency matrix for undirected weighted graph + :param vertices: n, d | n points in \reals^d + :param obs: m, d | obstacle positions + :param obs_r: m, | obstacle radii - used for removing graph edges + :return: + modify graph by setting colliding distances to zero (= no-edge) + """ + n = vertices.shape[0] + i_ind, j_ind = np.mgrid[:n, :n] + i_ind, j_ind = i_ind.ravel(), j_ind.ravel() + mask = i_ind < j_ind + i_ind = i_ind[mask] # k, + j_ind = j_ind[mask] + dists = prj.pt2line_min_dists(obs, vertices[i_ind], vertices[j_ind]) # m, k + invalid_mask = np.any(dists.T < obs_r, axis=1) + g[i_ind[invalid_mask], j_ind[invalid_mask]] = 0 + g[g.T == 0] = 0 + + +def get_path_list(predecessors, i, j): + """ + Retrieve shortest path i->j from predecessors list + :param predecessors: n, | predecessor indices for for graph with n vertices + :param i: index of starting point in graph + :param j: index of end point in graph + :return: path_inds | list of indices [i, ..., j] + """ + path_inds = [] + while j != i: + path_inds.append(j) + j = predecessors[j] + path_inds.append(i) + return path_inds[::-1] + + +def prm(x, y, obs, obs_r, sample_kwargs=None, graph_fcn=make_complete_graph): + """ + + :param x: d, | start position in euclidean space + :param y: d, | end position + :param obs: m, d | obstacle positions + :param obs_r: m, | obstacle radii - used for removing graph edges + :param sample_kwargs: + :param graph_fcn: n, d -> n, n | builds adjacency matrix + for undirected weighted graph + :return: + path_vertices: k, d | vertices forming shortest path found + - [0] = x, [-1] = y + """ + sample_kwargs = sample_kwargs or {} + vertices = sample_near_obstacles_radially(obs, **sample_kwargs) + vertices = np.vstack((x, vertices, y)) + n = vertices.shape[0] + g = graph_fcn(vertices) + remove_colliding_edges(g, vertices, obs, obs_r) + res = sp.csgraph.dijkstra( + csgraph=g, directed=False, indices=0, return_predecessors=True) + dist_matrix, predecessors = res + path_inds = get_path_list(predecessors, 0, n - 1) + return vertices[path_inds] diff --git a/scene_elements/markers.py b/scene_elements/markers.py new file mode 100644 index 0000000000000000000000000000000000000000..ef81a2c71261121a14bbd4ce30f661e2b01e4767 --- /dev/null +++ b/scene_elements/markers.py @@ -0,0 +1,19 @@ +import numpy as np + + +class Markers(object): + + def __init__(self, xy, kwargs=None): + self.xy = np.asarray(xy) + kwargs = kwargs or dict( + color='magenta', marker='^', linestyle='dashed', + linewidth=2, markersize=5, alpha=0.8, + zorder=10, + ) + self.kwargs = kwargs + + def draw_top_view(self, ax): + ax.plot(self.xy[:, 0], self.xy[:, 1], **self.kwargs) + + def draw_image_view(self, ax, camera, zorder=0): + pass