speakersafetyd/testing/analyze.py
Hector Martin 1352408b54 Add testing scripts
Signed-off-by: Hector Martin <marcan@marcan.st>
2023-12-08 23:19:19 +09:00

235 lines
7.7 KiB
Python

import json, sys, os.path, configparser
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, sosfilt, freqz
CONFDIR = os.path.join(os.path.dirname(__file__), "../conf")
# This information is not in the blackbox file
DEFAULT_AMP_GAIN = 15.50
AMP_GAIN = {
"apple,j180": 13.0,
"apple,j313": 16.0,
"apple,j274": 18.0,
"apple,j375": 18.0,
"apple,j473": 18.0,
"apple,j474": 18.0,
"apple,j475": 18.0,
}
def db(x):
return 10 ** (x / 20)
def smooth(a, n=3):
l = len(a)
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
ret = ret[n - 1:] / n
pad = l - len(ret)
return np.pad(ret, (pad//2, (pad + 1)//2), "edge")
def butter_lowpass(cutoff, fs, order=5):
return butter(order, cutoff, fs=fs, btype='low', output="sos", analog=False)
def butter_highpass(cutoff, fs, order=5):
return butter(order, cutoff, fs=fs, btype='high', output="sos", analog=False)
def butter_lowpass_filter(data, cutoff, fs, order=5):
sos = butter_lowpass(cutoff, fs, order=order)
y = sosfilt(sos, data)
return y
def butter_highpass_filter(data, cutoff, fs, order=5):
sos = butter_highpass(cutoff, fs, order=order)
y = sosfilt(sos, data)
return y
def pilot_filter(data, fs):
data = butter_lowpass_filter(data, 100, fs, 6)
return butter_highpass_filter(data, 10, fs, 3)
class Model:
def __init__(self, idx, an, name, conf):
self.idx = idx
self.an = an
self.name = name
self.conf = conf
self.tr_coil = float(conf["tr_coil"])
self.tr_magnet = float(conf["tr_magnet"])
self.tau_coil = float(conf["tau_coil"])
self.tau_magnet = float(conf["tau_magnet"])
self.t_limit = float(conf["t_limit"])
self.t_headroom = float(conf["t_headroom"])
self.z_nominal = float(conf["z_nominal"])
self.z_shunt = float(conf.get("z_shunt", 0))
self.is_scale = float(conf["is_scale"])
self.vs_scale = float(conf["vs_scale"])
self.a_t_20c = float(conf["a_t_20c"])
self.a_t_35c = float(conf["a_t_35c"])
self.is_chan = int(conf["is_chan"])
self.vs_chan = int(conf["vs_chan"])
self.t_ambient = an.fdr["t_ambient"]
self.t_coil = an.fdr["blocks"][0]["speakers"][self.idx]["t_coil"]
self.t_magnet = an.fdr["blocks"][0]["speakers"][self.idx]["t_magnet"]
self.m_x = []
self.m_t_coil_tg = []
self.m_t_coil = []
self.m_t_magnet_tg = []
self.m_t_magnet = []
self.l_x = []
self.l_t_coil = []
self.l_t_magnet = []
def run_model(self):
off = 0
t = 0
for blk in self.an.fdr["blocks"]:
sr = blk["sample_rate"]
cnt = blk["sample_count"]
data = blk["speakers"][self.idx]
isense = self.an.cvr[off:off+cnt, self.is_chan] * self.is_scale
vsense = self.an.cvr[off:off+cnt, self.vs_chan] * self.vs_scale
dt = 1 / self.an.sr
alpha_coil = dt / (dt + self.tau_coil)
alpha_magnet = dt / (dt + self.tau_magnet)
self.l_x.append(t)
self.l_t_coil.append(data["t_coil"])
self.l_t_magnet.append(data["t_magnet"])
for x, (i, v) in enumerate(zip(isense, vsense)):
self.m_x.append(t + x / sr)
p = i * v
tvc_tgt = self.t_magnet + p * self.tr_coil
self.t_coil = tvc_tgt * alpha_coil + self.t_coil * (1 - alpha_coil)
tmag_tgt = self.t_ambient + p * self.tr_magnet
self.t_magnet = tmag_tgt * alpha_magnet + self.t_magnet * (1 - alpha_magnet)
self.m_t_coil_tg.append(tvc_tgt)
self.m_t_coil.append(self.t_coil)
self.m_t_magnet_tg.append(tmag_tgt)
self.m_t_magnet.append(self.t_magnet)
t += cnt / sr
off += cnt
def analyze(self, outfile):
plt.clf()
fig, ax1 = plt.subplots(figsize=(30,15))
ax1.set_title(self.name)
ax1.set_xlabel('time (s)')
ax1.set_ylabel('temperature')
ax1.tick_params(axis='y')
ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis
color = 'tab:red'
ax2.set_ylabel('power', color=color)
ax2.tick_params(axis='y', labelcolor=color)
ax1.plot(self.m_x, self.m_t_coil, "r")
# ax1.plot(self.m_x, smooth(self.m_t_coil_tg, 1000), "y")
ax1.plot(self.m_x, self.m_t_magnet, "b")
# ax1.plot(self.m_x, smooth(self.m_t_magnet_tg, 1000), "c")
ax1.plot(self.l_x, self.l_t_coil, "om")
ax1.plot(self.l_x, self.l_t_magnet, "og")
i = self.an.cvr[:, self.is_chan] * self.is_scale
v = self.an.cvr[:, self.vs_chan] * self.vs_scale
sr = self.an.fdr["sample_rate"]
ilp = pilot_filter(i, sr)
vlp = pilot_filter(v, sr)
p = butter_lowpass_filter(i * v, 10, sr, 1)
p = smooth(p, 4000)
plp = butter_lowpass_filter(ilp * vlp, 10, sr, 1)
plp = smooth(plp, 4000)
vlprms_sq = butter_lowpass_filter(vlp * vlp, 10, sr, 1)
vlprms_sq = smooth(vlprms_sq, 4000)
r = vlprms_sq / plp
# ax2.plot(self.m_x, p, "b")
rref = np.average(r[1 * sr:2 * sr])
# Clear out the first second, since it tends to contain garbage
r[:1*sr] = rref
#r = butter_lowpass_filter(r - rref, 2, sr, 2) + rref
a = self.a_t_35c # XXX why are there two values at different temperatures?
tref = self.l_t_coil[0]
t = ((r - self.z_shunt) / (rref - self.z_shunt) - 1) / a + tref
ax1.plot(self.m_x, t, "k")
ax2.plot(self.m_x, p, "r")
# ax2.plot(self.m_x, plp, "g")
# ax2.plot(self.m_x, vlprms_sq, "b")
for level in (-1000, -6, -10, -15):
gain = AMP_GAIN.get(self.an.fdr["machine"], DEFAULT_AMP_GAIN)
pbase = (db(gain - 30) ** 2) / (self.z_nominal + self.z_shunt)
ptest = (db(gain + level) ** 2) / (self.z_nominal + self.z_shunt)
p = pbase + ptest
ax2.axhline(y=p, color='r', linestyle='--')
fig.tight_layout() # otherwise the right y-label is slightly clipped
plt.savefig(outfile)
class Analyzer:
def __init__(self, base):
self.fdr = json.load(open(base + ".fdr"))
data = open(base + ".cvr", "rb").read()
cvr = np.frombuffer(data, dtype="int16").astype("float") / 32768
maker, model = self.fdr["machine"].split(",")
cf = os.path.join(CONFDIR, maker, model + ".conf")
print(f"Using config file: {cf}")
self.conf = configparser.ConfigParser()
self.conf.read(cf)
ch = int(self.conf["Globals"]["channels"])
samples = len(cvr) // ch
self.cvr = cvr.reshape((samples, ch))
print(f"Got {samples} samples ({ch} channels)")
assert ch == self.fdr["channels"]
self.sr = self.fdr["sample_rate"]
speaker_configs = []
for key in self.conf.sections():
if not key.startswith("Speaker/"):
continue
print(key)
name = key.split("/")[1]
speaker_configs.append((name, self.conf[key]))
# Match the order that speakersafetyd uses (by group)
speaker_configs.sort(key=lambda x: int(x[1]["group"]))
self.speakers = []
for i, cfg in enumerate(speaker_configs):
self.speakers.append(Model(i, self, cfg[0], cfg[1]))
def analyze(self):
for i, spk in enumerate(self.speakers):
print(f"Processing speaker {i}")
spk.run_model()
spk.analyze(f"speaker_{i}.png")
# break
if __name__ == "__main__":
a = Analyzer(sys.argv[1])
a.analyze()