Skip to content
Snippets Groups Projects
Commit 38ca9953 authored by andersct's avatar andersct
Browse files

Add global planning

- add probabilistic road map algorithm
- add point to line distance calculation
- add marker objects for overlaying path in scene
- add example to display prm
- add adapter for low level controllers to follow trajectories
- add processing to interpolate values on circular space
- add example for trajectory following
parent 9db0348a
No related branches found
No related tags found
No related merge requests found
class TrajectoryFollowingController(object):
def __init__(self, controller, update_controller_fcn):
:param controller: low-level controller
:param update_controller_fcn: updates controllers' setpoints,
given the parameters at the current time z(t)
self.controller = controller
self.update_controller_fcn = update_controller_fcn
self.z_spline = None
self.t_max = 0
def set_setpoint(self, z_spline, t_max=0):
Set trajectory to follow
:param z_spline: t -> z(t) | parameters given to controller
as a function of time
:param t_max: final time for which spline is parameterized
self.z_spline = z_spline
self.t_max = t_max
def __call__(self, x, t, **kwargs):
t_spline = self.t_max if 0 < self.t_max < t else t
z_t = self.z_spline(t_spline)
self.update_controller_fcn(self.controller, z_t)
u = self.controller(x, t, **kwargs)
return u
import numpy as np
from planners import prm
from scene_elements.scene import Scene
from scene_elements.sphere import Sphere
from scene_elements.water import Water
from scene_elements.markers import Markers
from time import time
def main():
seed = np.random.randint(1000)
# seed = 82
print('seed: {}'.format(seed))
n_obstacles = 20
obstacle_xy = np.random.rand(n_obstacles, 2) * np.array([10, 8]) + \
np.array([-5., 2])
x_0 = np.array([0, 0.])
x_goal = np.array([0, 12])
t_0 = time()
path_vertices = prm.prm(
x_0, x_goal,
obstacle_xy, np.ones((n_obstacles,)),
sample_kwargs=dict(n_samples=8, mu_r=1.6, sig_r=.2),
print('planning time elapsed: {:.3f}s'.format(time() - t_0))
colors = ['r', 'y', 'blue', 'black', 'green']
scene = Scene(
[Sphere(obstacle_xy[i], color=np.random.choice(colors))
for i in range(obstacle_xy.shape[0])]
+ [Water()]
+ [Markers(path_vertices)]
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.set_xlim([-7, 7])
ax.set_ylim([-1, 13])
if __name__ == '__main__':
import numpy as np
from planners import prm
from misc import interpolation as itp
from scipy.interpolate import InterpolatedUnivariateSpline
from controls import sdre
from controls import trajectory_adapter
from rigid_body_models import boat
from state_space_models import empirical_drag
from controls import stepping
from vis import planar_thruster_model
from scene_elements.scene import Scene
from scene_elements.sphere import Sphere
from scene_elements.water import Water
from scene_elements.markers import Markers
from import Camera
def build_camera():
cam_f = 800.
cam_rows = 480
cam_cols = 640
cam_K = np.array([
[cam_f, 0, cam_rows/2, 0],
[0, cam_f, cam_cols/2, 0],
[0, 0, 1., 0],
cam_Rt = np.array([
[0, 0, 1, -.2],
[0, -1, 0, 0],
[1, 0, 0, 0],
[0, 0, 0, 1],
camera = Camera(K=cam_K, Rt=cam_Rt, cam_cols=cam_cols, cam_rows=cam_rows)
return camera
def make_obstacles(n):
xy = np.random.rand(n, 2) * np.array([10, 8]) + np.array([-5., 2])
return xy
def build_parameter_spline(
path_vertices, x1, Q0_diag, Q1_diag,
cumulative_dist, v_nominal):
Of course include shortest path found by prm as time-dependent parameters,
but also modify state-weights Q over time.
In this case we care less about the particular orientation or speed at
all but the terminal state.
- for slightly more realistic pointing constraints,
set target orientations to that of next position
n_state = Q0_diag.size
z_t_points = np.zeros((path_vertices.shape[0], n_state + n_state + 1))
z_t_points[:, :2] = path_vertices
z_t_points[-1, 2:n_state] = x1[2:]
z_t_points[:-1, n_state:-1] = Q0_diag
z_t_points[-1, n_state:-1] = Q1_diag
z_t_points[1:, -1] = cumulative_dist / v_nominal
z_t_points[1:, 1] - z_t_points[:-1, 1],
z_t_points[1:, 0] - z_t_points[:-1, 0],
out=z_t_points[:-1, 4],
z_t_points[:, 4] = itp.order_circular_1d(z_t_points[:, 4], 2*np.pi)
z_spline = itp.build_nd_interpolator(
z_t_points[:, :-1], z_t_points[:, -1],
InterpolatedUnivariateSpline, k=1,
t_max = z_t_points[-1, -1]
return z_spline, t_max
def main():
seed = np.random.randint(1000)
# seed = 82
print('seed: {}'.format(seed))
n_obstacles = 20
obstacle_xy = make_obstacles(n_obstacles)
x0 = np.array([0, 0, 0, 0, np.pi / 2, 0])
x1 = np.array([0, 12, 0, 0, np.pi / 2, 0])
n_state = x0.size
path_vertices = prm.prm(
x0[:2], x1[:2],
obstacle_xy, np.ones((n_obstacles,)),
sample_kwargs=dict(n_samples=8, mu_r=1.6, sig_r=.2),
cumulative_dist = np.linalg.norm(
path_vertices[1:] - path_vertices[:-1], axis=-1).cumsum()
v_nominal = 1.
eps = 1e-2
Q0_diag = np.array([1., 1, eps, eps, 2, 1]) * 1.
Q1_diag = np.array([1., 1, 1, 1, 1, 2]) * 1.
z_spline, t_max = build_parameter_spline(
path_vertices, x1, Q0_diag, Q1_diag, cumulative_dist, v_nominal)
R = np.diag([1., 1, 1, 1]) * 1.
sdre_controller = sdre.SdreController(np.diag(Q0_diag), R)
def controller_update_fcn(controller, z_t):
x_goal_t = z_t[:n_state]
Qt_diag = z_t[n_state:]
controller.Q = np.diag(Qt_diag)
traj_controller = trajectory_adapter.TrajectoryFollowingController(
sdre_controller, controller_update_fcn)
traj_controller.set_setpoint(z_spline, t_max=t_max)
boat_model = boat.DiamondMountedThrusters()
cts_sim_model = empirical_drag.EmpiricalDragCts(
np.array([.1, .5]) * 1.,
np.array([.5, .1]) * 1.,
DT = 0.1
T = t_max + 1
x, u = stepping.solve_cts_eqns(
cts_sim_model, x0, traj_controller, DT, T,
colors = ['r', 'y', 'blue', 'black', 'green']
scene = Scene(
[Sphere(obstacle_xy[i], color=np.random.choice(colors))
for i in range(obstacle_xy.shape[0])]
+ [Water()]
+ [Markers(path_vertices)]
camera = build_camera()
boat_model, x.T[:, [0, 1]], x.T[:, 4], u.T, DT, scene, camera)
if __name__ == '__main__':
......@@ -15,3 +15,41 @@ def build_nd_interpolator(x, t, interp_builder_fcn, **kwargs):
def spl(t_):
return np.array([spl_i(t_) for spl_i in spl_1d_list]).T
return spl
def order_circular_1d(x, c):
Process sequence of circular values so that euclidean space
interpolation will perform the "shortest path" interpolation
eg: x = [359 1 5 1 359]
-> y = [359 360+1 360+5 359]
:param x: n, | values in circular space (eg S^1) to interpolate
:param c: period of circular space [0, c) (eg 2pi)
y = x % c # ensure y[0] \in [0, c)
# y = x.copy() # preserve first value in sequence
b = np.array([-c, 0, c])
for i in range(1, y.size):
y[i:] = y[i:] % c + c * (y[i-1] // c)
ind = np.argmin(np.abs(y[i] + b - y[i-1]))
y[i] += b[ind]
return y
def _example_order_circular_1d():
x = np.array([359, 1, 5, 1, 359])
# [359 361 365 361 359]
print(order_circular_1d(x, 360))
x = np.array([359, 1, 179, 182, 1])
# [359 361 539 542 721]
print(order_circular_1d(x, 360))
x = np.array([-1-360, 1, 5, 1, 359])
# [-361 -359 -355 -359 -361]
print(order_circular_1d(x, 360))
if __name__ == '__main__':
import numpy as np
def pt2line_min_dists(a, b0, b1):
Find distance of each point to each line segment
:param a: n, d | n points in \reals^d
:param b0: k, d | start points of k lines
:param b1: k, d | end points of k lines
:return: dists: n, k | [i, j] = distance of a[i] to line b0[j]-b1[j]
line_vectors = b1 - b0 # k, d
pt2start_vectors = a - b0[:, np.newaxis] # k, n, d
dots = (line_vectors[:, np.newaxis] * pt2start_vectors).sum(axis=2) # k, n
norms = (line_vectors ** 2).sum(axis=1) # k,
norms[norms < 1e-8] = 1e-8
t = dots.T/norms # n, k
np.clip(t, 0, 1, out=t)
proj = b0.T + (t[:, np.newaxis] * line_vectors.T) # n, 2, k
rej = a[:, :, np.newaxis] - proj # n, 2, k
sq_dists = (rej ** 2).sum(axis=1) # n, k
return np.sqrt(sq_dists)
Implements Probabilistic Road Map algorithm for global planning in euclidean space.
This basically does:
0) sample attainable configurations in the world
1) make a weighted graph describing cost of moving between the configurations
2) apply shortest path algorithm to find lowest weight path between given graph vertices
The version here does not maintain state, and cannot be iteratively updated.
import numpy as np
from misc import projection as prj
import scipy.sparse as sp
def sample_near_obstacles_radially(obs, n_samples=4, mu_r=1., sig_r=.2):
Around each obstacle sample n_sample positions \in \reals^d.
Set each sample to be at a distance ~ N(mu_r, sig_r^2) from center.
- avoid dividing by zero (though, this chance is a.s. zero)
:param obs: n, d | n obstacle centers in d dimensions
:param n_samples:
:param mu_r:
:param sig_r:
:return: x: n_samples*n, d | sampled positions
n, d = obs.shape
x = np.random.randn(n_samples, n, d)
norms = np.linalg.norm(x, axis=-1, keepdims=True)
zero_mask = norms < 1e-8
norms[zero_mask] = 1.
x /= norms
unit_vec = np.zeros((d,))
unit_vec[0] = 1.
x[zero_mask[..., 0]] = unit_vec
r = np.random.randn(*norms.shape) * sig_r + mu_r
x *= r
x += obs
return x.reshape(n_samples * n, d)
def make_complete_graph(x):
Connect all points with edges with weight given by
euclidean distance.
:param x: n, d | n points \in \reals^d
:return: g: n, n | adjacency matrix for undirected weighted graph
- [i, j] = distance(x[i], x[j])
dif = x[np.newaxis, ...] - x[:, np.newaxis]
g = np.linalg.norm(dif, axis=-1)
return g
def remove_colliding_edges(g, vertices, obs, obs_r):
Set edges in given graph to zero for edges that collide with obstacles.
:param g: n, n | adjacency matrix for undirected weighted graph
:param vertices: n, d | n points in \reals^d
:param obs: m, d | obstacle positions
:param obs_r: m, | obstacle radii - used for removing graph edges
modify graph by setting colliding distances to zero (= no-edge)
n = vertices.shape[0]
i_ind, j_ind = np.mgrid[:n, :n]
i_ind, j_ind = i_ind.ravel(), j_ind.ravel()
mask = i_ind < j_ind
i_ind = i_ind[mask] # k,
j_ind = j_ind[mask]
dists = prj.pt2line_min_dists(obs, vertices[i_ind], vertices[j_ind]) # m, k
invalid_mask = np.any(dists.T < obs_r, axis=1)
g[i_ind[invalid_mask], j_ind[invalid_mask]] = 0
g[g.T == 0] = 0
def get_path_list(predecessors, i, j):
Retrieve shortest path i->j from predecessors list
:param predecessors: n, | predecessor indices for for graph with n vertices
:param i: index of starting point in graph
:param j: index of end point in graph
:return: path_inds | list of indices [i, ..., j]
path_inds = []
while j != i:
j = predecessors[j]
return path_inds[::-1]
def prm(x, y, obs, obs_r, sample_kwargs=None, graph_fcn=make_complete_graph):
:param x: d, | start position in euclidean space
:param y: d, | end position
:param obs: m, d | obstacle positions
:param obs_r: m, | obstacle radii - used for removing graph edges
:param sample_kwargs:
:param graph_fcn: n, d -> n, n | builds adjacency matrix
for undirected weighted graph
path_vertices: k, d | vertices forming shortest path found
- [0] = x, [-1] = y
sample_kwargs = sample_kwargs or {}
vertices = sample_near_obstacles_radially(obs, **sample_kwargs)
vertices = np.vstack((x, vertices, y))
n = vertices.shape[0]
g = graph_fcn(vertices)
remove_colliding_edges(g, vertices, obs, obs_r)
res = sp.csgraph.dijkstra(
csgraph=g, directed=False, indices=0, return_predecessors=True)
dist_matrix, predecessors = res
path_inds = get_path_list(predecessors, 0, n - 1)
return vertices[path_inds]
import numpy as np
class Markers(object):
def __init__(self, xy, kwargs=None):
self.xy = np.asarray(xy)
kwargs = kwargs or dict(
color='magenta', marker='^', linestyle='dashed',
linewidth=2, markersize=5, alpha=0.8,
self.kwargs = kwargs
def draw_top_view(self, ax):
ax.plot(self.xy[:, 0], self.xy[:, 1], **self.kwargs)
def draw_image_view(self, ax, camera, zorder=0):
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment