mirror of
https://github.com/ivabus/speakersafetyd
synced 2024-11-22 16:25:06 +03:00
236 lines
7.7 KiB
Python
236 lines
7.7 KiB
Python
|
import json, sys, os.path, configparser
|
||
|
import numpy as np
|
||
|
import matplotlib.pyplot as plt
|
||
|
from scipy.signal import butter, sosfilt, freqz
|
||
|
|
||
|
CONFDIR = os.path.join(os.path.dirname(__file__), "../conf")
|
||
|
|
||
|
# This information is not in the blackbox file
|
||
|
DEFAULT_AMP_GAIN = 15.50
|
||
|
AMP_GAIN = {
|
||
|
"apple,j180": 13.0,
|
||
|
"apple,j313": 16.0,
|
||
|
"apple,j274": 18.0,
|
||
|
"apple,j375": 18.0,
|
||
|
"apple,j473": 18.0,
|
||
|
"apple,j474": 18.0,
|
||
|
"apple,j475": 18.0,
|
||
|
}
|
||
|
|
||
|
def db(x):
|
||
|
return 10 ** (x / 20)
|
||
|
|
||
|
def smooth(a, n=3):
|
||
|
l = len(a)
|
||
|
ret = np.cumsum(a, dtype=float)
|
||
|
ret[n:] = ret[n:] - ret[:-n]
|
||
|
ret = ret[n - 1:] / n
|
||
|
pad = l - len(ret)
|
||
|
return np.pad(ret, (pad//2, (pad + 1)//2), "edge")
|
||
|
|
||
|
def butter_lowpass(cutoff, fs, order=5):
|
||
|
return butter(order, cutoff, fs=fs, btype='low', output="sos", analog=False)
|
||
|
|
||
|
def butter_highpass(cutoff, fs, order=5):
|
||
|
return butter(order, cutoff, fs=fs, btype='high', output="sos", analog=False)
|
||
|
|
||
|
def butter_lowpass_filter(data, cutoff, fs, order=5):
|
||
|
sos = butter_lowpass(cutoff, fs, order=order)
|
||
|
y = sosfilt(sos, data)
|
||
|
return y
|
||
|
|
||
|
def butter_highpass_filter(data, cutoff, fs, order=5):
|
||
|
sos = butter_highpass(cutoff, fs, order=order)
|
||
|
y = sosfilt(sos, data)
|
||
|
return y
|
||
|
|
||
|
def pilot_filter(data, fs):
|
||
|
data = butter_lowpass_filter(data, 100, fs, 6)
|
||
|
return butter_highpass_filter(data, 10, fs, 3)
|
||
|
|
||
|
class Model:
|
||
|
def __init__(self, idx, an, name, conf):
|
||
|
self.idx = idx
|
||
|
self.an = an
|
||
|
self.name = name
|
||
|
self.conf = conf
|
||
|
self.tr_coil = float(conf["tr_coil"])
|
||
|
self.tr_magnet = float(conf["tr_magnet"])
|
||
|
self.tau_coil = float(conf["tau_coil"])
|
||
|
self.tau_magnet = float(conf["tau_magnet"])
|
||
|
self.t_limit = float(conf["t_limit"])
|
||
|
self.t_headroom = float(conf["t_headroom"])
|
||
|
self.z_nominal = float(conf["z_nominal"])
|
||
|
self.z_shunt = float(conf.get("z_shunt", 0))
|
||
|
self.is_scale = float(conf["is_scale"])
|
||
|
self.vs_scale = float(conf["vs_scale"])
|
||
|
self.a_t_20c = float(conf["a_t_20c"])
|
||
|
self.a_t_35c = float(conf["a_t_35c"])
|
||
|
|
||
|
self.is_chan = int(conf["is_chan"])
|
||
|
self.vs_chan = int(conf["vs_chan"])
|
||
|
|
||
|
self.t_ambient = an.fdr["t_ambient"]
|
||
|
|
||
|
self.t_coil = an.fdr["blocks"][0]["speakers"][self.idx]["t_coil"]
|
||
|
self.t_magnet = an.fdr["blocks"][0]["speakers"][self.idx]["t_magnet"]
|
||
|
|
||
|
self.m_x = []
|
||
|
self.m_t_coil_tg = []
|
||
|
self.m_t_coil = []
|
||
|
self.m_t_magnet_tg = []
|
||
|
self.m_t_magnet = []
|
||
|
|
||
|
self.l_x = []
|
||
|
self.l_t_coil = []
|
||
|
self.l_t_magnet = []
|
||
|
|
||
|
def run_model(self):
|
||
|
off = 0
|
||
|
t = 0
|
||
|
for blk in self.an.fdr["blocks"]:
|
||
|
sr = blk["sample_rate"]
|
||
|
cnt = blk["sample_count"]
|
||
|
data = blk["speakers"][self.idx]
|
||
|
|
||
|
isense = self.an.cvr[off:off+cnt, self.is_chan] * self.is_scale
|
||
|
vsense = self.an.cvr[off:off+cnt, self.vs_chan] * self.vs_scale
|
||
|
|
||
|
dt = 1 / self.an.sr
|
||
|
alpha_coil = dt / (dt + self.tau_coil)
|
||
|
alpha_magnet = dt / (dt + self.tau_magnet)
|
||
|
|
||
|
self.l_x.append(t)
|
||
|
self.l_t_coil.append(data["t_coil"])
|
||
|
self.l_t_magnet.append(data["t_magnet"])
|
||
|
for x, (i, v) in enumerate(zip(isense, vsense)):
|
||
|
self.m_x.append(t + x / sr)
|
||
|
|
||
|
p = i * v
|
||
|
|
||
|
tvc_tgt = self.t_magnet + p * self.tr_coil
|
||
|
self.t_coil = tvc_tgt * alpha_coil + self.t_coil * (1 - alpha_coil)
|
||
|
tmag_tgt = self.t_ambient + p * self.tr_magnet
|
||
|
self.t_magnet = tmag_tgt * alpha_magnet + self.t_magnet * (1 - alpha_magnet)
|
||
|
|
||
|
self.m_t_coil_tg.append(tvc_tgt)
|
||
|
self.m_t_coil.append(self.t_coil)
|
||
|
self.m_t_magnet_tg.append(tmag_tgt)
|
||
|
self.m_t_magnet.append(self.t_magnet)
|
||
|
|
||
|
t += cnt / sr
|
||
|
off += cnt
|
||
|
|
||
|
def analyze(self, outfile):
|
||
|
plt.clf()
|
||
|
|
||
|
fig, ax1 = plt.subplots(figsize=(30,15))
|
||
|
|
||
|
ax1.set_title(self.name)
|
||
|
|
||
|
ax1.set_xlabel('time (s)')
|
||
|
ax1.set_ylabel('temperature')
|
||
|
ax1.tick_params(axis='y')
|
||
|
ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis
|
||
|
|
||
|
color = 'tab:red'
|
||
|
ax2.set_ylabel('power', color=color)
|
||
|
ax2.tick_params(axis='y', labelcolor=color)
|
||
|
|
||
|
ax1.plot(self.m_x, self.m_t_coil, "r")
|
||
|
# ax1.plot(self.m_x, smooth(self.m_t_coil_tg, 1000), "y")
|
||
|
ax1.plot(self.m_x, self.m_t_magnet, "b")
|
||
|
# ax1.plot(self.m_x, smooth(self.m_t_magnet_tg, 1000), "c")
|
||
|
ax1.plot(self.l_x, self.l_t_coil, "om")
|
||
|
ax1.plot(self.l_x, self.l_t_magnet, "og")
|
||
|
|
||
|
i = self.an.cvr[:, self.is_chan] * self.is_scale
|
||
|
v = self.an.cvr[:, self.vs_chan] * self.vs_scale
|
||
|
|
||
|
sr = self.an.fdr["sample_rate"]
|
||
|
ilp = pilot_filter(i, sr)
|
||
|
vlp = pilot_filter(v, sr)
|
||
|
|
||
|
p = butter_lowpass_filter(i * v, 10, sr, 1)
|
||
|
p = smooth(p, 4000)
|
||
|
plp = butter_lowpass_filter(ilp * vlp, 10, sr, 1)
|
||
|
plp = smooth(plp, 4000)
|
||
|
vlprms_sq = butter_lowpass_filter(vlp * vlp, 10, sr, 1)
|
||
|
vlprms_sq = smooth(vlprms_sq, 4000)
|
||
|
r = vlprms_sq / plp
|
||
|
|
||
|
# ax2.plot(self.m_x, p, "b")
|
||
|
|
||
|
rref = np.average(r[1 * sr:2 * sr])
|
||
|
|
||
|
# Clear out the first second, since it tends to contain garbage
|
||
|
r[:1*sr] = rref
|
||
|
#r = butter_lowpass_filter(r - rref, 2, sr, 2) + rref
|
||
|
a = self.a_t_35c # XXX why are there two values at different temperatures?
|
||
|
tref = self.l_t_coil[0]
|
||
|
t = ((r - self.z_shunt) / (rref - self.z_shunt) - 1) / a + tref
|
||
|
|
||
|
ax1.plot(self.m_x, t, "k")
|
||
|
ax2.plot(self.m_x, p, "r")
|
||
|
# ax2.plot(self.m_x, plp, "g")
|
||
|
# ax2.plot(self.m_x, vlprms_sq, "b")
|
||
|
|
||
|
for level in (-1000, -6, -10, -15):
|
||
|
gain = AMP_GAIN.get(self.an.fdr["machine"], DEFAULT_AMP_GAIN)
|
||
|
pbase = (db(gain - 30) ** 2) / (self.z_nominal + self.z_shunt)
|
||
|
ptest = (db(gain + level) ** 2) / (self.z_nominal + self.z_shunt)
|
||
|
p = pbase + ptest
|
||
|
ax2.axhline(y=p, color='r', linestyle='--')
|
||
|
|
||
|
fig.tight_layout() # otherwise the right y-label is slightly clipped
|
||
|
|
||
|
plt.savefig(outfile)
|
||
|
|
||
|
|
||
|
class Analyzer:
|
||
|
def __init__(self, base):
|
||
|
self.fdr = json.load(open(base + ".fdr"))
|
||
|
data = open(base + ".cvr", "rb").read()
|
||
|
cvr = np.frombuffer(data, dtype="int16").astype("float") / 32768
|
||
|
|
||
|
maker, model = self.fdr["machine"].split(",")
|
||
|
cf = os.path.join(CONFDIR, maker, model + ".conf")
|
||
|
print(f"Using config file: {cf}")
|
||
|
self.conf = configparser.ConfigParser()
|
||
|
self.conf.read(cf)
|
||
|
|
||
|
ch = int(self.conf["Globals"]["channels"])
|
||
|
samples = len(cvr) // ch
|
||
|
self.cvr = cvr.reshape((samples, ch))
|
||
|
print(f"Got {samples} samples ({ch} channels)")
|
||
|
|
||
|
assert ch == self.fdr["channels"]
|
||
|
self.sr = self.fdr["sample_rate"]
|
||
|
|
||
|
speaker_configs = []
|
||
|
|
||
|
for key in self.conf.sections():
|
||
|
if not key.startswith("Speaker/"):
|
||
|
continue
|
||
|
print(key)
|
||
|
name = key.split("/")[1]
|
||
|
speaker_configs.append((name, self.conf[key]))
|
||
|
|
||
|
# Match the order that speakersafetyd uses (by group)
|
||
|
speaker_configs.sort(key=lambda x: int(x[1]["group"]))
|
||
|
|
||
|
self.speakers = []
|
||
|
for i, cfg in enumerate(speaker_configs):
|
||
|
self.speakers.append(Model(i, self, cfg[0], cfg[1]))
|
||
|
|
||
|
def analyze(self):
|
||
|
for i, spk in enumerate(self.speakers):
|
||
|
print(f"Processing speaker {i}")
|
||
|
spk.run_model()
|
||
|
spk.analyze(f"speaker_{i}.png")
|
||
|
# break
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
a = Analyzer(sys.argv[1])
|
||
|
a.analyze()
|