-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathKalman.py
274 lines (213 loc) · 8.31 KB
/
Kalman.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
"""
Defining the basic Kalman filtered
adjusted for correcting meteorological
data (ie no explicit system dynamics).
"""
import numpy as np
class Kalman:
def __init__(self, history, dim, F = None, P = None, exp_var = None):
if history < 1 or dim < 1:
print('Kalman module reporting: ')
raise ValueError('Improper value entered for history length and/or dimension of observation matrix...')
if dim < 2:
print('Kalman module reporting: ')
print('Caution! Low accuracy due to the size of the observation matrix.')
# Set the bare minimum info
self.history = history
self.dim = dim
self.sqShape = (dim, dim)
self.vecShape = (dim, 1)
self.nTrained = 0
# Set the system matrix based on entries
self.set_system_matrix(F)
# Set the covariance matrix based on entries
self.set_covariance_matrix(P)
# Hope it's classic Kalman, but you never know
self.classic = True
self.exp_var = None
if not (exp_var is None):
print('Switching to Information Geometry Kalman filter...')
print('Variance for data is provided explicitly as: {}'.format(exp_var))
self.classic = False
self.exp_var = exp_var
# Most likely, it's not going to be used right away, but planning ahead
# Initialise other relevant matrices
self.X = np.zeros(self.vecShape) # State vector
self.H = np.zeros(self.vecShape) # Observations matrix
self.KG = np.zeros(self.vecShape) # Kalman gain
# Create variables to keep history data
self.x_history = np.zeros((self.dim, self.history + 1))
self.H_history = np.zeros((self.dim, self.history + 1))
self.y_history = np.zeros((self.history + 1, 1))
def set_system_matrix(self, ff):
# Get the system matrix
if ff is None:
self.F = np.eye(self.dim)
else:
if ff.shape[0] == self.dim:
self.F = np.array(ff)
else:
print('Kalman module reporting: ')
raise ValueError('Transition (system) matrix F has the wrong dimensions.')
return
def set_covariance_matrix(self, pp):
# Get the covariance matrix
if pp is None:
self.P = 10.0 * np.ones(self.sqShape)
else:
if isinstance(pp, list):
if pp.shape[0] == self.dim:
self.P = np.array(pp)
else:
print('Kalman module reporting: ')
raise ValueError('Covariance matrix P has the wrong dimensions.')
else:
self.P = pp * np.ones(self.sqShape)
return
def set_observations_matrix(self, val):
# Get the observations matrix from
# current model's prediction
for ij in range(self.dim):
self.H[ij, 0] = val ** ij
return
def print_2D_mat(self, mat, txt):
# Just show me the contents
print('{}:\t \t{}'.format(txt, mat[0, :]))
for ij in range(1,self.dim):
print(' \t \t{}'.format(mat[ij, :]))
return
def print_1D_mat(self, mat, txt):
# Just show me the contents
print('{}:\t \t{}'.format(txt, mat[:, 0]))
def update_x_history(self, xx):
# Update the x-history
self.x_history[:, 0:-1] = self.x_history[:, 1:]
self.x_history[:, -1] = xx[:, 0]
return
def update_H_history(self, hh):
# Update the H-history
self.H_history[:, 0:-1] = self.H_history[:, 1:]
self.H_history[:, -1] = hh[:, 0]
return
def update_y_history(self, yy):
# Update the y-history
self.y_history[0:-1, 0] = self.y_history[1:, 0]
self.y_history[-1, 0] = yy
return
def get_state_variance(self):
# Calculate the variance
# from the x-history
sx = np.zeros_like(self.X)
for ij in range(self.history):
sx[:, 0] += self.x_history[:, ij] - self.x_history[:, ij + 1]
sx /= self.history
Wt = np.zeros_like(self.P)
for ij in range(self.history):
temp = self.x_history[:, ij] - self.x_history[:, ij + 1] - sx
Wt += np.dot(temp, temp.T)
Wt /= (self.history - 1.0)
return Wt
def get_observation_variance(self):
# Calculate the variance
# from the y-history
sy = 0.0
for ij in range(self.history):
sy += self.y_history[ij, 0] - np.dot(self.H_history[:, ij].T, self.x_history[:, ij])
sy /= self.history
Vt = 0.0
for ij in range(self.history):
temp = self.y_history[ij, 0] - np.dot(self.H_history[:, ij].T, self.x_history[:, ij]) - sy
Vt += temp ** 2
Vt /= (self.history - 1.0)
return Vt
def predict(self):
# Provide an optimal estimate
# for State vector and Covariance
# matrix
W = self.get_state_variance()
self.X_est = np.dot(self.F, self.X)
self.P_est = np.dot(np.dot(self.F, self.P), self.F.T) + W
return
def update(self, y):
# Provide an update of
# the filter's state
# based on new input
# Get the variance of the dataset
if self.classic:
vt = self.get_observation_variance()
else:
vt = self.exp_var
# Calculate the value for denominator
denom = np.dot(np.dot(self.H.T, self.P_est), self.H) + vt
# Kalman Gain
self.KG = np.dot(self.P_est, self.H) / denom
# Get new state
self.X = self.X + self.KG * (y - np.dot(self.H.T, self.X_est))
# Get new covariance matrix
self.P = np.dot(np.eye(self.dim) - np.dot(self.KG, self.H.T), self.P_est)
# Update history data
self.update_x_history(self.X)
self.update_H_history(self.H)
self.update_y_history(y)
return
def trainMe(self, obs, model, showProrgess = False):
"""
Master method to control the initial
training of the filter.
"""
# Ensure they are numpy arrays
myobs = np.array(obs)
mymodel = np.array(model)
# Check if the dimensions match
if myobs.shape != mymodel.shape:
print('Kalman module reporting: ')
raise TypeError('Initial training set does not have conforming shapes.')
NN = len(myobs)
if NN > self.history:
print('Kalman module reporting: ')
print('WARNING: Dimensions of training set exceeds length of history database.')
# Train it using all the available data
for ih in range(NN):
self.nTrained += 1
if showProrgess:
print('Training #{}'.format(self.nTrained))
y = myobs[ih] - mymodel[ih]
self.set_observations_matrix(mymodel[ih])
self.predict()
self.update(y)
if showProrgess:
self.dumpMembers()
print('************************************ \n')
return
def adjustForecast(self, val, buff = 20.0):
"""
Method to provide an adjustment to
the forecast value based on current
state of the filter.
"""
prod = np.dot(self.H.T, self.X)
ret = val + prod
if ret <= 0.0:
ret = 0.0
elif abs(prod) >= buff:
ret = val
return ret
def dumpMembers(self):
"""
Defining the "print" method for
debugging and informative purposes.
"""
print('--------------------------')
print(' Kalman Instance ')
print('Classic? \t{}'.format(self.classic))
print('History: \t{}'.format(self.history))
print('Order: \t{}'.format(self.dim))
self.print_2D_mat(self.F, 'F')
self.print_2D_mat(self.P, 'P')
self.print_2D_mat(self.KG, 'KG')
self.print_1D_mat(self.X, 'X')
self.print_1D_mat(self.H, 'H')
self.print_2D_mat(self.x_history, 'x-hist')
self.print_1D_mat(self.y_history, 'y-hist')
print('Trained: \t{}'.format(self.nTrained))
print('--------------------------')