Add metadata streaming using MessagePack and lofty

Signed-off-by: Ivan Bushchik <ivabus@ivabus.dev>
This commit is contained in:
Ivan Bushchik 2024-03-13 20:48:10 +03:00
parent fd780cf28b
commit c6b9bdcf54
No known key found for this signature in database
GPG key ID: 2F16FBF3262E090C
6 changed files with 503 additions and 369 deletions

545
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -3,8 +3,8 @@ members = ["monoclient", "platform/swiftui/monolib"]
[package] [package]
name = "lonelyradio" name = "lonelyradio"
description = "TCP radio for singles" description = "TCP radio for lonely ones"
version = "0.1.7" version = "0.2.0"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
authors = ["Ivan Bushchik <ivabus@ivabus.dev>"] authors = ["Ivan Bushchik <ivabus@ivabus.dev>"]
@ -29,7 +29,10 @@ symphonia = { version = "0.5.4", features = [
] } ] }
samplerate = "0.2.4" samplerate = "0.2.4"
chrono = "0.4" chrono = "0.4"
rmp-serde = "1.1.2"
serde = { version = "1.0.197", features = ["derive"] }
lofty = "0.18.2"
[profile.release] [profile.release]
opt-level = 3 opt-level = 3
lto = true #lto = true

View file

@ -25,24 +25,12 @@ All files (recursively) will be shuffled and played back. Public log will be dis
[monoclient](./monoclient) with optional channel separation, hardcoded input (16/44.1/LE). [monoclient](./monoclient) with optional channel separation, hardcoded input (16/44.1/LE).
```shell ```shell
monoclient <SERVER>:<PORT> s monoclient <SERVER>:<PORT>
```
FFplay (from FFmpeg)
```shell
nc <SERVER> <PORT> | ffplay -f s16le -vn -ac 2 -ar 44100 -nodisp -autoexit -
```
MPV
```shell
nc <SERVER> <PORT> | mpv --audio-channels=stereo --audio-samplerate=44100 --demuxer-rawaudio-format=s16le --demuxer=rawaudio -
``` ```
### Other clients ### Other clients
SwiftUI client is availible in [platform](./platform) directory. SwiftUI client is availible in [platform](./platform) directory (not yet adapted for lonelyradio 0.2).
## License ## License

View file

@ -1,6 +1,6 @@
[package] [package]
name = "monoclient" name = "monoclient"
version = "0.1.0" version = "0.2.0"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
@ -8,3 +8,5 @@ license = "MIT"
clap = { version = "4.4.18", features = ["derive"] } clap = { version = "4.4.18", features = ["derive"] }
rodio = { version = "0.17.3", default-features = false } rodio = { version = "0.17.3", default-features = false }
byteorder = "1.5.0" byteorder = "1.5.0"
rmp-serde = "1.1.2"
serde = { version = "1.0.197", features = ["derive"] }

View file

@ -2,9 +2,9 @@ use byteorder::ByteOrder;
use clap::Parser; use clap::Parser;
use rodio::buffer::SamplesBuffer; use rodio::buffer::SamplesBuffer;
use rodio::{OutputStream, Sink}; use rodio::{OutputStream, Sink};
use std::io::Read; use serde::Deserialize;
use std::io::{Read, Write};
use std::net::TcpStream; use std::net::TcpStream;
use std::time::Instant;
// How many samples to cache before playing in samples (both channels) SHOULD BE EVEN // How many samples to cache before playing in samples (both channels) SHOULD BE EVEN
const BUFFER_SIZE: usize = 2400; const BUFFER_SIZE: usize = 2400;
@ -17,6 +17,15 @@ enum Channel {
Stereo, Stereo,
} }
#[derive(Deserialize, Debug)]
struct SentMetadata {
// In bytes, we need to read next track metadata
lenght: u64,
title: String,
album: String,
artist: String,
}
#[derive(Parser)] #[derive(Parser)]
struct Args { struct Args {
/// Remote address /// Remote address
@ -28,22 +37,15 @@ struct Args {
/// Play only on specified channel, with it if channel = Right => L=0 and R=R, without L=R and R=R. No effect on Stereo /// Play only on specified channel, with it if channel = Right => L=0 and R=R, without L=R and R=R. No effect on Stereo
single: bool, single: bool,
/// More verbose /// Do not erase previously played track from stdout
#[arg(short)] #[arg(short)]
verbose: bool, no_backspace: bool,
} }
fn main() { fn main() {
let start = Instant::now();
let args = Args::parse(); let args = Args::parse();
let mut stream = TcpStream::connect(args.address).unwrap(); let mut stream = TcpStream::connect(args.address).unwrap();
if args.verbose { println!("Connected to {} from {}", stream.peer_addr().unwrap(), stream.local_addr().unwrap());
eprintln!(
"Connected to {} from {}",
stream.peer_addr().unwrap(),
stream.local_addr().unwrap()
)
}
let channel = match args.channel.to_ascii_lowercase().as_str() { let channel = match args.channel.to_ascii_lowercase().as_str() {
"l" => Channel::Left, "l" => Channel::Left,
@ -55,9 +57,38 @@ fn main() {
let sink = Sink::try_new(&stream_handle).unwrap(); let sink = Sink::try_new(&stream_handle).unwrap();
let mut buffer = [0u8; 4]; let mut buffer = [0u8; 4];
let mut samples = [0f32; BUFFER_SIZE]; let mut samples = [0f32; BUFFER_SIZE];
let mut latest_msg_len = 0;
print!("Playing: ");
loop {
let mut index = 0usize; let mut index = 0usize;
let mut first = true;
while stream.read_exact(&mut buffer).is_ok() { let md: SentMetadata = rmp_serde::from_read(&stream).unwrap();
let seconds = md.lenght / (2 * 44100);
let message = format!(
"{} - {} - {} ({}:{:02})",
md.artist,
md.album,
md.title,
seconds / 60,
seconds % 60
);
if latest_msg_len != 0 {
if args.no_backspace {
print!("\nPlaying: ");
} else {
print!("{}", "\u{8}".repeat(latest_msg_len));
print!("{}", " ".repeat(latest_msg_len));
print!("{}", "\u{8}".repeat(latest_msg_len));
}
}
print!("{}", message);
std::io::stdout().flush().unwrap();
latest_msg_len = message.chars().count();
for _ in 0..md.lenght / 2 {
if stream.read_exact(&mut buffer).is_err() {
return;
};
let sample_l = byteorder::LittleEndian::read_i16(&buffer[..2]) as f32 / 32768.0; let sample_l = byteorder::LittleEndian::read_i16(&buffer[..2]) as f32 / 32768.0;
let sample_r = byteorder::LittleEndian::read_i16(&buffer[2..]) as f32 / 32768.0; let sample_r = byteorder::LittleEndian::read_i16(&buffer[2..]) as f32 / 32768.0;
// Left channel // Left channel
@ -85,16 +116,11 @@ fn main() {
}; };
index += 1; index += 1;
if index == BUFFER_SIZE { if index == BUFFER_SIZE {
let mut first_wait_iteration = true;
// Sink's thread is detached from main thread, so we need to synchronize with it // Sink's thread is detached from main thread, so we need to synchronize with it
// Why we should synchronize with it? // Why we should synchronize with it?
// Let's say, that if we don't synchronize with it, we would have // Let's say, that if we don't synchronize with it, we would have
// a lot (no upper limit, actualy) of buffered sound, waiting for playing in sink // a lot (no upper limit, actualy) of buffered sound, waiting for playing in sink
while sink.len() >= CACHE_SIZE { while sink.len() >= CACHE_SIZE {
if args.verbose && first_wait_iteration {
eprint!(".");
first_wait_iteration = false;
}
// Sleeping exactly one buffer // Sleeping exactly one buffer
std::thread::sleep(std::time::Duration::from_secs_f32( std::thread::sleep(std::time::Duration::from_secs_f32(
(if sink.len() >= 2 { (if sink.len() >= 2 {
@ -105,12 +131,12 @@ fn main() {
/ 44100.0 / 2.0, / 44100.0 / 2.0,
)) ))
} }
if first && args.verbose {
eprintln!("Started playing in {} ms", (Instant::now() - start).as_millis());
first = false;
}
sink.append(SamplesBuffer::new(2, 44100, samples.as_slice())); sink.append(SamplesBuffer::new(2, 44100, samples.as_slice()));
index = 0; index = 0;
} }
} }
while sink.len() != 0 {
std::thread::sleep(std::time::Duration::from_secs_f32(0.01))
}
}
} }

View file

@ -1,13 +1,18 @@
use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use chrono::Local; use chrono::Local;
use clap::Parser; use clap::Parser;
use lofty::Accessor;
use lofty::TaggedFileExt;
use rand::prelude::*; use rand::prelude::*;
use samplerate::ConverterType; use samplerate::ConverterType;
use serde::Serialize;
use symphonia::core::audio::SampleBuffer; use symphonia::core::audio::SampleBuffer;
use symphonia::core::codecs::CODEC_TYPE_NULL; use symphonia::core::codecs::CODEC_TYPE_NULL;
use symphonia::core::io::MediaSourceStream; use symphonia::core::io::MediaSourceStream;
use symphonia::core::meta::StandardTagKey;
use symphonia::core::probe::Hint; use symphonia::core::probe::Hint;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
@ -26,6 +31,51 @@ struct Args {
war: bool, war: bool,
} }
#[derive(Serialize)]
struct SentMetadata {
// In bytes, we need to read next track metadata
lenght: u64,
title: String,
album: String,
artist: String,
}
async fn stream_samples(
track_samples: Vec<f32>,
rate: u32,
war: bool,
md: SentMetadata,
s: &mut TcpStream,
) -> bool {
let resampled = if rate != 44100 {
samplerate::convert(rate, 44100, 2, ConverterType::Linear, track_samples.as_slice())
.unwrap()
} else {
track_samples
};
let mut md = md;
md.lenght = resampled.len() as u64;
if s.write_all(rmp_serde::to_vec(&md).unwrap().as_slice()).await.is_err() {
return true;
}
for sample in resampled {
if s.write_all(
&(if war {
sample.signum() as i16 * 32767
} else {
(sample * 32768_f32) as i16
}
.to_le_bytes()),
)
.await
.is_err()
{
return true;
};
}
false
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let listener = TcpListener::bind(Args::parse().address).await.unwrap(); let listener = TcpListener::bind(Args::parse().address).await.unwrap();
@ -35,8 +85,7 @@ async fn main() {
.filter_entry(is_not_hidden) .filter_entry(is_not_hidden)
.filter_map(|v| v.ok()) .filter_map(|v| v.ok())
.map(|x| x.into_path()) .map(|x| x.into_path())
.filter(track_valid) .filter(|x| track_valid(x))
.into_iter()
.collect::<Vec<PathBuf>>(), .collect::<Vec<PathBuf>>(),
); );
loop { loop {
@ -48,7 +97,7 @@ fn is_not_hidden(entry: &DirEntry) -> bool {
entry.file_name().to_str().map(|s| entry.depth() == 0 || !s.starts_with('.')).unwrap_or(false) entry.file_name().to_str().map(|s| entry.depth() == 0 || !s.starts_with('.')).unwrap_or(false)
} }
fn track_valid(track: &PathBuf) -> bool { fn track_valid(track: &Path) -> bool {
if !track.metadata().unwrap().is_file() { if !track.metadata().unwrap().is_file() {
return false; return false;
} }
@ -63,12 +112,24 @@ fn track_valid(track: &PathBuf) -> bool {
async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) { async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
let args = Args::parse(); let args = Args::parse();
'track: loop { loop {
let track = tracklist.choose(&mut thread_rng()).unwrap(); let track = tracklist.choose(&mut thread_rng()).unwrap();
let mut title = String::new();
let mut artist = String::new();
let mut album = String::new();
let mut file = std::fs::File::open(track).unwrap();
let tagged = lofty::read_from(&mut file).unwrap();
if let Some(id3v2) = tagged.primary_tag() {
title = id3v2.title().unwrap_or("[No tag]".into()).to_string();
album = id3v2.album().unwrap_or("[No tag]".into()).to_string();
artist = id3v2.artist().unwrap_or("[No tag]".into()).to_string()
};
let track_message = format!("{} - {} - {}", &artist, &album, &title);
eprintln!( eprintln!(
"[{}] {} to {}:{}{}", "[{}] {} to {}:{}{}",
Local::now().to_rfc3339(), Local::now().to_rfc3339(),
track.to_str().unwrap(), track_message,
s.peer_addr().unwrap().ip(), s.peer_addr().unwrap().ip(),
s.peer_addr().unwrap().port(), s.peer_addr().unwrap().port(),
if args.war { if args.war {
@ -118,15 +179,27 @@ async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
.expect("unsupported codec"); .expect("unsupported codec");
let track_id = track.id; let track_id = track.id;
let mut track_rate = 0;
let mut samples = vec![];
loop { loop {
let packet = match format.next_packet() { let packet = match format.next_packet() {
Ok(packet) => packet, Ok(packet) => packet,
_ => continue 'track, _ => break,
}; };
while !format.metadata().is_latest() { while !format.metadata().is_latest() {
format.metadata().pop(); format.metadata().pop();
if let Some(rev) = format.metadata().current() {
for tag in rev.tags() {
println!("Looped");
match tag.std_key {
Some(StandardTagKey::Album) => album = tag.value.to_string(),
Some(StandardTagKey::Artist) => artist = tag.value.to_string(),
Some(StandardTagKey::TrackTitle) => title = tag.value.to_string(),
_ => {}
}
eprintln!("DBG: {} {} {}", &album, &artist, &title)
}
}
} }
if packet.track_id() != track_id { if packet.track_id() != track_id {
@ -135,47 +208,36 @@ async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
match decoder.decode(&packet) { match decoder.decode(&packet) {
Ok(decoded) => { Ok(decoded) => {
let rate = decoded.spec().rate; track_rate = decoded.spec().rate;
let mut byte_buf = let mut byte_buf =
SampleBuffer::<f32>::new(decoded.capacity() as u64, *decoded.spec()); SampleBuffer::<f32>::new(decoded.capacity() as u64, *decoded.spec());
byte_buf.copy_interleaved_ref(decoded); byte_buf.copy_interleaved_ref(decoded);
let samples = if rate != 44100 { samples.append(&mut byte_buf.samples_mut().to_vec());
samplerate::convert(
rate,
44100,
2,
ConverterType::Linear,
byte_buf.samples(),
)
.unwrap()
} else {
byte_buf.samples().to_vec()
};
for sample in samples {
let result = s
.write(
&(if args.war {
sample.signum() as i16 * 32767
} else {
(sample * 32768_f32) as i16
})
.to_le_bytes(),
)
.await;
match result {
Err(_) | Ok(0) => {
return;
}
_ => (),
};
}
continue; continue;
} }
_ => { _ => {
// Handling any error as track skip // Handling any error as track skip
continue 'track; continue;
} }
} }
} }
if !samples.is_empty() {
if stream_samples(
samples,
track_rate,
args.war,
SentMetadata {
lenght: 0,
title,
album,
artist,
},
&mut s,
)
.await
{
break;
}
}
} }
} }