From 1352408b5424761c34f65ba07cfa8d5874a35f6d Mon Sep 17 00:00:00 2001 From: Hector Martin Date: Fri, 8 Dec 2023 23:19:19 +0900 Subject: [PATCH] Add testing scripts Signed-off-by: Hector Martin --- .gitignore | 2 + testing/analyze.py | 235 ++++++++++++++++++++++++++++++++++++++ testing/make_test_file.py | 45 ++++++++ 3 files changed, 282 insertions(+) create mode 100644 testing/analyze.py create mode 100644 testing/make_test_file.py diff --git a/.gitignore b/.gitignore index 838146b..2e146fb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target *.kate-swp */target +testing/*.wav +testing/j* diff --git a/testing/analyze.py b/testing/analyze.py new file mode 100644 index 0000000..b52e354 --- /dev/null +++ b/testing/analyze.py @@ -0,0 +1,235 @@ +import json, sys, os.path, configparser +import numpy as np +import matplotlib.pyplot as plt +from scipy.signal import butter, sosfilt, freqz + +CONFDIR = os.path.join(os.path.dirname(__file__), "../conf") + +# This information is not in the blackbox file +DEFAULT_AMP_GAIN = 15.50 +AMP_GAIN = { + "apple,j180": 13.0, + "apple,j313": 16.0, + "apple,j274": 18.0, + "apple,j375": 18.0, + "apple,j473": 18.0, + "apple,j474": 18.0, + "apple,j475": 18.0, +} + +def db(x): + return 10 ** (x / 20) + +def smooth(a, n=3): + l = len(a) + ret = np.cumsum(a, dtype=float) + ret[n:] = ret[n:] - ret[:-n] + ret = ret[n - 1:] / n + pad = l - len(ret) + return np.pad(ret, (pad//2, (pad + 1)//2), "edge") + +def butter_lowpass(cutoff, fs, order=5): + return butter(order, cutoff, fs=fs, btype='low', output="sos", analog=False) + +def butter_highpass(cutoff, fs, order=5): + return butter(order, cutoff, fs=fs, btype='high', output="sos", analog=False) + +def butter_lowpass_filter(data, cutoff, fs, order=5): + sos = butter_lowpass(cutoff, fs, order=order) + y = sosfilt(sos, data) + return y + +def butter_highpass_filter(data, cutoff, fs, order=5): + sos = butter_highpass(cutoff, fs, order=order) + y = sosfilt(sos, data) + return y + +def pilot_filter(data, fs): + data = butter_lowpass_filter(data, 100, fs, 6) + return butter_highpass_filter(data, 10, fs, 3) + +class Model: + def __init__(self, idx, an, name, conf): + self.idx = idx + self.an = an + self.name = name + self.conf = conf + self.tr_coil = float(conf["tr_coil"]) + self.tr_magnet = float(conf["tr_magnet"]) + self.tau_coil = float(conf["tau_coil"]) + self.tau_magnet = float(conf["tau_magnet"]) + self.t_limit = float(conf["t_limit"]) + self.t_headroom = float(conf["t_headroom"]) + self.z_nominal = float(conf["z_nominal"]) + self.z_shunt = float(conf.get("z_shunt", 0)) + self.is_scale = float(conf["is_scale"]) + self.vs_scale = float(conf["vs_scale"]) + self.a_t_20c = float(conf["a_t_20c"]) + self.a_t_35c = float(conf["a_t_35c"]) + + self.is_chan = int(conf["is_chan"]) + self.vs_chan = int(conf["vs_chan"]) + + self.t_ambient = an.fdr["t_ambient"] + + self.t_coil = an.fdr["blocks"][0]["speakers"][self.idx]["t_coil"] + self.t_magnet = an.fdr["blocks"][0]["speakers"][self.idx]["t_magnet"] + + self.m_x = [] + self.m_t_coil_tg = [] + self.m_t_coil = [] + self.m_t_magnet_tg = [] + self.m_t_magnet = [] + + self.l_x = [] + self.l_t_coil = [] + self.l_t_magnet = [] + + def run_model(self): + off = 0 + t = 0 + for blk in self.an.fdr["blocks"]: + sr = blk["sample_rate"] + cnt = blk["sample_count"] + data = blk["speakers"][self.idx] + + isense = self.an.cvr[off:off+cnt, self.is_chan] * self.is_scale + vsense = self.an.cvr[off:off+cnt, self.vs_chan] * self.vs_scale + + dt = 1 / self.an.sr + alpha_coil = dt / (dt + self.tau_coil) + alpha_magnet = dt / (dt + self.tau_magnet) + + self.l_x.append(t) + self.l_t_coil.append(data["t_coil"]) + self.l_t_magnet.append(data["t_magnet"]) + for x, (i, v) in enumerate(zip(isense, vsense)): + self.m_x.append(t + x / sr) + + p = i * v + + tvc_tgt = self.t_magnet + p * self.tr_coil + self.t_coil = tvc_tgt * alpha_coil + self.t_coil * (1 - alpha_coil) + tmag_tgt = self.t_ambient + p * self.tr_magnet + self.t_magnet = tmag_tgt * alpha_magnet + self.t_magnet * (1 - alpha_magnet) + + self.m_t_coil_tg.append(tvc_tgt) + self.m_t_coil.append(self.t_coil) + self.m_t_magnet_tg.append(tmag_tgt) + self.m_t_magnet.append(self.t_magnet) + + t += cnt / sr + off += cnt + + def analyze(self, outfile): + plt.clf() + + fig, ax1 = plt.subplots(figsize=(30,15)) + + ax1.set_title(self.name) + + ax1.set_xlabel('time (s)') + ax1.set_ylabel('temperature') + ax1.tick_params(axis='y') + ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis + + color = 'tab:red' + ax2.set_ylabel('power', color=color) + ax2.tick_params(axis='y', labelcolor=color) + + ax1.plot(self.m_x, self.m_t_coil, "r") + # ax1.plot(self.m_x, smooth(self.m_t_coil_tg, 1000), "y") + ax1.plot(self.m_x, self.m_t_magnet, "b") + # ax1.plot(self.m_x, smooth(self.m_t_magnet_tg, 1000), "c") + ax1.plot(self.l_x, self.l_t_coil, "om") + ax1.plot(self.l_x, self.l_t_magnet, "og") + + i = self.an.cvr[:, self.is_chan] * self.is_scale + v = self.an.cvr[:, self.vs_chan] * self.vs_scale + + sr = self.an.fdr["sample_rate"] + ilp = pilot_filter(i, sr) + vlp = pilot_filter(v, sr) + + p = butter_lowpass_filter(i * v, 10, sr, 1) + p = smooth(p, 4000) + plp = butter_lowpass_filter(ilp * vlp, 10, sr, 1) + plp = smooth(plp, 4000) + vlprms_sq = butter_lowpass_filter(vlp * vlp, 10, sr, 1) + vlprms_sq = smooth(vlprms_sq, 4000) + r = vlprms_sq / plp + + # ax2.plot(self.m_x, p, "b") + + rref = np.average(r[1 * sr:2 * sr]) + + # Clear out the first second, since it tends to contain garbage + r[:1*sr] = rref + #r = butter_lowpass_filter(r - rref, 2, sr, 2) + rref + a = self.a_t_35c # XXX why are there two values at different temperatures? + tref = self.l_t_coil[0] + t = ((r - self.z_shunt) / (rref - self.z_shunt) - 1) / a + tref + + ax1.plot(self.m_x, t, "k") + ax2.plot(self.m_x, p, "r") + # ax2.plot(self.m_x, plp, "g") + # ax2.plot(self.m_x, vlprms_sq, "b") + + for level in (-1000, -6, -10, -15): + gain = AMP_GAIN.get(self.an.fdr["machine"], DEFAULT_AMP_GAIN) + pbase = (db(gain - 30) ** 2) / (self.z_nominal + self.z_shunt) + ptest = (db(gain + level) ** 2) / (self.z_nominal + self.z_shunt) + p = pbase + ptest + ax2.axhline(y=p, color='r', linestyle='--') + + fig.tight_layout() # otherwise the right y-label is slightly clipped + + plt.savefig(outfile) + + +class Analyzer: + def __init__(self, base): + self.fdr = json.load(open(base + ".fdr")) + data = open(base + ".cvr", "rb").read() + cvr = np.frombuffer(data, dtype="int16").astype("float") / 32768 + + maker, model = self.fdr["machine"].split(",") + cf = os.path.join(CONFDIR, maker, model + ".conf") + print(f"Using config file: {cf}") + self.conf = configparser.ConfigParser() + self.conf.read(cf) + + ch = int(self.conf["Globals"]["channels"]) + samples = len(cvr) // ch + self.cvr = cvr.reshape((samples, ch)) + print(f"Got {samples} samples ({ch} channels)") + + assert ch == self.fdr["channels"] + self.sr = self.fdr["sample_rate"] + + speaker_configs = [] + + for key in self.conf.sections(): + if not key.startswith("Speaker/"): + continue + print(key) + name = key.split("/")[1] + speaker_configs.append((name, self.conf[key])) + + # Match the order that speakersafetyd uses (by group) + speaker_configs.sort(key=lambda x: int(x[1]["group"])) + + self.speakers = [] + for i, cfg in enumerate(speaker_configs): + self.speakers.append(Model(i, self, cfg[0], cfg[1])) + + def analyze(self): + for i, spk in enumerate(self.speakers): + print(f"Processing speaker {i}") + spk.run_model() + spk.analyze(f"speaker_{i}.png") + # break + +if __name__ == "__main__": + a = Analyzer(sys.argv[1]) + a.analyze() diff --git a/testing/make_test_file.py b/testing/make_test_file.py new file mode 100644 index 0000000..c1201d0 --- /dev/null +++ b/testing/make_test_file.py @@ -0,0 +1,45 @@ +#!/usr/bin/python +import scipy, sys +import numpy as np + +ch = int(sys.argv[1]) +out = sys.argv[2] + +FS = 48000 +PILOT_DB = -30 +PILOT_FREQ = 43 +TEST0 = (500, 1, -10) +TEST1 = (500, 10, -15) +TEST2 = (43, 1.5, -6) +TEST3 = (1000, 1.5, -6) + +def db(x): + return 10 ** (x / 20) + +def silence(t): + return np.zeros(int(FS * t)) + +def sine(f, t, v): + space = np.linspace(0, t, int(FS * t), endpoint=False) + return np.sin(2 * np.pi * f * space) * db(v) + +signal = np.concatenate(( + silence(3), + sine(*TEST0), + silence(2), + sine(*TEST1), + silence(2), + sine(*TEST2), + silence(2), + sine(*TEST3), + silence(5) +)) + +space = np.linspace(0, len(signal) / FS, len(signal), endpoint=False) +signal += np.sin(2 * np.pi * PILOT_FREQ * space) * db(PILOT_DB) + +signal = np.concatenate((silence(60), signal)) + +signal = np.repeat(signal, ch).reshape((-1, ch)) + +scipy.io.wavfile.write(out, FS, signal.astype("float32"))