Commit 897ff7af authored by andersct

Add active sensing

- add generation of yaw based on information constraints
- add solving of convex program for yaw
- add cylinder scene objects
- add example showing generated yaw plan
parent 38ca9953
"""
Find choices that optimize a given objective function (eg point in the direction of movement)
while attaining a given level of sensing accuracy (eg the variance of an estimate is small enough).
Assume the variance for each point of interest is one dimensional, or
can be represented as such (eg items with a diagonal covariance).
Let \theta_t \in \reals^n be the vector of weights over the n choices available at timestep t.
Normally we would only pick one (eg for two choices, only allow [0 1] or [1 0]),
but we relax this problem by allowing \theta_t to lie in a probability simplex.
This means we have the constraints:
1_n^T \theta_t = 1
0 <= \theta_t
Allowing a cost that is linear in \theta_t at each timestep (t \in {1, ..., k}),
and penalizing the difference between the "choices" at consecutive timesteps,
the optimal choices are selected by minimizing:
minimize:
\sum_{t=1}^k c[t]^T \theta_t +
\sum_{t=1}^{k-1} \lambda ||\theta_t - \theta_{t+1}||_2^2
with:
\theta_1, ..., \theta_k \in \reals^n
subject to:
1_n^T \theta_t = 1 for all t=1,...,k
0 <= \theta_t for all t=1,...,k
[\sum_{t=1}^k M[i, t]^T \theta_t]^{-1} <= q[i] for all i=1,...,m
The final constraint specifies the minimum sensing accuracy at the final time
for each of the m points of interest.
Here, M[i, t] encodes the information (inverse of dispersion or variance) gained
from each of the possible choices at time t, for interest point i.
M is (m, k, n).
Note the constraint's convexity, since:
1) M[i, t] > 0 since 1/variance is positive
2) \theta_t >= 0 and has a nonzero entry since it is a (discrete) probability distribution
3) 1/x is convex over x > 0
4) the constraint function is 1/affine(\theta), with the affine part positive on the feasible set, hence convex in \theta
The problem is convex.
"""
import numpy as np
from scipy import sparse as spa
from scipy import optimize as opt
def f0(theta, c, lambda_diff, M):
k, n = c.shape
theta_m = theta.reshape(k, n)
linear_part = (c * theta_m).sum()
quadratic_part = (theta_m[1:] - theta_m[:-1]) ** 2
return linear_part + lambda_diff * quadratic_part.sum()
def f0_jac(theta, c, lambda_diff, M):
k, n = c.shape
theta_m = theta.reshape(k, n)
quadratic_part = np.zeros((k, n))
l_part = 2 * (theta_m[:-1] - theta_m[1:])
r_part = 2 * (theta_m[1:] - theta_m[:-1])
quadratic_part[:-1] = l_part
quadratic_part[1:] += r_part
jac = c + lambda_diff * quadratic_part
return jac.ravel()
def f0_hess_p(theta, p, c, lambda_diff, M):
k, n = c.shape
p_m = p.reshape(k, n)
H_pm = np.zeros_like(p_m)
H_pm[:-1] += 2 * (p_m[:-1] - p_m[1:])
H_pm[1:] += 2 * (p_m[1:] - p_m[:-1])
return lambda_diff * H_pm.ravel()
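# Minimal sanity-check sketch for the derivatives above; the helper name, toy shapes
# (k=3, n=2), random seed, and tolerances are illustrative assumptions.
def _check_f0_derivatives(seed=0):
    rng = np.random.default_rng(seed)
    k, n = 3, 2
    c = rng.normal(size=(k, n))
    M = np.ones((1, k, n))
    lambda_diff = 1.
    theta = rng.random(k * n)
    # compare the analytical gradient against finite differences
    assert opt.check_grad(f0, f0_jac, theta, c, lambda_diff, M) < 1e-5
    # the Hessian-vector product (quadratic term only) should match a finite
    # difference of the gradient along a random direction p; the linear term cancels
    p = rng.normal(size=k * n)
    eps = 1e-6
    fd = (f0_jac(theta + eps * p, c, lambda_diff, M) - f0_jac(theta, c, lambda_diff, M)) / eps
    assert np.allclose(fd, f0_hess_p(theta, p, c, lambda_diff, M), atol=1e-5)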
def make_simplex_constraints(k, n):
"""
Make matrix and vectors for constraint
lb <= Ax <= ub
where x = vec(\theta_1^T, ..., \theta_k^T)
Split into equality and inequality parts since solvers may treat them differently.
:param k: number of timesteps
:param n: number of choices at each timestep
:return:
A_eq: k, kn | sparse matrix of sum constraints
A_ineq: kn, kn | sparse matrix of non-negative constraints
lb: 2, | array of lower bounds [sum constraints, non-negative constraints]
ub: 2, | array of upper bounds [sum constraints, non-negative constraints]
"""
sum_1_part = spa.block_diag(
[np.ones((n,)) for _ in range(k)],
format='csc',
)
nneg_part = spa.eye(k * n, format='csc')
return sum_1_part, nneg_part, np.array([1, 0]), np.array([1, np.inf])
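# Small illustration of the constraint layout (helper name and toy sizes k=2, n=3 are
# assumptions): A_eq stacks one row of ones per timestep so each \theta_t sums to 1,
# and A_ineq is the identity so every entry of the stacked vector is bounded below by 0.
def _show_simplex_constraints():
    A_eq, A_ineq, lb, ub = make_simplex_constraints(k=2, n=3)
    print(A_eq.toarray())    # 2 x 6, a block of three ones per timestep
    print(A_ineq.toarray())  # 6 x 6 identity
    print(lb, ub)            # [1 0] [1 inf]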
def info_constraint_f(theta, c, lambda_diff, M):
k, n = c.shape
theta_m = theta.reshape(k, n)
inner = (M * theta_m).sum(axis=(1, 2)) # m,
return 1/inner
def info_constraint_jac(theta, c, lambda_diff, M):
    # d[1/(sum_t M[i, t]^T theta_t)]/d theta = -M[i] / (sum_t M[i, t]^T theta_t)^2
    inner = (M * theta.reshape(c.shape)).sum(axis=(1, 2))  # m,
    return -(M / inner[:, np.newaxis, np.newaxis] ** 2).reshape(M.shape[0], -1)
def solve_instance(c, lambda_diff, M, q, x0=()):
"""
Make constraints, initial guess if needed, and solve.
:param c: k, n | c[t] = linear cost of each of n choices at timestep t
:param lambda_diff: scalar weight for cost of changing choice over time
:param M: m, k, n | 1/variance resulting for sensing choice
- [i, t, j] = for point of interest i, at timestep t, for choice j
:param q: m, | maximum variance allowed at final timestep for each of m points
:param x0: (k, n) or kn, | initial guess of solution
:return:
x: k, n | specified choice (distribution) at each timestep
is_valid: | whether optimization terminated successfully
        (hence x is a feasible solution)
"""
k, n = c.shape
if len(x0) == 0:
x0 = np.zeros((k, n))
x0[:, 0] = 1.
else:
x0 = x0.ravel()
args = (c, lambda_diff, M)
A_eq, A_ineq, lb, ub = make_simplex_constraints(k, n)
lin_eq_constraint = opt.LinearConstraint(A_eq.toarray(), 1, 1, keep_feasible=False)
lin_ineq_constraint = opt.LinearConstraint(A_ineq.toarray(), 0, np.inf, keep_feasible=False)
nonlin_constraint = opt.NonlinearConstraint(lambda x: info_constraint_f(x, *args), -np.inf, q)
res = opt.minimize(
f0, x0.ravel(), args=args,
method='trust-constr',
jac=f0_jac, hessp=f0_hess_p,
constraints=[lin_eq_constraint, lin_ineq_constraint, nonlin_constraint],
options=dict(verbose=0, sparse_jacobian=True, ),
)
return res.x.reshape(k, n), res.success
def _check_small_instance():
k = 4
n = 3
M = np.array([[
[2, 1, 1.],
[2, 1, 1],
[1, 2, 1],
[1, 2, 2],
]])
q = np.array([1/(8-1.)])
c = np.ones((k, n))
lambda_diff = 1.
args = (c, lambda_diff, M)
x0 = np.zeros((k, n))
x0[:, 0] = 1.
x0[-1, 1] = 1
x0[-1, 0] = 0
xb = np.zeros((k, n))
xb[:, 0] = 1.
A_eq, A_ineq, lb, ub = make_simplex_constraints(k, n)
lin_eq_constraint = opt.LinearConstraint(A_eq.toarray(), 1, 1, keep_feasible=False)
lin_ineq_constraint = opt.LinearConstraint(A_ineq.toarray(), 0, np.inf, keep_feasible=False)
nonlin_constraint = opt.NonlinearConstraint(lambda x: info_constraint_f(x, *args), -np.inf, q)
# keep_feasible=False is important for trust-constr to work
res = opt.minimize(
f0, x0.ravel(), args=args,
method='trust-constr',
# method='SLSQP',
jac=f0_jac, hessp=f0_hess_p,
constraints=[lin_eq_constraint, lin_ineq_constraint, nonlin_constraint],
options=dict(verbose=3, sparse_jacobian=False,),
# options=dict(disp=3),
)
print(res.x.reshape(k, n))
print(A_ineq.dot(res.x))
print('-')
print(f0(res.x, *args))
print(f0(x0.ravel(), *args))
print(f0(xb.ravel(), *args))
if __name__ == '__main__':
_check_small_instance()
import numpy as np
from active_sensing import design_1d_variance as d1v
from misc.distances import circular_1d
def plan_planar_1d_var_orientations(
x, point_x, var_bounds, lambda_diff=0.1,
var_fcn=None, cost_fcn=None,
n_angles=16, var_fcn_kwargs=None, cost_fcn_kwargs=None):
"""
Plan angles in plane to satisfy sensing accuracy based on
spherical variance model.
:param x: k, 2 | already planned positions in plane
at which to choose orientation
- assume these are equally spaced in time
(so costs based on transitions are consistent)
:param point_x: m, 2 | points of interest
:param var_bounds: m, | maximum variance allowed at final timestep
for each of m points
:param lambda_diff: scalar weight for cost of changing choice over time
:param var_fcn: (x, angles, y) -> 1/M: (m, k, n_angles)
- facing at angle from x, variance of observation of y position
:param cost_fcn: (x, angles) -> c: (k, n_angles)
- cost of each choice of angle at each timestep
    :param n_angles: number of discrete yaw choices per timestep
    :param var_fcn_kwargs: extra keyword arguments passed to var_fcn
    :param cost_fcn_kwargs: extra keyword arguments passed to cost_fcn
:return:
yaw: k, | feasible plan of angle for each timestep, or default if not valid
is_valid: True iff feasible plan found
"""
var_fcn = var_fcn if var_fcn else distance_angle_spherical_var
cost_fcn = cost_fcn if cost_fcn else face_movement_direction_cost
var_fcn_kwargs = var_fcn_kwargs if var_fcn_kwargs else dict()
cost_fcn_kwargs = cost_fcn_kwargs if cost_fcn_kwargs else dict()
angles = grid_1d_angles(n_angles)
M_inv = var_fcn(x, angles, point_x, **var_fcn_kwargs)
c = cost_fcn(x, angles, **cost_fcn_kwargs)
yaw_choices, is_valid = d1v.solve_instance(c, lambda_diff, 1/M_inv, var_bounds)
# extract by taking distribution average
yaw = (yaw_choices * angles).sum(axis=1)
if not is_valid:
yaw = make_movement_direction_angles(x)
try:
yaw[-1] = cost_fcn_kwargs['angle_last']
except KeyError:
pass
return yaw, is_valid
def grid_1d_angles(n_angles=16):
return np.linspace(0, 2*np.pi, endpoint=False, num=n_angles)
def distance_angle_spherical_var(x, angles, y, a=(1., 5, .3)):
"""
Compute variance of each possible choice according to model:
y_hat ~ N(y, a0 + .5 exp{a1 |w - w'|} + exp{a2 ||x - y||} )
    where the current state is [x, w] and w' is the heading from x toward the point at y
:param x: k, 2
:param angles: n,
:param y: m, 2
:param a: 3, | [a0, a1, a2]
:return: M_inv: m, k, n | [i, t, j] = variance of observation of point i
at state x[t], angle[j]
"""
m = y.shape[0]
k = x.shape[0]
n = angles.size
w_p = np.arctan2(
y[:, 1][:, np.newaxis] - x[:, 1],
y[:, 0][:, np.newaxis] - x[:, 0],
) # m, k
dif_yaw = w_p[..., np.newaxis] - angles.reshape(1, 1, -1) # m, k, n
dif_yaw = circular_1d(dif_yaw.ravel(), 2*np.pi).reshape(m, k, n)
dif_x = np.linalg.norm(y[:, np.newaxis, :] - x[np.newaxis, ...], axis=2) # m, k
M_inv = a[0] + .5 * np.exp(a[1] * dif_yaw) + np.exp(a[2] * dif_x[..., np.newaxis])
return M_inv
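# Hedged illustration of the variance model above (helper name and toy geometry are
# assumptions): for a single point 1 m away along +x, the smallest variance on an
# 8-angle grid occurs at the grid angle facing the point (angle 0), and it grows
# with |w - w'| through the exp{a1 |w - w'|} term.
def _demo_spherical_var():
    x = np.array([[0., 0.]])
    y = np.array([[1., 0.]])
    angles = grid_1d_angles(8)
    return distance_angle_spherical_var(x, angles, y)[0, 0]  # variance for each angle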
def face_movement_direction_cost(x, angles, sd=1., angle_last=None, sd_last=0.1):
"""
    Set cost(yaw_t) = |yaw_t - yaw_t^h|^2 / (2 sd)
    where yaw_t is the chosen angle at timestep t and
    yaw_t^h is that timestep's direction of motion.
    Use a separate weighting for the final timestep, since this
    direction is often specified by the user.
    :param x: k, 2 | [t] = position in plane at timestep t
    :param angles: n, | candidate yaw angles
    :param sd: variance-like weight dividing the squared error for all but the final timestep
    :param angle_last: desired yaw at the final timestep (defaults to the direction of the final movement segment)
    :param sd_last: variance-like weight for the final timestep
:return: c: k, n | c[t] = linear cost of each of n choices at timestep t
"""
k = x.shape[0]
n = angles.size
motion_yaw = make_movement_direction_angles(x)
motion_yaw[-1] = angle_last if angle_last is not None else motion_yaw[-2]
dif_yaw = motion_yaw[:, np.newaxis] - angles
dif_yaw = circular_1d(dif_yaw.ravel(), 2*np.pi).reshape(k, n)
c = np.zeros((k, n))
c[:-1] = dif_yaw[:-1] ** 2 / (2 * sd)
c[-1] = dif_yaw[-1] ** 2 / (2 * sd_last)
return c
def make_movement_direction_angles(x):
"""
Make yaw angles corresponding to direction of motion.
:param x: k, 2
:return: k,
"""
k = x.shape[0]
motion_yaw = np.zeros((k,))
np.arctan2(
x[1:, 1] - x[:-1, 1],
x[1:, 0] - x[:-1, 0],
out=motion_yaw[:-1],
)
motion_yaw[-1] = motion_yaw[-2]
return motion_yaw
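# Hedged usage sketch (the toy trajectory, point of interest, and variance bound are
# assumptions): plan yaw along a straight 5-step path in +y while requiring the final
# variance on a single point to stay below 2.0; the planner falls back to the
# movement-direction yaw if the solver does not report success.
if __name__ == '__main__':
    x = np.stack([np.zeros(5), np.arange(5.)], axis=1)  # k=5 planned positions
    point_x = np.array([[2., 2.]])
    var_bounds = np.array([2.0])
    yaw, is_valid = plan_planar_1d_var_orientations(x, point_x, var_bounds, n_angles=8)
    print(np.round(yaw, 2), is_valid)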
import numpy as np
from scene_elements.scene import Scene
from scene_elements.cylinder import Cylinder
from scene_elements.sphere import Sphere
from scene_elements.water import Water
from scene_elements.camera import Camera
from scipy.interpolate import InterpolatedUnivariateSpline
from rigid_body_models import boat
from vis import planar_thruster_model
from active_sensing import orientation_planning as ori
from misc import interpolation as itp
def build_camera():
cam_f = 800.
cam_rows = 480
cam_cols = 640
cam_K = np.array([
[cam_f, 0, cam_rows/2, 0],
[0, cam_f, cam_cols/2, 0],
[0, 0, 1., 0],
])
cam_Rt = np.array([
[0, 0, 1, -.2],
[0, -1, 0, 0],
[1, 0, 0, 0],
[0, 0, 0, 1],
])
camera = Camera(K=cam_K, Rt=cam_Rt, cam_cols=cam_cols, cam_rows=cam_rows)
return camera
def main():
point_x = np.array([
[3, 2],
[5, 6],
[4, 10.],
])
var_bounds = np.array([1., 1, 1])
scene = Scene([Cylinder(pt) for pt in point_x] +
[Water()] +
[Sphere([-1, 14.5], color='red'),
Sphere([1, 15], color='green'),
Sphere([4, 4], color='black'),
Sphere([4, 7], color='yellow'), ])
camera = build_camera()
DT = 0.1
yaw_plan_dt = 1.
t_max = 18
    # state z = [x (2), v (2), yaw, yaw-rate]
x0 = np.array([0, 0, 0, 0, np.pi / 2, 0])
x1 = np.array([0, 10, 0, 0, np.pi / 2, 0])
n_state = x0.size
cam_traj_spline = itp.build_nd_interpolator(
np.array([x0, x1])[:, :2], [0, t_max],
InterpolatedUnivariateSpline, k=1,
)
plan_x = cam_traj_spline(np.mgrid[0:t_max:yaw_plan_dt])
plan_yaw, is_valid = ori.plan_planar_1d_var_orientations(
plan_x[1:], point_x, var_bounds, n_angles=16)
plan_z_t = np.zeros((plan_x.shape[0], n_state + 1))
plan_z_t[:, :2] = plan_x
plan_z_t[0, :n_state] = x0
plan_z_t[1:, 4] = plan_yaw
plan_z_t[:, 4] = itp.order_circular_1d(plan_z_t[:, 4], 2 * np.pi)
plan_z_t[:, -1] = np.mgrid[0:t_max:yaw_plan_dt]
z_spline = itp.build_nd_interpolator(
plan_z_t[:, :-1], plan_z_t[:, -1],
InterpolatedUnivariateSpline, k=1,
)
xytheta = z_spline(np.mgrid[0:t_max:DT])[:, [0, 1, 4]]
u = np.empty((xytheta.shape[0], 0))
boat_model = boat.DiamondMountedThrusters()
planar_thruster_model.loop_camera_sim(
boat_model, xytheta[:, [0, 1]], xytheta[:, 2], u, DT, scene, camera)
if __name__ == '__main__':
main()
import numpy as np
def circular_1d(x, c):
"""
    Wrap values in a circular space to the absolute distance from the nearest
    multiple of the period (ie the magnitude of the smallest representative).
    :param x: n, | values in circular space (eg S^1)
    :param c: period of circular space [0, c) (eg 2pi)
    :return: n, | distances, each in [0, c/2]
"""
b = np.array([-c, 0])
d = np.min(np.abs((x % c) + b[:, np.newaxis]), axis=0)
return d
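# Small example of the wrap behavior (values chosen only for illustration):
# differences are reported as absolute distances to the nearest multiple of the
# period, so 3.5*pi maps to 0.5*pi and both 0.3 and -0.3 map to 0.3.
if __name__ == '__main__':
    print(circular_1d(np.array([3.5 * np.pi, 0.3, -0.3]), 2 * np.pi))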
import numpy as np
import matplotlib.patches as pa
class Cylinder(object):
def __init__(self, xy, z=0., height=100., radius=.5, color='#f1a5c1', alpha=0.5):
self.xy = np.asarray(xy)
self.z = z
self.height = height
self.radius = radius
self.color = color
self.alpha = alpha
def draw_top_view(self, ax):
circle = pa.Circle(tuple(self.xy), radius=self.radius, facecolor=self.color)
ax.add_artist(circle)
def calculate_image_view_shape(self, camera):
"""
        Use a sphere approximation to place the bottom of the cylinder and its pixel
        radius, and project the top of the cylinder separately.
        :param camera: camera providing project() from homogeneous world points to the image
:return:
xy_top: 2, | [u, v] position of top of element in image
- not in image -> nans
- may be outside of image bounds, use image size to filter these
xy_bot: 2, | [u, v] position of bottom of element in image
radius: | in pixels
"""
xyz1 = np.hstack([self.xy, self.z, 1.])
        top_bot_xyz1 = np.array([xyz1, xyz1, xyz1]).T  # 4, 3 | columns = [top, bottom + r, bottom - r] in z
        top_bot_xyz1[2, :] += [self.height, self.radius, -self.radius]
top_bot_xy1 = camera.project(top_bot_xyz1)
if np.any(top_bot_xy1[-1] <= 0):
bad = np.array([np.nan] * 2)
return bad, bad, np.nan
top_bot_xy1 /= top_bot_xy1[-1]
xy_top = top_bot_xy1[:-1, 0]
xy_bot = top_bot_xy1[:-1, 1:].mean(axis=1)
r = np.linalg.norm(top_bot_xy1[:-1, 1] - xy_bot)/2
return xy_top, xy_bot, r
def draw_image_view(self, ax, camera, zorder=0):
"""
Do not draw if behind camera.
(Out of frame objects can be drawn, they will just be cut off)
        :param ax: matplotlib axes for the image view
        :param camera: camera used to project the cylinder into the image
:param zorder: ordering for fake depth
:return:
"""
xy_top, xy_bot, r = self.calculate_image_view_shape(camera)
if np.isnan(xy_top).any():
return
# reverse xy to place into plot coordinates, since cam x is vertical
uv_leftbot = xy_bot[::-1] - r * np.array([1, 1])
width = 2 * r
        height = xy_top[::-1][1] - uv_leftbot[1]
artist = pa.Rectangle(
tuple(uv_leftbot), width, height,
facecolor=self.color, zorder=zorder, alpha=self.alpha
)
ax.add_artist(artist)
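# Hedged usage sketch (the matplotlib setup and axis limits are assumptions): draw
# the top view of a single cylinder on a bare axes.
if __name__ == '__main__':
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots()
    Cylinder([3., 2.]).draw_top_view(ax)
    ax.set_xlim(0, 6)
    ax.set_ylim(0, 6)
    ax.set_aspect('equal')
    plt.show()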
@@ -7,7 +7,7 @@ FAR_DIST_M = 1e5
 class Water(object):
-    def __init__(self, z=0., bound_xy=()):
+    def __init__(self, z=0., bound_xy=(), is_override_zorder=True):
         self.xy = ()
         self.z = z
         self.color = '#80ebff'
@@ -17,6 +17,7 @@ class Water(object):
         self.bound_xy = np.array([
             np.cos(theta), np.sin(theta)
         ]) * FAR_DIST_M
+        self.is_override_zorder = is_override_zorder

     def draw_top_view(self, ax):
         m = 200
@@ -53,5 +54,6 @@ class Water(object):
             bound_xy1[:2, ind_max],
             [0, camera.cam_cols],
         ])
+        zorder = -1 if self.is_override_zorder else zorder
         poly = pa.Polygon(verts[:, ::-1], facecolor=self.color, zorder=zorder)
         ax.add_artist(poly)