From 897ff7af6183c6797c64a22471a0c47958c348ef Mon Sep 17 00:00:00 2001 From: andersct <andersct@umich.edu> Date: Tue, 10 Nov 2020 21:33:26 -0500 Subject: [PATCH] Add active sensing - add generation of yaw based on information constraints - add solving of convex program for yaw - add cylinder scene objects - add example showing generated yaw plan --- active_sensing/__init__.py | 0 active_sensing/design_1d_variance.py | 189 +++++++++++++++++++++++ active_sensing/orientation_planning.py | 123 +++++++++++++++ examples/display_orientation_planning.py | 82 ++++++++++ misc/distances.py | 13 ++ scene_elements/cylinder.py | 63 ++++++++ scene_elements/water.py | 4 +- 7 files changed, 473 insertions(+), 1 deletion(-) create mode 100644 active_sensing/__init__.py create mode 100644 active_sensing/design_1d_variance.py create mode 100644 active_sensing/orientation_planning.py create mode 100644 examples/display_orientation_planning.py create mode 100644 misc/distances.py create mode 100644 scene_elements/cylinder.py diff --git a/active_sensing/__init__.py b/active_sensing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/active_sensing/design_1d_variance.py b/active_sensing/design_1d_variance.py new file mode 100644 index 0000000..5f49009 --- /dev/null +++ b/active_sensing/design_1d_variance.py @@ -0,0 +1,189 @@ +""" +Find choices that optimize given objective function (eg point in direction of movement) +while attaining a given level of sensing accuracy (eg variance of estimate is small enough). +Assume that variance for each point of interest is one dimensional, or +can be represented as such, such as items with diagonal variance. + +Let \theta_t \in \reals^n be the set of choices available at timestep t. +Normally we would only pick one (eg for two choices, only allow [0 1] or [1 0]), +but we relax this problem by allowing \theta_t to lie in a probability simplex. +This means we have the constraints: + +1_n^T \theta_t = 1 +0 <= \theta_t + +Allowing a linearly scaling cost at each timestep (t \in [1, ..., k]), +and penalizing the difference between "choices" at each timestep, +the optimal choices are selected by minimizing: + +minimize: + \sum_{t=1}^k c[t]^T \theta_t + + \sum_{t=1}^{t-1} lambda ||\theta_t - \theta_{t+1}||_2^2 +with: + \theta_1, ..., \theta_k \in \reals^n +subject to: + 1_n^T \theta_t = 1 for all t=1,...,k + 0 <= \theta_t for all t=1,...,k + [\sum_{t=1}^k M[i, t]^T \theta_t]^-1 <= q[i] for all i=1,...,m + +The final constraint specifies the minimum sensing accuracy at the final time +for each of the m points of interest. +Here, M[i, t] encodes the information (inverse of dispersion or variance) gained +from each of the possible choices at time t, for interest point i. +M is (m, k, n). +Note the constraint's convexity, since: +1) M[i, t] > 0 since 1/variance is positive +2) \theta_t >= 0 and has a nonzero entry since it is a (discrete) probability distribution +3) 1/x is convex over x > 0 +4) the constraint is 1/affine(x) hence convex in x +The problem is convex. +""" +import numpy as np +from scipy import sparse as spa +from scipy import optimize as opt + + +def f0(theta, c, lambda_diff, M): + k, n = c.shape + theta_m = theta.reshape(k, n) + linear_part = (c * theta_m).sum() + quadratic_part = (theta_m[1:] - theta_m[:-1]) ** 2 + return linear_part + lambda_diff * quadratic_part.sum() + + +def f0_jac(theta, c, lambda_diff, M): + k, n = c.shape + theta_m = theta.reshape(k, n) + quadratic_part = np.zeros((k, n)) + l_part = 2 * (theta_m[:-1] - theta_m[1:]) + r_part = 2 * (theta_m[1:] - theta_m[:-1]) + quadratic_part[:-1] = l_part + quadratic_part[1:] += r_part + jac = c + lambda_diff * quadratic_part + return jac.ravel() + + +def f0_hess_p(theta, p, c, lambda_diff, M): + k, n = c.shape + p_m = p.reshape(k, n) + H_pm = np.zeros_like(p_m) + H_pm[:-1] += 2 * (p_m[:-1] - p_m[1:]) + H_pm[1:] += 2 * (p_m[1:] - p_m[:-1]) + return lambda_diff * H_pm.ravel() + + +def make_simplex_constraints(k, n): + """ + Make matrix and vectors for constraint + lb <= Ax <= ub + where x = vec(\theta_1^T, ..., \theta_k^T) + Split into equality and inequality parts since solvers may treat them differently. + :param k: number of timesteps + :param n: number of choices at each timestep + :return: + A_eq: k, kn | sparse matrix of sum constraints + A_ineq: kn, kn | sparse matrix of non-negative constraints + lb: 2, | array of lower bounds [sum constraints, non-negative constraints] + ub: 2, | array of upper bounds [sum constraints, non-negative constraints] + """ + sum_1_part = spa.block_diag( + [np.ones((n,)) for _ in range(k)], + format='csc', + ) + nneg_part = spa.eye(k * n, format='csc') + return sum_1_part, nneg_part, np.array([1, 0]), np.array([1, np.inf]) + + +def info_constraint_f(theta, c, lambda_diff, M): + k, n = c.shape + theta_m = theta.reshape(k, n) + inner = (M * theta_m).sum(axis=(1, 2)) # m, + return 1/inner + + +def info_constraint_jac(theta, c, lambda_diff, M): + pass + + +def solve_instance(c, lambda_diff, M, q, x0=()): + """ + Make constraints, initial guess if needed, and solve. + :param c: k, n | c[t] = linear cost of each of n choices at timestep t + :param lambda_diff: scalar weight for cost of changing choice over time + :param M: m, k, n | 1/variance resulting for sensing choice + - [i, t, j] = for point of interest i, at timestep t, for choice j + :param q: m, | maximum variance allowed at final timestep for each of m points + :param x0: (k, n) or kn, | initial guess of solution + :return: + x: k, n | specified choice (distribution) at each timestep + is_valid: | whether optimization terminated successfully + (hence x is feasible solution) + """ + k, n = c.shape + if len(x0) == 0: + x0 = np.zeros((k, n)) + x0[:, 0] = 1. + else: + x0 = x0.ravel() + args = (c, lambda_diff, M) + A_eq, A_ineq, lb, ub = make_simplex_constraints(k, n) + lin_eq_constraint = opt.LinearConstraint(A_eq.toarray(), 1, 1, keep_feasible=False) + lin_ineq_constraint = opt.LinearConstraint(A_ineq.toarray(), 0, np.inf, keep_feasible=False) + nonlin_constraint = opt.NonlinearConstraint(lambda x: info_constraint_f(x, *args), -np.inf, q) + res = opt.minimize( + f0, x0.ravel(), args=args, + method='trust-constr', + jac=f0_jac, hessp=f0_hess_p, + constraints=[lin_eq_constraint, lin_ineq_constraint, nonlin_constraint], + options=dict(verbose=0, sparse_jacobian=True, ), + ) + return res.x.reshape(k, n), res.success + + +def _check_small_instance(): + k = 4 + n = 3 + M = np.array([[ + [2, 1, 1.], + [2, 1, 1], + [1, 2, 1], + [1, 2, 2], + ]]) + q = np.array([1/(8-1.)]) + c = np.ones((k, n)) + lambda_diff = 1. + args = (c, lambda_diff, M) + + x0 = np.zeros((k, n)) + x0[:, 0] = 1. + x0[-1, 1] = 1 + x0[-1, 0] = 0 + + xb = np.zeros((k, n)) + xb[:, 0] = 1. + + A_eq, A_ineq, lb, ub = make_simplex_constraints(k, n) + lin_eq_constraint = opt.LinearConstraint(A_eq.toarray(), 1, 1, keep_feasible=False) + lin_ineq_constraint = opt.LinearConstraint(A_ineq.toarray(), 0, np.inf, keep_feasible=False) + nonlin_constraint = opt.NonlinearConstraint(lambda x: info_constraint_f(x, *args), -np.inf, q) + + # keep_feasible=False is important for trust-constr to work + res = opt.minimize( + f0, x0.ravel(), args=args, + method='trust-constr', + # method='SLSQP', + jac=f0_jac, hessp=f0_hess_p, + constraints=[lin_eq_constraint, lin_ineq_constraint, nonlin_constraint], + options=dict(verbose=3, sparse_jacobian=False,), + # options=dict(disp=3), + ) + print(res.x.reshape(k, n)) + print(A_ineq.dot(res.x)) + print('-') + print(f0(res.x, *args)) + print(f0(x0.ravel(), *args)) + print(f0(xb.ravel(), *args)) + + +if __name__ == '__main__': + _check_small_instance() diff --git a/active_sensing/orientation_planning.py b/active_sensing/orientation_planning.py new file mode 100644 index 0000000..faa2173 --- /dev/null +++ b/active_sensing/orientation_planning.py @@ -0,0 +1,123 @@ +import numpy as np +from active_sensing import design_1d_variance as d1v +from misc.distances import circular_1d + + +def plan_planar_1d_var_orientations( + x, point_x, var_bounds, lambda_diff=0.1, + var_fcn=None, cost_fcn=None, + n_angles=16, var_fcn_kwargs=None, cost_fcn_kwargs=None): + """ + Plan angles in plane to satisfy sensing accuracy based on + spherical variance model. + :param x: k, 2 | already planned positions in plane + at which to choose orientation + - assume these are equally spaced in time + (so costs based on transitions are consistent) + :param point_x: m, 2 | points of interest + :param var_bounds: m, | maximum variance allowed at final timestep + for each of m points + :param lambda_diff: scalar weight for cost of changing choice over time + :param var_fcn: (x, angles, y) -> 1/M: (m, k, n_angles) + - facing at angle from x, variance of observation of y position + :param cost_fcn: (x, angles) -> c: (k, n_angles) + - cost of each choice of angle at each timestep + :param n_angles: + :param var_fcn_kwargs: + :param cost_fcn_kwargs: + :return: + yaw: k, | feasible plan of angle for each timestep, or default if not valid + is_valid: True iff feasible plan found + """ + var_fcn = var_fcn if var_fcn else distance_angle_spherical_var + cost_fcn = cost_fcn if cost_fcn else face_movement_direction_cost + var_fcn_kwargs = var_fcn_kwargs if var_fcn_kwargs else dict() + cost_fcn_kwargs = cost_fcn_kwargs if cost_fcn_kwargs else dict() + angles = grid_1d_angles(n_angles) + + M_inv = var_fcn(x, angles, point_x, **var_fcn_kwargs) + c = cost_fcn(x, angles, **cost_fcn_kwargs) + yaw_choices, is_valid = d1v.solve_instance(c, lambda_diff, 1/M_inv, var_bounds) + + # extract by taking distribution average + yaw = (yaw_choices * angles).sum(axis=1) + if not is_valid: + yaw = make_movement_direction_angles(x) + try: + yaw[-1] = cost_fcn_kwargs['angle_last'] + except KeyError: + pass + return yaw, is_valid + + +def grid_1d_angles(n_angles=16): + return np.linspace(0, 2*np.pi, endpoint=False, num=n_angles) + + +def distance_angle_spherical_var(x, angles, y, a=(1., 5, .3)): + """ + Compute variance of each possible choice according to model: + y_hat ~ N(y, a0 + .5 exp{a1 |w - w'|} + exp{a2 ||x - y||} ) + where current state is [x, w] and heading pointing to point at y is w' + :param x: k, 2 + :param angles: n, + :param y: m, 2 + :param a: 3, | [a0, a1, a2] + :return: M_inv: m, k, n | [i, t, j] = variance of observation of point i + at state x[t], angle[j] + """ + m = y.shape[0] + k = x.shape[0] + n = angles.size + w_p = np.arctan2( + y[:, 1][:, np.newaxis] - x[:, 1], + y[:, 0][:, np.newaxis] - x[:, 0], + ) # m, k + dif_yaw = w_p[..., np.newaxis] - angles.reshape(1, 1, -1) # m, k, n + dif_yaw = circular_1d(dif_yaw.ravel(), 2*np.pi).reshape(m, k, n) + dif_x = np.linalg.norm(y[:, np.newaxis, :] - x[np.newaxis, ...], axis=2) # m, k + M_inv = a[0] + .5 * np.exp(a[1] * dif_yaw) + np.exp(a[2] * dif_x[..., np.newaxis]) + return M_inv + + +def face_movement_direction_cost(x, angles, sd=1., angle_last=None, sd_last=0.1): + """ + Set cost(yaw_t) = |yaw_t - yaw_t^h|^2 / 2 sd^2 + where yaw_t is chosen angle at timestep t and + yaw_t^h is that timestep's direction of motion. + Use separate 'standard deviation' weighting for final timestep, since this + direction is often specified by the user. + :param x: k, 2 | [t] = position in plane at timestep t + :param angles: + :param sd: + :param angle_last: + :param sd_last: + :return: c: k, n | c[t] = linear cost of each of n choices at timestep t + """ + k = x.shape[0] + n = angles.size + motion_yaw = make_movement_direction_angles(x) + motion_yaw[-1] = angle_last if angle_last is not None else motion_yaw[-2] + dif_yaw = motion_yaw[:, np.newaxis] - angles + dif_yaw = circular_1d(dif_yaw.ravel(), 2*np.pi).reshape(k, n) + c = np.zeros((k, n)) + c[:-1] = dif_yaw[:-1] ** 2 / (2 * sd) + c[-1] = dif_yaw[-1] ** 2 / (2 * sd_last) + return c + + +def make_movement_direction_angles(x): + """ + Make yaw angles corresponding to direction of motion. + :param x: k, 2 + :return: k, + """ + k = x.shape[0] + motion_yaw = np.zeros((k,)) + np.arctan2( + x[1:, 1] - x[:-1, 1], + x[1:, 0] - x[:-1, 0], + out=motion_yaw[:-1], + ) + motion_yaw[-1] = motion_yaw[-2] + return motion_yaw diff --git a/examples/display_orientation_planning.py b/examples/display_orientation_planning.py new file mode 100644 index 0000000..f2ffd1f --- /dev/null +++ b/examples/display_orientation_planning.py @@ -0,0 +1,82 @@ +import numpy as np +from scene_elements.scene import Scene +from scene_elements.cylinder import Cylinder +from scene_elements.sphere import Sphere +from scene_elements.water import Water +from scene_elements.camera import Camera +from scipy.interpolate import InterpolatedUnivariateSpline +from rigid_body_models import boat +from vis import planar_thruster_model +from active_sensing import orientation_planning as ori +from misc import interpolation as itp + + +def build_camera(): + cam_f = 800. + cam_rows = 480 + cam_cols = 640 + cam_K = np.array([ + [cam_f, 0, cam_rows/2, 0], + [0, cam_f, cam_cols/2, 0], + [0, 0, 1., 0], + ]) + cam_Rt = np.array([ + [0, 0, 1, -.2], + [0, -1, 0, 0], + [1, 0, 0, 0], + [0, 0, 0, 1], + ]) + camera = Camera(K=cam_K, Rt=cam_Rt, cam_cols=cam_cols, cam_rows=cam_rows) + return camera + + +def main(): + point_x = np.array([ + [3, 2], + [5, 6], + [4, 10.], + ]) + var_bounds = np.array([1., 1, 1]) + scene = Scene([Cylinder(pt) for pt in point_x] + + [Water()] + + [Sphere([-1, 14.5], color='red'), + Sphere([1, 15], color='green'), + Sphere([4, 4], color='black'), + Sphere([4, 7], color='yellow'), ]) + camera = build_camera() + + DT = 0.1 + yaw_plan_dt = 1. + t_max = 18 + # [x v yaw yaw-rate] + x0 = np.array([0, 0, 0, 0, np.pi / 2, 0]) + x1 = np.array([0, 10, 0, 0, np.pi / 2, 0]) + n_state = x0.size + cam_traj_spline = itp.build_nd_interpolator( + np.array([x0, x1])[:, :2], [0, t_max], + InterpolatedUnivariateSpline, k=1, + ) + plan_x = cam_traj_spline(np.mgrid[0:t_max:yaw_plan_dt]) + plan_yaw, is_valid = ori.plan_planar_1d_var_orientations( + plan_x[1:], point_x, var_bounds, n_angles=16) + plan_z_t = np.zeros((plan_x.shape[0], n_state + 1)) + plan_z_t[:, :2] = plan_x + plan_z_t[0, :n_state] = x0 + plan_z_t[1:, 4] = plan_yaw + plan_z_t[:, 4] = itp.order_circular_1d(plan_z_t[:, 4], 2 * np.pi) + plan_z_t[:, -1] = np.mgrid[0:t_max:yaw_plan_dt] + z_spline = itp.build_nd_interpolator( + plan_z_t[:, :-1], plan_z_t[:, -1], + InterpolatedUnivariateSpline, k=1, + ) + + xytheta = z_spline(np.mgrid[0:t_max:DT])[:, [0, 1, 4]] + u = np.empty((xytheta.shape[0], 0)) + + boat_model = boat.DiamondMountedThrusters() + planar_thruster_model.loop_camera_sim( + boat_model, xytheta[:, [0, 1]], xytheta[:, 2], u, DT, scene, camera) + + +if __name__ == '__main__': + main() diff --git a/misc/distances.py b/misc/distances.py new file mode 100644 index 0000000..d585fff --- /dev/null +++ b/misc/distances.py @@ -0,0 +1,13 @@ +import numpy as np + + +def circular_1d(x, c): + """ + Wrap values in circular space to smallest magnitude representation + :param x: n, | values in circular space (eg S^1) + :param c: period of circular space [0, c) (eg 2pi) + :return: n, | distances + """ + b = np.array([-c, 0]) + d = np.min(np.abs((x % c) + b[:, np.newaxis]), axis=0) + return d diff --git a/scene_elements/cylinder.py b/scene_elements/cylinder.py new file mode 100644 index 0000000..8632d9b --- /dev/null +++ b/scene_elements/cylinder.py @@ -0,0 +1,63 @@ +import numpy as np +import matplotlib.patches as pa + + +class Cylinder(object): + + def __init__(self, xy, z=0., height=100., radius=.5, color='#f1a5c1', alpha=0.5): + self.xy = np.asarray(xy) + self.z = z + self.height = height + self.radius = radius + self.color = color + self.alpha = alpha + + def draw_top_view(self, ax): + circle = pa.Circle(tuple(self.xy), radius=self.radius, facecolor=self.color) + ax.add_artist(circle) + + def calculate_image_view_shape(self, camera): + """ + Use sphere approximation to position bottom of cylinder, and handle top + :param camera: + :return: + xy_top: 2, | [u, v] position of top of element in image + - not in image -> nans + - may be outside of image bounds, use image size to filter these + xy_bot: 2, | [u, v] position of bottom of element in image + radius: | in pixels + """ + xyz1 = np.hstack([self.xy, self.z, 1.]) + top_bot_xyz1 = np.array([xyz1, xyz1, xyz1]).T # 4, 3 + top_bot_xyz1[2, :] += [self.height, self.radius, -self.radius] + top_bot_xy1 = camera.project(top_bot_xyz1) + if np.any(top_bot_xy1[-1] <= 0): + bad = np.array([np.nan] * 2) + return bad, bad, np.nan + top_bot_xy1 /= top_bot_xy1[-1] + xy_top = top_bot_xy1[:-1, 0] + xy_bot = top_bot_xy1[:-1, 1:].mean(axis=1) + r = np.linalg.norm(top_bot_xy1[:-1, 1] - xy_bot)/2 + return xy_top, xy_bot, r + + def draw_image_view(self, ax, camera, zorder=0): + """ + Do not draw if behind camera. + (Out of frame objects can be drawn, they will just be cut off) + :param ax: + :param camera: + :param zorder: ordering for fake depth + :return: + """ + xy_top, xy_bot, r = self.calculate_image_view_shape(camera) + if np.isnan(xy_top).any(): + return + # reverse xy to place into plot coordinates, since cam x is vertical + uv_leftbot = xy_bot[::-1] - r * np.array([1, 1]) + width = 2 * r + height = xy_top[::-1][1] + r + artist = pa.Rectangle( + tuple(uv_leftbot), width, height, + facecolor=self.color, zorder=zorder, alpha=self.alpha + ) + ax.add_artist(artist) diff --git a/scene_elements/water.py b/scene_elements/water.py index 62304d9..ffabbc0 100644 --- a/scene_elements/water.py +++ b/scene_elements/water.py @@ -7,7 +7,7 @@ FAR_DIST_M = 1e5 class Water(object): - def __init__(self, z=0., bound_xy=()): + def __init__(self, z=0., bound_xy=(), is_override_zorder=True): self.xy = () self.z = z self.color = '#80ebff' @@ -17,6 +17,7 @@ class Water(object): self.bound_xy = np.array([ np.cos(theta), np.sin(theta) ]) * FAR_DIST_M + self.is_override_zorder = is_override_zorder def draw_top_view(self, ax): m = 200 @@ -53,5 +54,6 @@ class Water(object): bound_xy1[:2, ind_max], [0, camera.cam_cols], ]) + zorder = -1 if self.is_override_zorder else zorder poly = pa.Polygon(verts[:, ::-1], facecolor=self.color, zorder=zorder) ax.add_artist(poly) -- GitLab