forked from zzh606/wifi-csi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_scale_csi.py
65 lines (53 loc) · 1.91 KB
/
get_scale_csi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import numpy as np
import math
from Bfee import Bfee
def get_scale_csi(csi_st):
    """Scale the CSI of one record using its RSSI and noise fields.

    Args:
        csi_st: Mapping describing a single record. This function reads the
            keys ``'csi'``, ``'noise'``, ``'Nrx'`` and ``'Ntx'``; the mapping
            is also handed to ``get_total_rss``, which reads the RSSI and AGC
            entries.

    Returns:
        ``csi_st['csi']`` multiplied by ``sqrt(scale / total_noise_pwr)`` and,
        when ``Ntx`` is 2 or 3, by an additional constant factor.
    """
    raw_csi = csi_st['csi']

    # Scale factor between the normalized CSI and the RSSI (mW).
    # NOTE(review): the squared magnitudes are summed along axis 0 only, so
    # `scale` is an array with one entry per remaining index rather than a
    # single scalar -- confirm this is intended.
    squared_mag = np.real(np.multiply(raw_csi, np.conj(raw_csi)))
    summed_pwr = np.sum(squared_mag, axis=0)
    summed_pwr = np.reshape(summed_pwr, (1, summed_pwr.shape[0], -1))
    rssi_pwr = dbinv(get_total_rss(csi_st))
    scale = rssi_pwr / (summed_pwr / 30)

    # A reported noise of -127 is replaced by -92 (presumably -127 marks an
    # unavailable measurement -- confirm against the capture format).
    noise_db = -92 if csi_st['noise'] == -127 else csi_st['noise']
    thermal_noise_pwr = dbinv(noise_db)

    # Noise budget: thermal noise plus a quantization term proportional to
    # the number of Nrx * Ntx entries.
    quant_error_pwr = scale * (csi_st['Nrx'] * csi_st['Ntx'])
    total_noise_pwr = thermal_noise_pwr + quant_error_pwr

    # The CSI as normally returned.
    scaled_csi = raw_csi * np.sqrt(scale / total_noise_pwr)

    # Extra gain applied for two or three transmit streams.
    n_tx = csi_st['Ntx']
    if n_tx == 2:
        return scaled_csi * math.sqrt(2)
    if n_tx == 3:
        return scaled_csi * math.sqrt(dbinv(4.5))
    return scaled_csi
def get_total_rss(csi_st):
    """Combine the per-antenna RSSI readings of a record into one dB value.

    Args:
        csi_st: Mapping providing ``'rssi_a'``, ``'rssi_b'``, ``'rssi_c'``
            and ``'agc'``.

    Returns:
        The dB value of the summed linear RSSI powers, minus 44 and minus
        ``csi_st['agc']``.
    """
    # Careful here: rssis could be zero, and zero readings are skipped.
    total_power = 0
    for key in ('rssi_a', 'rssi_b', 'rssi_c'):
        reading = csi_st[key]
        if reading != 0:
            total_power = total_power + dbinv(reading)
    return db(total_power, 'power') - 44 - csi_st['agc']
def dbinv(x):
    """Convert a decibel value to a linear ratio, i.e. ``10 ** (x / 10)``."""
    exponent = x / 10
    return math.pow(10, exponent)
def db(X, U):
    """Convert a value to decibels.

    Args:
        X: Value to convert. It is used directly as a power when ``U`` is a
            prefix of ``'power'``; otherwise ``abs(X) ** 2 / R`` is used.
        U: Unit selector string; any prefix of ``'power'`` selects power
            mode, anything else selects the squared-magnitude mode.

    Returns:
        ``10 * log10(power)`` as a float, or ``-inf`` when the power is zero.

    Raises:
        AssertionError: If ``X`` is negative in power mode (only while
            assertions are enabled).
    """
    # Divisor applied to the squared magnitude (presumably a reference
    # resistance -- confirm).
    R = 1
    if 'power'.startswith(U):
        assert X >= 0
    else:
        X = math.pow(abs(X), 2) / R
    if X == 0:
        # math.log10(0) raises ValueError. Zero power is -inf dB, which
        # dbinv() maps back to a linear power of 0, so callers that see
        # all-zero readings keep working instead of crashing.
        return -math.inf
    # The +300 / -300 round trip snaps results that are negligibly small
    # compared with 300 to exactly 0.
    return (10 * math.log10(X) + 300) - 300
def csi_get_all(filename):
    """Load a capture file and return the scaled CSI of every record.

    Args:
        filename: Path handed to ``Bfee.from_file``.

    Returns:
        A complex array with one row per entry of ``bfee.all_csi``. Each row
        is the concatenation of columns 0, 1 and 2 of that record's squeezed,
        scaled CSI.
    """
    bfee = Bfee.from_file(filename, model_name_encode="gb2312")
    all_csi = bfee.all_csi
    n_records = len(all_csi)
    row_width = np.size(all_csi, 1) * np.size(all_csi, 2)
    csi_data = np.zeros([n_records, row_width], dtype=complex)

    for idx in range(n_records):
        scaled = np.squeeze(get_scale_csi(bfee.dicts[idx]))
        # Lay the first three columns end to end to form one flat row.
        columns = [scaled[:, col] for col in range(3)]
        csi_data[idx, :] = np.concatenate(columns)
    return csi_data