# PYTHON VERSION SHOULD BE 3.8.8
import sys
print(sys.version)
# NUMPY VERSION SHOULD BE 1.19.0
import numpy as np
np.version.version
3.8.8 (default, Apr 13 2021, 15:08:03) [MSC v.1916 64 bit (AMD64)]
'1.19.0'
import matplotlib.pyplot as plt
from l5kit.data import ChunkedDataset, LocalDataManager
from l5kit.dataset import EgoDataset, AgentDataset
from l5kit.rasterization import build_rasterizer
from l5kit.configs import load_config_data
import os
# opening the zarr_dataset
# os.environ["L5KIT_DATA_FOLDER"] = "../../prediction-dataset/"
os.environ["L5KIT_DATA_FOLDER"] = "../prediction-sample-dataset/"
cfg = load_config_data("./visualisation_config.yaml")
dm = LocalDataManager()
dataset_path = dm.require(cfg["val_data_loader"]["key"])
zarr_dataset = ChunkedDataset(dataset_path)
zarr_dataset.open()
<l5kit.data.zarr_dataset.ChunkedDataset at 0x2141eef14c0>
# using EgoDataset interface to extract AV data
rast = build_rasterizer(cfg, dm)
ego_dataset = EgoDataset(cfg, zarr_dataset, rast)
# kalmanFilterSetup() sets up the matrices needed to use the multivariate Kalman Filter
def kalmanFilterSetup():
# setting up matrices for Kalman Filter
# initial state matrix, "unknown" vehicle location so we set pos, vel, acc to 0
x = np.array([[0],
[0],
[0],
[0],
[0],
[0]])
# since velocity and acceleration is unknown, a high estimate uncertainty is set, resulting in a high Kalman Gain
# calculate these values
P = np.array([[100, 0, 0, 0, 0, 0],
[0, 80, 0, 0, 0, 0],
[0, 0, 10, 0, 0, 0],
[0, 0, 0, 100, 0, 0],
[0, 0, 0, 0, 80, 0],
[0, 0, 0, 0, 0, 10]])
# t = measurement period in seconds
t = 1
acc_std_dev = 0.2
# state transition matrix, kinematic equations for position, velocity, and acceleration
F = np.array([[1, t, 0.5*(t**2), 0, 0, 0],
[0, 1, t, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, t, 0.5*(t**2)],
[0, 0, 0, 0, 1, t],
[0, 0, 0, 0, 0, 1]])
# process noise matrix
Q = np.array([[0.25, 0.5, 0.5, 0, 0, 0],
[ 0.5, 1, 1, 0, 0, 0],
[ 0.5, 1, 1, 0, 0, 0],
[ 0, 0, 0, 0.25, 0.5, 0.5],
[ 0, 0, 0, 0.5, 1, 1],
[ 0, 0, 0, 0.5, 1, 1]]) * (acc_std_dev**2)
# measurement error variance
xvar = 30
yvar = 30
# measurement uncertainty matrix
R = np.array([[xvar, 0],
[0, yvar]])
H = np.array([[1, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0]])
I = np.array([[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1]])
# initialization
P = np.matmul(np.matmul(F, P), np.transpose(F)) + Q
return(x, F, P, R, H, Q, I,)
# kalmanFilterSetup() sets up the matrices needed to use the multivariate Kalman Filter
def kalmanFilterSetupPos(initial_position):
# setting up matrices for Kalman Filter
# initial state matrix, "unknown" vehicle location so we set pos, vel, acc to 0
x = np.array([[initial_position[0]],
[0],
[0],
[initial_position[1]],
[0],
[0]])
# since velocity and acceleration is unknown, a high estimate uncertainty is set, resulting in a high Kalman Gain
# calculate these values
P = np.array([[100, 0, 0, 0, 0, 0],
[0, 80, 0, 0, 0, 0],
[0, 0, 10, 0, 0, 0],
[0, 0, 0, 100, 0, 0],
[0, 0, 0, 0, 80, 0],
[0, 0, 0, 0, 0, 10]])
# t = measurement period in seconds
t = 1
acc_std_dev = 0.2
# state transition matrix, kinematic equations for position, velocity, and acceleration
F = np.array([[1, t, 0.5*(t**2), 0, 0, 0],
[0, 1, t, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, t, 0.5*(t**2)],
[0, 0, 0, 0, 1, t],
[0, 0, 0, 0, 0, 1]])
# process noise matrix
Q = np.array([[0.25, 0.5, 0.5, 0, 0, 0],
[ 0.5, 1, 1, 0, 0, 0],
[ 0.5, 1, 1, 0, 0, 0],
[ 0, 0, 0, 0.25, 0.5, 0.5],
[ 0, 0, 0, 0.5, 1, 1],
[ 0, 0, 0, 0.5, 1, 1]]) * (acc_std_dev**2)
# measurement error variance
xvar = 1
yvar = 1
# measurement uncertainty matrix
R = np.array([[xvar, 0],
[0, yvar]])
H = np.array([[1, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0]])
I = np.array([[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1]])
# initialization
P = np.matmul(np.matmul(F, P), np.transpose(F)) + Q
return(x, F, P, R, H, Q, I,)
# kalmanFilter() implements the Kalman Filter on the inputted measurements for the given number of iterations
def kalmanFilter(positions, output):
estimates = []
predictions = []
x, F, P, R, H, Q, I = kalmanFilterSetup()
for i in range(len(positions)):
z = np.array([[positions[i][0]],
[positions[i][1]]])
if output: print(z)
# Kalman Gain calculation
# K = (P * H_T) * (H * P * H_T + R)^-1
K = np.matmul(np.matmul(P, np.transpose(H)),
np.linalg.inv(np.matmul(np.matmul(H, P), np.transpose(H)) + R))
if output: print(K)
# current state estimation
# x = x + K * (z - H * x)
x = x + np.matmul(K, (z - np.matmul(H, x)))
estimates.append([x[0][0],x[3][0]])
if output: print(x)
# update estimate uncertainty
# P = (I - K * H) * P * (I - K * H)_T + K * R * K_T
# IKH = (I - K * H)
IKH = I - np.matmul(K, H)
P = np.matmul(np.matmul(IKH, P), np.transpose(IKH)) + np.matmul(np.matmul(K, R), np.transpose(K))
if output: print(P)
# predict state and update estimate uncertainty
# x = F * x
# P = F * P * F_T + Q
x = np.matmul(F, x)
P = np.matmul(np.matmul(F, P), np.transpose(F)) + Q
predictions.append([x[0][0],x[3][0]])
if output: print(x)
if output: print(P)
# formatting
estimates = np.array(estimates)
predictions = np.array(predictions)
return (estimates, predictions)
def kalmanFilterPos(positions, output):
estimates = []
predictions = []
x, F, P, R, H, Q, I = kalmanFilterSetupPos(positions[0])
for i in range(len(positions)):
z = np.array([[positions[i][0]],
[positions[i][1]]])
if output: print(z)
# Kalman Gain calculation
# K = (P * H_T) * (H * P * H_T + R)^-1
K = np.matmul(np.matmul(P, np.transpose(H)),
np.linalg.inv(np.matmul(np.matmul(H, P), np.transpose(H)) + R))
if output: print(K)
# current state estimation
# x = x + K * (z - H * x)
x = x + np.matmul(K, (z - np.matmul(H, x)))
estimates.append([x[0][0],x[3][0]])
if output: print(x)
# update estimate uncertainty
# P = (I - K * H) * P * (I - K * H)_T + K * R * K_T
# IKH = (I - K * H)
IKH = I - np.matmul(K, H)
P = np.matmul(np.matmul(IKH, P), np.transpose(IKH)) + np.matmul(np.matmul(K, R), np.transpose(K))
if output: print(P)
# predict state and update estimate uncertainty
# x = F * x
# P = F * P * F_T + Q
x = np.matmul(F, x)
P = np.matmul(np.matmul(F, P), np.transpose(F)) + Q
predictions.append([x[0][0],x[3][0]])
if output: print(x)
if output: print(P)
# formatting
estimates = np.array(estimates)
predictions = np.array(predictions)
return (estimates, predictions)
scene_frame_indices = []
scene_frame_indices = zarr_dataset.scenes[:]["frame_index_interval"]
print(scene_frame_indices)
[[ 0 248] [ 248 497] [ 497 746] [ 746 995] [ 995 1244] [ 1244 1493] [ 1493 1742] [ 1742 1991] [ 1991 2240] [ 2240 2489] [ 2489 2738] [ 2738 2987] [ 2987 3236] [ 3236 3485] [ 3485 3734] [ 3734 3983] [ 3983 4232] [ 4232 4481] [ 4481 4730] [ 4730 4979] [ 4979 5228] [ 5228 5477] [ 5477 5725] [ 5725 5973] [ 5973 6221] [ 6221 6469] [ 6469 6717] [ 6717 6965] [ 6965 7213] [ 7213 7461] [ 7461 7709] [ 7709 7957] [ 7957 8205] [ 8205 8453] [ 8453 8701] [ 8701 8949] [ 8949 9197] [ 9197 9445] [ 9445 9693] [ 9693 9941] [ 9941 10189] [10189 10437] [10437 10685] [10685 10933] [10933 11182] [11182 11431] [11431 11680] [11680 11929] [11929 12177] [12177 12425] [12425 12673] [12673 12921] [12921 13169] [13169 13417] [13417 13665] [13665 13913] [13913 14161] [14161 14409] [14409 14657] [14657 14905] [14905 15153] [15153 15401] [15401 15649] [15649 15897] [15897 16145] [16145 16393] [16393 16641] [16641 16889] [16889 17137] [17137 17385] [17385 17633] [17633 17881] [17881 18129] [18129 18377] [18377 18625] [18625 18874] [18874 19123] [19123 19372] [19372 19621] [19621 19870] [19870 20118] [20118 20366] [20366 20614] [20614 20862] [20862 21110] [21110 21358] [21358 21607] [21607 21856] [21856 22105] [22105 22354] [22354 22603] [22603 22852] [22852 23101] [23101 23350] [23350 23598] [23598 23846] [23846 24094] [24094 24342] [24342 24590] [24590 24838]]
def plotData(measurements):
estimates_pos, predictions_pos = kalmanFilterPos(measurements, output=False)
estimates, predictions = kalmanFilter(measurements, output=False)
measurements_x = [pos[0] for pos in measurements]
measurements_y = [pos[1] for pos in measurements]
predictions_x = [prediction[0] for prediction in predictions]
predictions_y = [prediction[1] for prediction in predictions]
predictions_x_pos = [prediction[0] for prediction in predictions_pos]
predictions_y_pos = [prediction[1] for prediction in predictions_pos]
fig, axs = plt.subplots(1, 3, figsize=(12, 5))
ax = plt.subplot(1, 3, 1)
ax.scatter(measurements_x, measurements_y, label="Positions", s=5, alpha=1, color='c')
ax.legend()
ax.set_title("Position Data")
ax.set_xlabel("Position (m)")
ax.set_ylabel("Position (m)")
ax1 = plt.subplot(1, 3, 2)
ax1.scatter(measurements_x, measurements_y, label="Positions", s=5, alpha=0.6, color='c')
ax1.scatter(predictions_x, predictions_y, label="Predictions", s=5, alpha=0.4, color='m')
ax1.legend()
ax1.set_title("Basic Kalman Filter")
ax2 = plt.subplot(1, 3, 3)
ax2.scatter(measurements_x, measurements_y, label="Positions", s=5, alpha=0.6, color='c')
ax2.scatter(predictions_x_pos, predictions_y_pos, label="Predictions", s=5, alpha=0.4, color='m')
ax2.legend()
ax2.set_title("Filter with Initial Position")
# https://stackoverflow.com/questions/38855748/matplotlib-generating-strange-y-axis-on-certain-data-sets
ax.get_yaxis().get_major_formatter().set_useOffset(False)
ax1.get_yaxis().get_major_formatter().set_useOffset(False)
ax2.get_yaxis().get_major_formatter().set_useOffset(False)
plt.tight_layout()
plt.show()
def compareFilters(scene_number):
positions = zarr_dataset.frames[slice(scene_frame_indices[scene_number][0], scene_frame_indices[scene_number][1])]["ego_translation"]
positions = [[pos[0], pos[1]] for pos in positions]
plotData(positions)
compareFilters(0)
compareFilters(2)
compareFilters(5)
compareFilters(6)
compareFilters(60)
def mse(position, prediction):
position = np.array(position)
prediction = np.array(prediction)
return np.square(np.subtract(position, prediction)).mean(axis=0)
# https://www.geeksforgeeks.org/how-to-implement-weighted-mean-square-error-in-python/#:~:text=Weighted%20mean%20square%20error%20enables,be%20a%20vital%20performance%20metric.
def oneRWmse(position, prediction):
position = np.array(position)
prediction = np.array(prediction)
weights = np.array([1/(len(position)-x) for x in range(len(position))])
weights = weights/weights.sum()
weights = [[weight, weight] for weight in weights]
return (np.square(np.subtract(position, prediction))*weights).sum(axis=0)
def RWmse(position, prediction):
position = np.array(position)
prediction = np.array(prediction)
weights = np.array([x/(x+1) for x in range(1, len(position) + 1)])
weights = weights/weights.sum()
weights = [[weight, weight] for weight in weights]
return (np.square(np.subtract(position, prediction))*weights).sum(axis=0)
#mse = (1/n) * sum(actual-prediction)^2
mse_values = []
oneRWmse_values = []
RWmse_values = []
all_positions = zarr_dataset.frames[:]["ego_translation"]
for scene in scene_frame_indices:
positions = [[pos[0], pos[1]] for pos in all_positions[scene[0]:scene[1]]]
estimates, predictions = kalmanFilterPos(positions, output=False)
# calculate mean-squared error between measure:estimate and measure:prediction and store values
mse_values.append([mse(positions, predictions)[0], mse(positions, predictions)[1]])
oneRWmse_values.append([oneRWmse(positions, predictions)[0], oneRWmse(positions, predictions)[1]])
RWmse_values.append([RWmse(positions, predictions)[0], RWmse(positions, predictions)[1]])
x_error = [error[0] for error in mse_values]
y_error = [error[1] for error in mse_values]
x_error_r = [error[0] for error in oneRWmse_values]
y_error_r = [error[1] for error in oneRWmse_values]
x_error_p = [error[0] for error in RWmse_values]
y_error_p = [error[1] for error in RWmse_values]
fig, axs = plt.subplots(1, 3, figsize=(15, 5))
fig.suptitle("Mean-error histogram of 100 scenes")
ax = plt.subplot(1, 3, 1)
ax.hist([x_error, y_error], label=['x', 'y'], color=['blue', 'red'], bins=15)
ax.legend()
ax.set_title("Mean-squared error")
ax.set_xlabel("Error (m)")
ax.set_ylabel("Count")
ax.get_yaxis().get_major_formatter().set_useOffset(False)
ax1 = plt.subplot(1, 3, 2)
ax1.hist([x_error_r, y_error_r], label=['x', 'y'], color=['blue', 'red'], bins=15)
ax1.legend()
ax1.set_title("1/r weighted error")
ax1.set_xlabel("Error (m)")
ax1.set_ylabel("Count")
ax1.get_yaxis().get_major_formatter().set_useOffset(False)
ax2 = plt.subplot(1, 3, 3)
ax2.hist([x_error_p, y_error_p], label=['x', 'y'], color=['blue', 'red'], bins=15)
ax2.legend()
ax2.set_title("x/(x+1) weighted error")
ax2.set_xlabel("Error (m)")
ax2.set_ylabel("Count")
ax2.get_yaxis().get_major_formatter().set_useOffset(False)
plt.tight_layout()
plt.show()
x_error.sort()
y_error.sort()
print(f"Median error for x: {x_error[len(x_error)//2]}")
print(f"Median error for y: {y_error[len(y_error)//2]}")
mean = sum(x_error) / len(x_error)
variance = sum([((value - mean) ** 2) for value in x_error]) / len(x_error)
std_deviation = variance ** 0.5
print(f"Standard deviation for x-error: {std_deviation}")
mean = sum(y_error) / len(y_error)
variance = sum([((value - mean) ** 2) for value in y_error]) / len(y_error)
std_deviation = variance ** 0.5
print(f"Standard deviation for x-error: {std_deviation}")
Median error for x: 0.2705772985391188 Median error for y: 0.3466283203171715 Standard deviation for x-error: 0.28502242427879715 Standard deviation for x-error: 0.37554922477341096