Skip to content

Commit ce27644

Browse files
committed
Added HyperhemisphericalGridFilter
1 parent faf8e3d commit ce27644

2 files changed

Lines changed: 179 additions & 0 deletions

File tree

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import numpy as np
2+
import warnings
3+
from .abstract_grid_filter import AbstractGridFilter
4+
from .abstract_hyperhemispherical_filter import AbstractHyperhemisphericalFilter
5+
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution
6+
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution
7+
from pyrecest.distributions import BinghamDistribution
8+
from pyrecest.distributions import WatsonDistribution
9+
from pyrecest.distributions import VonMisesFisherDistribution
10+
from pyrecest.distributions import HypersphericalMixture
11+
from pyrecest.distributions import HyperhemisphericalUniformDistribution
12+
from pyrecest.distributions import AbstractHyperhemisphericalDistribution
13+
from pyrecest.distributions import HyperhemisphericalWatsonDistribution
14+
15+
class HyperhemisphericalGridFilter(AbstractGridFilter, AbstractHyperhemisphericalFilter):
16+
def __init__(self, no_of_coefficients, dim, grid_type='eq_point_set_symm'):
17+
self.gd = HyperhemisphericalGridDistribution.from_distribution(
18+
HyperhemisphericalUniformDistribution(dim), no_of_coefficients, grid_type)
19+
20+
def set_state(self, new_state):
21+
assert self.dim == new_state.dim
22+
assert isinstance(new_state, AbstractHyperhemisphericalDistribution)
23+
self.gd = new_state
24+
25+
def predict_identity(self, d_sys):
26+
assert isinstance(d_sys, AbstractHyperhemisphericalDistribution)
27+
sd_half_cond_sd_half = HyperhemisphericalGridFilter.sys_noise_to_transition_density(
28+
d_sys, self.gd.grid_values.shape[0])
29+
self.predict_nonlinear_via_transition_density(sd_half_cond_sd_half)
30+
31+
def update_identity(self, meas_noise, z=None):
32+
assert isinstance(meas_noise, AbstractHyperhemisphericalDistribution)
33+
if not z==None:
34+
measNoise = measNoise.setMode(z)
35+
curr_grid = self.gd.get_grid()
36+
self.gd = self.gd.multiply(HyperhemisphericalGridDistribution(curr_grid, 2 * meas_noise.pdf(curr_grid).T))
37+
38+
def update_nonlinear(self, likelihood, z):
39+
self.gd.grid_values = self.gd.grid_values * likelihood(z, self.gd.get_grid()).T
40+
with warnings.catch_warnings():
41+
warnings.simplefilter("ignore", category=RuntimeWarning)
42+
self.gd = self.gd.normalize()
43+
44+
def predict_nonlinear_via_transition_density(self, f_trans):
45+
assert np.array_equal(self.gd.get_grid(), f_trans.get_grid()), \
46+
"fTrans is using an incompatible grid."
47+
self.gd = self.gd.normalize()
48+
grid_values_new = self.gd.get_manifold_size() / self.gd.grid_values.shape[0] * f_trans.grid_values.dot(
49+
self.gd.grid_values)
50+
self.gd = HyperhemisphericalGridDistribution(self.gd.get_grid(), grid_values_new)
51+
52+
def get_point_estimate(self):
53+
gd_full_sphere = self.gd.to_full_sphere()
54+
p = BinghamDistribution.fit(gd_full_sphere.get_grid(), gd_full_sphere.grid_values.T / np.sum(
55+
gd_full_sphere.grid_values)).mode()
56+
if p[-1] < 0:
57+
p = -p
58+
return p
59+
60+
@staticmethod
61+
def sys_noise_to_transition_density(d_sys, no_grid_points):
62+
if isinstance(d_sys, AbstractDistribution):
63+
if isinstance(d_sys, (HyperhemisphericalWatsonDistribution, WatsonDistribution)):
64+
def trans(xkk, xk):
65+
return np.array([2 * WatsonDistribution(xk[:, i], d_sys.kappa).pdf(xkk) for i in range(xk.shape[1])]).T
66+
67+
elif (isinstance(d_sys, HypersphericalMixture) and len(d_sys.dists) == 2 and
68+
np.all(d_sys.w == 0.5) and np.array_equal(d_sys.dists[0].mu, -d_sys.dists[1].mu) and
69+
d_sys.dists[0].kappa == d_sys.dists[1].kappa):
70+
def trans(xkk, xk):
71+
return np.array([(VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(xkk) +
72+
VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(-xkk))
73+
for i in range(xk.shape[1])]).T
74+
else:
75+
raise ValueError("Distribution not supported for predict identity. Must be zonal (rotationally symmetric around last dimension)")
76+
77+
print("PredictIdentity:Inefficient - Using inefficient prediction. Consider precalculating the SdHalfCondSdHalfGridDistribution and using predictNonlinearViaTransitionDensity.")
78+
sd_half_cond_sd_half = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, 'eq_point_set_symm', 2 * d_sys.dim)
79+
return sd_half_cond_sd_half
80+
81+
else:
82+
raise TypeError("d_sys must be an instance of AbstractDistribution")
83+
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import unittest
2+
import numpy as np
3+
from pyrecest.filters.hyperhemispherical_grid_filter import HyperhemisphericalGridFilter
4+
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution
5+
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution
6+
from pyrecest.distributions import BinghamDistribution, HypersphericalMixture, VonMisesFisherDistribution
7+
8+
class HyperhemisphericalGridFilterTest(unittest.TestCase):
9+
def test_set_state_s2(self):
10+
np.random.seed(0)
11+
no_grid_points = 1001
12+
sgf = HyperhemisphericalGridFilter(no_grid_points, 3)
13+
14+
self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1))
15+
16+
# Test if it is uniform at the beginning
17+
self.assertAlmostEqual(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size() * np.ones((no_grid_points, 1))))), 0, delta=1e-13)
18+
19+
M = np.eye(3)
20+
Z = np.array([-2, -1, 0]).reshape(-1, 1)
21+
bd = BinghamDistribution(Z, M)
22+
bd.F = bd.F * bd.integrate_numerically()
23+
24+
sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)
25+
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)
26+
sgf.set_state(sgd_state)
27+
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)
28+
29+
# Verify that it is no longer a uniform distribution
30+
self.assertGreater(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size()))), 60)
31+
32+
# Verify estimate matches a mode of the Bingham
33+
self.assertAlmostEqual(np.min(np.linalg.norm(sgf.get_point_estimate() - np.hstack((bd.mode(), -bd.mode())), axis=0)), 0, delta=1e-11)
34+
35+
# Verify errors
36+
full_grid = sgd_state.get_grid()
37+
sgd_state.grid = full_grid[:, -1]
38+
sgd_state.grid_values = sgd_state.grid_values[:-1]
39+
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)
40+
sgf_tmp = sgf.copy()
41+
42+
with self.assertRaises(ValueError):
43+
sgf_tmp.set_state(sgd_state)
44+
45+
with self.assertRaises(ValueError):
46+
sgf.set_state(bd)
47+
48+
def test_set_state_s3(self):
49+
no_grid_points = 1001
50+
sgf = HyperhemisphericalGridFilter(no_grid_points, 4)
51+
self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1))
52+
53+
# Test if it is uniform at the beginning
54+
self.assertAlmostEqual(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 0)
55+
56+
M = np.eye(4)
57+
Z = np.array([-2, -1, -0.5, 0]).reshape(-1, 1)
58+
bd = BinghamDistribution(Z, M)
59+
bd.F = bd.F * bd.integrate_numerically()
60+
61+
sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)
62+
sgf.set_state(sgd_state)
63+
64+
# Verify that it is no longer a uniform distribution
65+
self.assertGreater(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 5)
66+
def test_predict_converges_to_uniform_S2_S3(self):
67+
test_predict_converges_to_uniform(3, 501, [-2, -1, 0], 3, 5e-5, 'eq_point_set_symm', 6)
68+
test_predict_converges_to_uniform(4, 1001, [-2, -1, -0.5, 0], 5, 1e-3, 'eq_point_set', 8)
69+
def test_predict_converges_to_uniform(dim, no_grid_points, z_values, tol_verify_greater, tol_verify_equal, eq_point_set_type, eq_point_set_arg):
70+
sgf = HyperhemisphericalGridFilter(no_grid_points, dim)
71+
M = np.eye(dim)
72+
Z = np.array(z_values).reshape(-1, 1)
73+
bd = BinghamDistribution(Z, M)
74+
bd.F = bd.F * bd.integrate_numerically()
75+
sgf.set_state(HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points))
76+
77+
# Verify that it is not a uniform distribution
78+
assert sum(abs(np.diff(sgf.get_estimate().grid_values.T))) > tol_verify_greater
79+
80+
# Predict 10 times with VM-distributed noise
81+
def trans(xkk, xk):
82+
return 2 * np.hstack([HypersphericalMixture([VonMisesFisherDistribution(xk[:, i], 1), VonMisesFisherDistribution(-xk[:, i], 1)], [0.5, 0.5]).pdf(xkk) for i in range(xk.shape[1])])
83+
84+
sdsd = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, eq_point_set_type, eq_point_set_arg)
85+
manifold_size = sgf.get_estimate().get_manifold_size()
86+
87+
for i in range(10):
88+
values_alternative_formula = (manifold_size / sgf.get_estimate().get_grid().shape[1]) * np.sum(sgf.get_estimate().grid_values.T * sdsd.grid_values, axis=1)
89+
sgf.predict_nonlinear_via_transition_density(sdsd)
90+
assert np.allclose(sgf.get_estimate().grid_values, values_alternative_formula, atol=1e-12)
91+
92+
# Verify that it is approximately uniform now
93+
assert np.isclose(sum(abs(np.diff(sgf.get_estimate().grid_values.T))), 0, atol=tol_verify_equal)
94+
95+
if __name__ == '__main__':
96+
unittest.main()

0 commit comments

Comments
 (0)