一維高斯分布
代碼
關(guān)鍵問(wèn)題描述都放在代碼注釋里
# Write a program that will iteratively update and
# predict based on the location measurements
# and inferred motions shown below.
# 用傳感器更新當(dāng)前狀態(tài)米罚,主要是用高斯乘法
# 也就是所謂的糾偏
# 新的方差比原來(lái)兩個(gè)方差都小
# 新的均值在原來(lái)兩個(gè)均值之間
def update(mean1, var1, mean2, var2):
new_mean = float(var2 * mean1 + var1 * mean2) / (var1 + var2)
new_var = 1./(1./var1 + 1./var2)
return [new_mean, new_var]
# 移動(dòng)部分就是直接按照當(dāng)前的速度或者方向,向前推進(jìn)對(duì)應(yīng)的距離
# 在移動(dòng)的過(guò)程丈探,我們直接將移動(dòng)造成的均值和方差加在原來(lái)均值和方差之上
# 勢(shì)必會(huì)導(dǎo)致方差變大录择,這也是合乎常理,因?yàn)橐苿?dòng)預(yù)測(cè)勢(shì)必會(huì)造成信息丟失
def predict(mean1, var1, mean2, var2):
new_mean = mean1 + mean2
new_var = var1 + var2
return [new_mean, new_var]
# 傳感器的更新?tīng)顟B(tài)
measurements = [5., 6., 7., 9., 10.]
# 每次移動(dòng)的距離
motion = [1., 1., 2., 1., 1.]
#傳感器方差
measurement_sig = 4.
# 移動(dòng)的方差
motion_sig = 2.
# 初始均值
mu = 0.
# 初始方差
sig = 10000.
#Please print out ONLY the final values of the mean
#and the variance in a list [mu, sig].
# Insert code here
for i in range(len(measurements)):
[mu, sig] = update(mu, sig, measurements[i], measurement_sig)
print ('update',[mu, sig])
[mu, sig] = predict(mu, sig, motion[i], motion_sig)
print ('predict',[mu, sig])
print [mu, sig]
輸出
update [4.998000799680128, 3.9984006397441023]
predict [5.998000799680128, 5.998400639744102]
update [5.999200191953932, 2.399744061425258]
predict [6.999200191953932, 4.399744061425258]
update [6.999619127420922, 2.0951800575117594]
predict [8.999619127420921, 4.09518005751176]
update [8.999811802788143, 2.0235152416216957]
predict [9.999811802788143, 4.023515241621696]
update [9.999906177177365, 2.0058615808441944]
predict [10.999906177177365, 4.005861580844194]
分析
可以很直觀的看到碗降,傳感器對(duì)整體定位的逐步調(diào)整
mu = 0
sig = 0.0001
值得注意的一點(diǎn)是:
如上述代碼
如果我們將初始方差調(diào)到很邪摺:意味著我們非常確信最開(kāi)始的定位是準(zhǔn)確的。
我們同樣會(huì)有對(duì)應(yīng)的一組輸出
update [1.2499968750078127e-05, 9.999975000062502e-06]
predict [1.00001249996875, 2.000009999975]
update [2.6666805554976856, 1.3333377777592592]
predict [3.6666805554976856, 3.333337777759259]
update [5.181826859462716, 1.8181831404895565]
predict [7.181826859462716, 3.8181831404895563]
update [8.06977203891725, 1.9534887182243577]
predict [9.06977203891725, 3.9534887182243574]
update [9.532166075020006, 1.9883041811151205]
predict [10.532166075020006, 3.9883041811151205]
但是隨著一次次的更新讼渊,我們發(fā)現(xiàn)动看,傳感器具有更高的可信度,于是最終結(jié)果也是非常接近我們預(yù)期效果位置11
高維高斯分布
先看高維情況的卡爾曼濾波
https://zhuanlan.zhihu.com/p/39912633
一圖勝千言
二維4階卡爾曼濾波實(shí)現(xiàn)
# Write a function 'kalman_filter' that implements a multi-
# dimensional Kalman Filter for the example given
from math import *
class matrix:
# implements basic operations of a matrix class
def __init__(self, value):
self.value = value
self.dimx = len(value)
self.dimy = len(value[0])
if value == [[]]:
self.dimx = 0
def zero(self, dimx, dimy):
# check if valid dimensions
if dimx < 1 or dimy < 1:
raise ValueError("Invalid size of matrix")
else:
self.dimx = dimx
self.dimy = dimy
self.value = [[0 for row in range(dimy)] for col in range(dimx)]
def identity(self, dim):
# check if valid dimension
if dim < 1:
raise ValueError("Invalid size of matrix")
else:
self.dimx = dim
self.dimy = dim
self.value = [[0 for row in range(dim)] for col in range(dim)]
for i in range(dim):
self.value[i][i] = 1
def show(self):
for i in range(self.dimx):
print(self.value[i])
print(' ')
def __add__(self, other):
# check if correct dimensions
if self.dimx != other.dimx or self.dimy != other.dimy:
raise ValueError("Matrices must be of equal dimensions to add")
else:
# add if correct dimensions
res = matrix([[]])
res.zero(self.dimx, self.dimy)
for i in range(self.dimx):
for j in range(self.dimy):
res.value[i][j] = self.value[i][j] + other.value[i][j]
return res
def __sub__(self, other):
# check if correct dimensions
if self.dimx != other.dimx or self.dimy != other.dimy:
raise ValueError("Matrices must be of equal dimensions to subtract")
else:
# subtract if correct dimensions
res = matrix([[]])
res.zero(self.dimx, self.dimy)
for i in range(self.dimx):
for j in range(self.dimy):
res.value[i][j] = self.value[i][j] - other.value[i][j]
return res
def __mul__(self, other):
# check if correct dimensions
if self.dimy != other.dimx:
raise ValueError("Matrices must be m*n and n*p to multiply")
else:
# multiply if correct dimensions
res = matrix([[]])
res.zero(self.dimx, other.dimy)
for i in range(self.dimx):
for j in range(other.dimy):
for k in range(self.dimy):
res.value[i][j] += self.value[i][k] * other.value[k][j]
return res
def transpose(self):
# compute transpose
res = matrix([[]])
res.zero(self.dimy, self.dimx)
for i in range(self.dimx):
for j in range(self.dimy):
res.value[j][i] = self.value[i][j]
return res
# Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions
def Cholesky(self, ztol=1.0e-5):
# Computes the upper triangular Cholesky factorization of
# a positive definite matrix.
res = matrix([[]])
res.zero(self.dimx, self.dimx)
for i in range(self.dimx):
S = sum([(res.value[k][i]) ** 2 for k in range(i)])
d = self.value[i][i] - S
if abs(d) < ztol:
res.value[i][i] = 0.0
else:
if d < 0.0:
raise ValueError("Matrix not positive-definite")
res.value[i][i] = sqrt(d)
for j in range(i + 1, self.dimx):
S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
if abs(S) < ztol:
S = 0.0
try:
res.value[i][j] = (self.value[i][j] - S) / res.value[i][i]
except:
raise ValueError("Zero diagonal")
return res
def CholeskyInverse(self):
# Computes inverse of matrix given its Cholesky upper Triangular
# decomposition of matrix.
res = matrix([[]])
res.zero(self.dimx, self.dimx)
# Backward step for inverse.
for j in reversed(range(self.dimx)):
tjj = self.value[j][j]
S = sum([self.value[j][k] * res.value[j][k] for k in range(j + 1, self.dimx)])
res.value[j][j] = 1.0 / tjj ** 2 - S / tjj
for i in reversed(range(j)):
res.value[j][i] = res.value[i][j] = -sum(
[self.value[i][k] * res.value[k][j] for k in range(i + 1, self.dimx)]) / self.value[i][i]
return res
def inverse(self):
aux = self.Cholesky()
res = aux.CholeskyInverse()
return res
def __repr__(self):
return repr(self.value)
# z 是measurement
def update(x, P, z):
y = z - H * x
S = H * P * H.transpose() + R # 加入波動(dòng)的協(xié)方差
K = P * H.transpose() * S.inverse()
x = x + (K * y)
P = (I - K * H) * P
return x, P
def prediction(x, P):
x = F * x + u
P = F * P * F.transpose()
return x, P
def kalman_filter(x, P):
for n in range(len(measurements)):
z = matrix([measurements[n]]).transpose()
# 本例中是先預(yù)測(cè)后測(cè)量
x, P = prediction(x, P)
x, P = update(x, P, z)
return x, P
# measurements = [1, 2, 3] # 傳感器的均值
# x = matrix([[0.], [0.]]) # initial state (location and velocity) 均值
# P = matrix([[1000., 0.], [0., 1000.]]) # initial uncertainty 協(xié)方差
# u = matrix([[0.], [0.]]) # external motion 其他控制建模爪幻,比如司機(jī)手剎
# F = matrix([[1., 1.], [0, 1.]]) # next state function 用于預(yù)測(cè)
# H = matrix([[1., 0.]]) # measurement function
# R = matrix([[1.]]) # measurement uncertainty 傳感器的協(xié)方差
# I = matrix([[1., 0.], [0., 1.]]) # identity matrix
# measurements = [[5., 10.], [6., 8.], [7., 6.], [8., 4.], [9., 2.], [10., 0.]]
# initial_xy = [4., 12.]
# measurements = [[1., 4.], [6., 0.], [11., -4.], [16., -8.]]
# initial_xy = [-4., 8.]
measurements = [[1., 17.], [1., 15.], [1., 13.], [1., 11.]]
initial_xy = [1., 19.]
dt = 0.1
x = matrix([[initial_xy[0]], [initial_xy[1]], [0.], [0.]]) # initial state (location and velocity)
u = matrix([[0.], [0.], [0.], [0.]]) # external motion
P = matrix([[0., 0., 0., 0.],
[0., 0., 0., 0.],
[0., 0., 1000., 0.],
[0., 0., 0., 1000.]]) # initial uncertainty: 0 for positions x and y, 1000 for the two velocities
F = matrix([[1., 0, dt, 0.],
[0., 1, 0., dt],
[0., 0, 1., 0.],
[0., 0., 0., 1.]]) # next state function: generalize the 2d version to 4d
H = matrix([[1., 0., 0., 0.],
[0.,1.,0.,0.]]) # measurement function: reflect the fact that we observe x and y but not the two velocities
R = matrix([[0.1, 0], [0, 0.1]]) # measurement uncertainty: use 2x2 matrix with 0.1 as main diagonal
I = matrix([[1., 0.,0.,0.], [0., 1.,0.,0.],[0.,0.,1.,0.],[0.,0.,0.,1.]]) # 4d identity matrix
a = kalman_filter(x, P)
a[0].show()
a[1].show()
核心為后面菱皆,前面是實(shí)現(xiàn)了一個(gè)矩陣的API便于使用须误,其實(shí)完全可以用numpy
本例中使用的卡爾曼濾波公式
其中 R矩陣 也就是傳感器的誤差(方差),在真實(shí)情況下仇轻,可以通過(guò)實(shí)驗(yàn)獲得京痢,或者最開(kāi)始設(shè)置一個(gè)較大的數(shù)字,然后觀看效果篷店。如果不錯(cuò)的話祭椰,就保留