From f3f21d8fc85239298f4ed19c52d4a7d482a83d55 Mon Sep 17 00:00:00 2001 From: Ivan Bushchik Date: Sun, 24 Mar 2024 13:24:48 +0300 Subject: [PATCH] 0.3.0: send audio by fragments, use crossterm in monoclient Add option (`-m --max-samplerate N`) to resample tracks if their samplerate exceeds N Signed-off-by: Ivan Bushchik --- Cargo.lock | 253 ++++++++++++++++++++++++++++++- Cargo.toml | 11 +- monoclient/Cargo.toml | 7 +- monoclient/src/main.rs | 102 ++++--------- monolib/Cargo.toml | 2 +- monolib/src/lib.rs | 69 ++++----- platform/{ => swiftui}/README.md | 0 src/decode.rs | 152 +++++++++++++++++++ src/main.rs | 163 +++++++++----------- 9 files changed, 551 insertions(+), 208 deletions(-) rename platform/{ => swiftui}/README.md (100%) create mode 100644 src/decode.rs diff --git a/Cargo.lock b/Cargo.lock index acbb622..75f3a55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,6 +116,28 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -289,6 +311,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -363,6 +394,31 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossterm" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" +dependencies = [ + "bitflags 2.4.2", + "crossterm_winapi", + "libc", + "mio", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "dasp_sample" version = "0.11.0" @@ -412,6 +468,49 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "getrandom" version = "0.2.12" @@ -563,6 +662,25 @@ dependencies = [ "windows-targets 0.52.4", ] +[[package]] +name = "libsamplerate-sys" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28853b399f78f8281cd88d333b54a63170c4275f6faea66726a2bea5cca72e0d" +dependencies = [ + "cmake", +] + +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "lofty" version = "0.18.2" @@ -597,16 +715,20 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "lonelyradio" -version = "0.2.2" +version = "0.3.0" dependencies = [ + "async-stream", "chrono", "clap", + "futures-util", "lofty", "rand", "rmp-serde", + "samplerate", "serde", "symphonia", "tokio", + "tokio-stream", "walkdir", ] @@ -647,21 +769,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] [[package]] name = "monoclient" -version = "0.2.2" +version = "0.3.0" dependencies = [ "clap", + "crossterm", "monolib", ] [[package]] name = "monolib" -version = "0.2.2" +version = "0.3.0" dependencies = [ "byteorder", "rmp-serde", @@ -824,6 +948,29 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.48.5", +] + [[package]] name = "paste" version = "1.0.14" @@ -836,6 +983,12 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.30" @@ -914,6 +1067,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" version = "1.10.3" @@ -1010,6 +1172,21 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "samplerate" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e032b2b24715c4f982f483ea3abdb3c9ba444d9f63e87b2843d6f998f5ba2698" +dependencies = [ + "libsamplerate-sys", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.197" @@ -1036,6 +1213,51 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "socket2" version = "0.5.6" @@ -1313,6 +1535,31 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.6.5" diff --git a/Cargo.toml b/Cargo.toml index 0b86411..cf39a14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] -members = [ "monoclient", "monolib"] +members = ["monoclient", "monolib"] [package] name = "lonelyradio" description = "TCP radio for lonely ones" -version = "0.2.2" +version = "0.3.0" edition = "2021" license = "MIT" authors = ["Ivan Bushchik "] @@ -14,6 +14,7 @@ repository = "https://github.com/ivabus/lonelyradio" rand = "0.8.5" clap = { version = "4.4.18", features = ["derive"] } tokio = { version = "1.35.1", features = [ + "sync", "fs", "io-util", "net", @@ -27,12 +28,14 @@ symphonia = { version = "0.5.4", features = [ "all-formats", "opt-simd", ] } -#samplerate = "0.2.4" chrono = "0.4" rmp-serde = "1.1.2" serde = { version = "1.0.197", features = ["derive"] } lofty = "0.18.2" +async-stream = "0.3.5" +tokio-stream = { version = "0.1.15", features = ["sync"] } +futures-util = "0.3.30" +samplerate = "0.2.4" [profile.release] opt-level = 3 -#lto = true diff --git a/monoclient/Cargo.toml b/monoclient/Cargo.toml index fb94397..651efe8 100644 --- a/monoclient/Cargo.toml +++ b/monoclient/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "monoclient" -version = "0.2.2" +version = "0.3.0" edition = "2021" license = "MIT" [dependencies] -monolib = { path="../monolib" } -clap = { version = "4.4.18", features = ["derive"] } \ No newline at end of file +monolib = { path = "../monolib" } +clap = { version = "4.4.18", features = ["derive"] } +crossterm = "0.27.0" diff --git a/monoclient/src/main.rs b/monoclient/src/main.rs index 43183e1..8121f73 100644 --- a/monoclient/src/main.rs +++ b/monoclient/src/main.rs @@ -1,5 +1,8 @@ use clap::Parser; -use std::io::{IsTerminal, Write}; +use crossterm::cursor::MoveToColumn; +use crossterm::style::Print; +use crossterm::terminal::{Clear, ClearType}; +use std::io::{stdout, IsTerminal}; use std::time::{Duration, Instant}; #[derive(Parser)] @@ -12,98 +15,59 @@ struct Args { no_backspace: bool, } -fn delete_chars(n: usize, nb: bool) { - if !nb { - print!("{}{}{}", "\u{8}".repeat(n), " ".repeat(n), "\u{8}".repeat(n)); - std::io::stdout().flush().expect("Failed to flush stdout") - } else { - println!() - } -} - -fn flush() { - std::io::stdout().flush().unwrap(); -} - fn main() { let mut args = Args::parse(); args.no_backspace |= !std::io::stdout().is_terminal(); std::thread::spawn(move || monolib::run(&args.address)); while monolib::get_metadata().is_none() {} let mut md = monolib::get_metadata().unwrap(); - let seconds = md.length / md.sample_rate as u64 / 2; let mut track_start = Instant::now(); let mut seconds_past = 0; - let mut msg_len = format!( - "Playing: {} - {} - {} ({}:{:02})", - md.artist, - md.album, - md.title, - seconds / 60, - seconds % 60 + crossterm::execute!( + stdout(), + Print(format!( + "Playing: {} - {} - {} ({}:{:02})", + md.artist, + md.album, + md.title, + md.track_length_secs / 60, + md.track_length_secs % 60 + )) ) - .len(); - print!( - "Playing: {} - {} - {} ({}:{:02})", - md.artist, - md.album, - md.title, - seconds / 60, - seconds % 60 - ); - flush(); + .unwrap(); loop { if monolib::get_metadata().unwrap() != md { md = monolib::get_metadata().unwrap(); - let seconds = md.length / md.sample_rate as u64 / 2; - delete_chars(msg_len, args.no_backspace); - msg_len = format!( - "Playing: {} - {} - {} ({}:{:02})", - md.artist, - md.album, - md.title, - seconds / 60, - seconds % 60 - ) - .len(); + crossterm::execute!(stdout(), Clear(ClearType::CurrentLine), MoveToColumn(0)).unwrap(); print!( "Playing: {} - {} - {} (0:00 / {}:{:02})", md.artist, md.album, md.title, - seconds / 60, - seconds % 60 + md.track_length_secs / 60, + md.track_length_secs % 60 ); - flush(); track_start = Instant::now(); seconds_past = 0; } if (Instant::now() - track_start).as_secs() > seconds_past && !args.no_backspace { seconds_past = (Instant::now() - track_start).as_secs(); - msg_len = format!( - "Playing: {} - {} - {} ({}:{:02} / {}:{:02})", - md.artist, - md.album, - md.title, - seconds_past / 60, - seconds_past % 60, - seconds / 60, - seconds % 60 + crossterm::execute!(stdout(), Clear(ClearType::CurrentLine), MoveToColumn(0)).unwrap(); + crossterm::execute!( + stdout(), + Print(format!( + "Playing: {} - {} - {} ({}:{:02} / {}:{:02})", + md.artist, + md.album, + md.title, + seconds_past / 60, + seconds_past % 60, + md.track_length_secs / 60, + md.track_length_secs % 60 + )) ) - .len(); - delete_chars(msg_len, args.no_backspace); - print!( - "Playing: {} - {} - {} ({}:{:02} / {}:{:02})", - md.artist, - md.album, - md.title, - seconds_past / 60, - seconds_past % 60, - seconds / 60, - seconds % 60 - ); - flush(); + .unwrap(); } - std::thread::sleep(Duration::from_secs_f32(0.05)) + std::thread::sleep(Duration::from_secs_f32(0.25)) } } diff --git a/monolib/Cargo.toml b/monolib/Cargo.toml index eeae746..447d75c 100644 --- a/monolib/Cargo.toml +++ b/monolib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "monolib" -version = "0.2.2" +version = "0.3.0" edition = "2021" license = "MIT" description = "A library implementing the lonely radio audio streaming protocol" diff --git a/monolib/src/lib.rs b/monolib/src/lib.rs index f07b931..5420293 100644 --- a/monolib/src/lib.rs +++ b/monolib/src/lib.rs @@ -26,10 +26,7 @@ use std::net::TcpStream; use std::sync::RwLock; use std::time::Instant; -// How many samples to cache before playing in samples (both channels) SHOULD BE EVEN -const BUFFER_SIZE: usize = 4800; -// How many buffers to cache -const CACHE_SIZE: usize = 160; +const CACHE_SIZE: usize = 50; static SINK: RwLock> = RwLock::new(None); static MD: RwLock> = RwLock::new(None); @@ -48,8 +45,13 @@ pub enum State { /// Track metadata #[derive(Deserialize, Clone, Debug, PartialEq)] pub struct Metadata { - /// In samples, length / (sample_rate * 2 (channels)) = length in seconds + /// Fragment length pub length: u64, + /// Total track length + pub track_length_secs: u64, + pub track_length_frac: f32, + pub channels: u16, + // Yep, no more interpolation pub sample_rate: u32, pub title: String, pub album: String, @@ -147,9 +149,8 @@ pub fn run(server: &str) { drop(sink); let mut buffer = [0u8; 2]; - let mut samples = [0f32; BUFFER_SIZE]; + let mut samples = Vec::with_capacity(8192); loop { - let mut index = 0usize; let recv_md: Metadata = rmp_serde::from_read(&stream).expect("Failed to parse track metadata"); @@ -168,35 +169,35 @@ pub fn run(server: &str) { if stream.read_exact(&mut buffer).is_err() { return; }; - - samples[index] = byteorder::LittleEndian::read_i16(&buffer[..2]) as f32 / 32768.0; - index += 1; - - if index == BUFFER_SIZE { - // Sink's thread is detached from main thread, so we need to synchronize with it - // Why we should synchronize with it? - // Let's say, that if we don't synchronize with it, we would have - // a lot (no upper limit, actualy) of buffered sound, waiting for playing in sink - let sink = SINK.read().unwrap(); - if let Some(sink) = sink.as_ref() { - while sink.len() >= CACHE_SIZE { - // Sleeping exactly one buffer and watching for reset signal - if watching_sleep( - if sink.len() > 2 { - sink.len() as f32 - 2.0 - } else { - 0.5 - } * BUFFER_SIZE as f32 / recv_md.sample_rate as f32 - / 2.0, - ) { - _stop(); - return; - } - } - sink.append(SamplesBuffer::new(2, recv_md.sample_rate, samples.as_slice())); - index = 0; + samples.push(byteorder::LittleEndian::read_i16(&buffer[..2]) as f32 / 32768.0); + } + // Sink's thread is detached from main thread, so we need to synchronize with it + // Why we should synchronize with it? + // Let's say, that if we don't synchronize with it, we would have + // a lot (no upper limit, actualy) of buffered sound, waiting for playing in sink + let sink = SINK.read().unwrap(); + if let Some(sink) = sink.as_ref() { + while sink.len() >= CACHE_SIZE { + // Sleeping exactly one buffer and watching for reset signal + if watching_sleep( + if sink.len() > 2 { + sink.len() as f32 - 2.0 + } else { + 0.25 + } * recv_md.length as f32 + / recv_md.sample_rate as f32 + / 2.0, + ) { + _stop(); + return; } } + sink.append(SamplesBuffer::new( + recv_md.channels, + recv_md.sample_rate, + samples.as_slice(), + )); + samples.clear(); } } } diff --git a/platform/README.md b/platform/swiftui/README.md similarity index 100% rename from platform/README.md rename to platform/swiftui/README.md diff --git a/src/decode.rs b/src/decode.rs new file mode 100644 index 0000000..3a4e814 --- /dev/null +++ b/src/decode.rs @@ -0,0 +1,152 @@ +use std::path::{Path, PathBuf}; + +use clap::Parser; +use symphonia::core::audio::SampleBuffer; +use symphonia::core::codecs::CODEC_TYPE_NULL; +use symphonia::core::io::MediaSourceStream; +use symphonia::core::probe::Hint; +use symphonia::core::units::Time; + +use crate::Args; + +pub async fn get_meta(file_path: &Path) -> (u16, u32, Time) { + let file = Box::new(std::fs::File::open(file_path).unwrap()); + let mut hint = Hint::new(); + hint.with_extension(file_path.extension().unwrap().to_str().unwrap()); + + let probed = symphonia::default::get_probe() + .format( + &hint, + MediaSourceStream::new(file, Default::default()), + &Default::default(), + &Default::default(), + ) + .expect("unsupported format"); + + let mut format = probed.format; + + let track = format + .tracks() + .iter() + .find(|t| t.codec_params.codec != CODEC_TYPE_NULL) + .expect("no supported audio tracks"); + + let mut decoder = symphonia::default::get_codecs() + .make(&track.codec_params, &Default::default()) + .expect("unsupported codec"); + let track_id = track.id; + let mut channels = 2u16; + let mut sample_rate = 0; + let track_length = + track.codec_params.time_base.unwrap().calc_time(track.codec_params.n_frames.unwrap()); + loop { + let packet = match format.next_packet() { + Ok(packet) => packet, + _ => break, + }; + + if packet.track_id() != track_id { + continue; + } + + match decoder.decode(&packet) { + Ok(decoded) => { + channels = decoded.spec().channels.count().try_into().unwrap(); + sample_rate = decoded.spec().rate; + break; + } + _ => { + // Handling any error as track skip + continue; + } + } + } + let args = Args::parse(); + + ( + channels, + if sample_rate > args.max_samplerate { + args.max_samplerate + } else { + sample_rate + }, + track_length, + ) +} + +/// Getting samples +pub async fn decode_file(file_path: PathBuf, tx: tokio::sync::mpsc::Sender>) { + let args = Args::parse(); + let file = Box::new(std::fs::File::open(&file_path).unwrap()); + let mut hint = Hint::new(); + hint.with_extension(file_path.extension().unwrap().to_str().unwrap()); + + let probed = symphonia::default::get_probe() + .format( + &hint, + MediaSourceStream::new(file, Default::default()), + &Default::default(), + &Default::default(), + ) + .expect("unsupported format"); + + let mut format = probed.format; + + let track = format + .tracks() + .iter() + .find(|t| t.codec_params.codec != CODEC_TYPE_NULL) + .expect("no supported audio tracks"); + + let mut decoder = symphonia::default::get_codecs() + .make(&track.codec_params, &Default::default()) + .expect("unsupported codec"); + let track_id = track.id; + loop { + let packet = match format.next_packet() { + Ok(packet) => packet, + _ => break, + }; + + if packet.track_id() != track_id { + continue; + } + + match decoder.decode(&packet) { + Ok(decoded) => { + if decoded.spec().rate > args.max_samplerate { + let spec = *decoded.spec(); + let mut byte_buf = + SampleBuffer::::new(decoded.capacity() as u64, *decoded.spec()); + byte_buf.copy_interleaved_ref(decoded); + + tx.send( + samplerate::convert( + spec.rate, + args.max_samplerate, + spec.channels.count(), + samplerate::ConverterType::Linear, + byte_buf.samples(), + ) + .unwrap() + .iter() + .map(|x| (*x * 32767.0) as i16) + .collect(), + ) + .await + .unwrap(); + } else { + let mut byte_buf = + SampleBuffer::::new(decoded.capacity() as u64, *decoded.spec()); + byte_buf.copy_interleaved_ref(decoded); + tx.send(byte_buf.samples().to_vec()).await.unwrap(); + } + continue; + } + _ => { + // Handling any error as track skip + continue; + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 7819c29..eab06a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,26 @@ +mod decode; + use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use chrono::Local; use clap::Parser; +use futures_util::pin_mut; use lofty::Accessor; use lofty::TaggedFileExt; use rand::prelude::*; use serde::Serialize; -use symphonia::core::audio::SampleBuffer; -use symphonia::core::codecs::CODEC_TYPE_NULL; -use symphonia::core::io::MediaSourceStream; -use symphonia::core::probe::Hint; use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc; +use tokio_stream::Stream; +use tokio_stream::StreamExt; use walkdir::DirEntry; +use crate::decode::decode_file; +use crate::decode::get_meta; + #[derive(Parser)] struct Args { dir: PathBuf, @@ -27,13 +32,19 @@ struct Args { #[arg(short, long)] war: bool, + + #[arg(short, long, default_value = "96000")] + max_samplerate: u32, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] struct SentMetadata { - // In bytes, we need to read next track metadata + /// Fragment length length: u64, - // Yep, no more interpolation + /// Total track length + track_length_secs: u64, + track_length_frac: f32, + channels: u16, sample_rate: u32, title: String, album: String, @@ -41,29 +52,34 @@ struct SentMetadata { } async fn stream_samples( - track_samples: Vec, + samples_stream: impl Stream>, war: bool, md: SentMetadata, s: &mut TcpStream, ) -> bool { - if s.write_all(rmp_serde::to_vec(&md).unwrap().as_slice()).await.is_err() { - return true; - } + pin_mut!(samples_stream); - for sample in track_samples { - if s.write_all( - &(if war { - sample.signum() * 32767 - } else { - sample - } - .to_le_bytes()), - ) - .await - .is_err() - { + while let Some(samples) = samples_stream.next().await { + let mut md = md.clone(); + md.length = samples.len() as u64; + if s.write_all(rmp_serde::to_vec(&md).unwrap().as_slice()).await.is_err() { return true; - }; + } + for sample in samples { + if s.write_all( + &(if war { + sample.signum() * 32767 + } else { + sample + } + .to_le_bytes()), + ) + .await + .is_err() + { + return true; + }; + } } false } @@ -103,20 +119,21 @@ fn track_valid(track: &Path) -> bool { async fn stream(mut s: TcpStream, tracklist: Arc>) { let args = Args::parse(); + s.set_nodelay(true).unwrap(); loop { - let track = tracklist.choose(&mut thread_rng()).unwrap(); + let track = tracklist.choose(&mut thread_rng()).unwrap().clone(); let mut title = String::new(); let mut artist = String::new(); let mut album = String::new(); - let mut file = std::fs::File::open(track).unwrap(); + let mut file = std::fs::File::open(&track).unwrap(); let tagged = lofty::read_from(&mut file).unwrap(); if let Some(id3v2) = tagged.primary_tag() { title = id3v2.title().unwrap_or(track.file_stem().unwrap().to_string_lossy()).to_string(); album = id3v2.album().unwrap_or("[No tag]".into()).to_string(); - artist = id3v2.artist().unwrap_or("[No tag]".into()).to_string() + artist = id3v2.artist().unwrap_or("[No tag]".into()).to_string(); }; let track_message = format!("{} - {} - {}", &artist, &album, &title); eprintln!( @@ -145,74 +162,32 @@ async fn stream(mut s: TcpStream, tracklist: Arc>) { } ); } - - let file = Box::new(std::fs::File::open(track).unwrap()); - let mut hint = Hint::new(); - hint.with_extension(track.extension().unwrap().to_str().unwrap()); - - let probed = symphonia::default::get_probe() - .format( - &hint, - MediaSourceStream::new(file, Default::default()), - &Default::default(), - &Default::default(), - ) - .expect("unsupported format"); - - let mut format = probed.format; - - let track = format - .tracks() - .iter() - .find(|t| t.codec_params.codec != CODEC_TYPE_NULL) - .expect("no supported audio tracks"); - - let mut decoder = symphonia::default::get_codecs() - .make(&track.codec_params, &Default::default()) - .expect("unsupported codec"); - - let track_id = track.id; - let mut sample_rate = 0; - let mut samples = vec![]; - loop { - let packet = match format.next_packet() { - Ok(packet) => packet, - _ => break, - }; - while !format.metadata().is_latest() { - format.metadata().pop(); + let (channels, sample_rate, time) = get_meta(track.as_path()).await; + let (tx, mut rx) = mpsc::channel::>(8192); + tokio::spawn(decode_file(track, tx)); + let stream = async_stream::stream! { + while let Some(item) = rx.recv().await { + yield item; } - - if packet.track_id() != track_id { - continue; - } - - match decoder.decode(&packet) { - Ok(decoded) => { - sample_rate = decoded.spec().rate; - let mut byte_buf = - SampleBuffer::::new(decoded.capacity() as u64, *decoded.spec()); - byte_buf.copy_interleaved_ref(decoded); - samples.append(&mut byte_buf.samples_mut().to_vec()); - continue; - } - _ => { - // Handling any error as track skip - continue; - } - } - } - let md = SentMetadata { - length: samples.len() as u64, - sample_rate, - title, - album, - artist, }; - if !samples.is_empty() { - if stream_samples(samples, args.war, md, &mut s).await { - break; - } - } + if stream_samples( + stream, + args.war, + SentMetadata { + album, + artist, + title, + length: 0, + track_length_frac: time.frac as f32, + track_length_secs: time.seconds, + sample_rate, + channels, + }, + &mut s, + ) + .await + { + return; + }; } }