0.3.0: send audio by fragments, use crossterm in monoclient

Add option (`-m --max-samplerate N`) to resample tracks if their samplerate exceeds N

Signed-off-by: Ivan Bushchik <ivabus@ivabus.dev>
This commit is contained in:
Ivan Bushchik 2024-03-24 13:24:48 +03:00
parent 0afbed5758
commit f3f21d8fc8
No known key found for this signature in database
GPG key ID: 2F16FBF3262E090C
9 changed files with 551 additions and 208 deletions

253
Cargo.lock generated
View file

@ -116,6 +116,28 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -289,6 +311,15 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
[[package]]
name = "cmake"
version = "0.1.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "colorchoice" name = "colorchoice"
version = "1.0.0" version = "1.0.0"
@ -363,6 +394,31 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossterm"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
dependencies = [
"bitflags 2.4.2",
"crossterm_winapi",
"libc",
"mio",
"parking_lot",
"signal-hook",
"signal-hook-mio",
"winapi",
]
[[package]]
name = "crossterm_winapi"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "dasp_sample" name = "dasp_sample"
version = "0.11.0" version = "0.11.0"
@ -412,6 +468,49 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "futures-core"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.12" version = "0.2.12"
@ -563,6 +662,25 @@ dependencies = [
"windows-targets 0.52.4", "windows-targets 0.52.4",
] ]
[[package]]
name = "libsamplerate-sys"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28853b399f78f8281cd88d333b54a63170c4275f6faea66726a2bea5cca72e0d"
dependencies = [
"cmake",
]
[[package]]
name = "lock_api"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]] [[package]]
name = "lofty" name = "lofty"
version = "0.18.2" version = "0.18.2"
@ -597,16 +715,20 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
[[package]] [[package]]
name = "lonelyradio" name = "lonelyradio"
version = "0.2.2" version = "0.3.0"
dependencies = [ dependencies = [
"async-stream",
"chrono", "chrono",
"clap", "clap",
"futures-util",
"lofty", "lofty",
"rand", "rand",
"rmp-serde", "rmp-serde",
"samplerate",
"serde", "serde",
"symphonia", "symphonia",
"tokio", "tokio",
"tokio-stream",
"walkdir", "walkdir",
] ]
@ -647,21 +769,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [ dependencies = [
"libc", "libc",
"log",
"wasi", "wasi",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
name = "monoclient" name = "monoclient"
version = "0.2.2" version = "0.3.0"
dependencies = [ dependencies = [
"clap", "clap",
"crossterm",
"monolib", "monolib",
] ]
[[package]] [[package]]
name = "monolib" name = "monolib"
version = "0.2.2" version = "0.3.0"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"rmp-serde", "rmp-serde",
@ -824,6 +948,29 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.48.5",
]
[[package]] [[package]]
name = "paste" name = "paste"
version = "1.0.14" version = "1.0.14"
@ -836,6 +983,12 @@ version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.30" version = "0.3.30"
@ -914,6 +1067,15 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.10.3" version = "1.10.3"
@ -1010,6 +1172,21 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "samplerate"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e032b2b24715c4f982f483ea3abdb3c9ba444d9f63e87b2843d6f998f5ba2698"
dependencies = [
"libsamplerate-sys",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.197" version = "1.0.197"
@ -1036,6 +1213,51 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]]
name = "signal-hook-mio"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"signal-hook",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.5.6" version = "0.5.6"
@ -1313,6 +1535,31 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "toml_datetime" name = "toml_datetime"
version = "0.6.5" version = "0.6.5"

View file

@ -1,10 +1,10 @@
[workspace] [workspace]
members = [ "monoclient", "monolib"] members = ["monoclient", "monolib"]
[package] [package]
name = "lonelyradio" name = "lonelyradio"
description = "TCP radio for lonely ones" description = "TCP radio for lonely ones"
version = "0.2.2" version = "0.3.0"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
authors = ["Ivan Bushchik <ivabus@ivabus.dev>"] authors = ["Ivan Bushchik <ivabus@ivabus.dev>"]
@ -14,6 +14,7 @@ repository = "https://github.com/ivabus/lonelyradio"
rand = "0.8.5" rand = "0.8.5"
clap = { version = "4.4.18", features = ["derive"] } clap = { version = "4.4.18", features = ["derive"] }
tokio = { version = "1.35.1", features = [ tokio = { version = "1.35.1", features = [
"sync",
"fs", "fs",
"io-util", "io-util",
"net", "net",
@ -27,12 +28,14 @@ symphonia = { version = "0.5.4", features = [
"all-formats", "all-formats",
"opt-simd", "opt-simd",
] } ] }
#samplerate = "0.2.4"
chrono = "0.4" chrono = "0.4"
rmp-serde = "1.1.2" rmp-serde = "1.1.2"
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
lofty = "0.18.2" lofty = "0.18.2"
async-stream = "0.3.5"
tokio-stream = { version = "0.1.15", features = ["sync"] }
futures-util = "0.3.30"
samplerate = "0.2.4"
[profile.release] [profile.release]
opt-level = 3 opt-level = 3
#lto = true

View file

@ -1,9 +1,10 @@
[package] [package]
name = "monoclient" name = "monoclient"
version = "0.2.2" version = "0.3.0"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
[dependencies] [dependencies]
monolib = { path="../monolib" } monolib = { path = "../monolib" }
clap = { version = "4.4.18", features = ["derive"] } clap = { version = "4.4.18", features = ["derive"] }
crossterm = "0.27.0"

View file

@ -1,5 +1,8 @@
use clap::Parser; use clap::Parser;
use std::io::{IsTerminal, Write}; use crossterm::cursor::MoveToColumn;
use crossterm::style::Print;
use crossterm::terminal::{Clear, ClearType};
use std::io::{stdout, IsTerminal};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[derive(Parser)] #[derive(Parser)]
@ -12,98 +15,59 @@ struct Args {
no_backspace: bool, no_backspace: bool,
} }
fn delete_chars(n: usize, nb: bool) {
if !nb {
print!("{}{}{}", "\u{8}".repeat(n), " ".repeat(n), "\u{8}".repeat(n));
std::io::stdout().flush().expect("Failed to flush stdout")
} else {
println!()
}
}
fn flush() {
std::io::stdout().flush().unwrap();
}
fn main() { fn main() {
let mut args = Args::parse(); let mut args = Args::parse();
args.no_backspace |= !std::io::stdout().is_terminal(); args.no_backspace |= !std::io::stdout().is_terminal();
std::thread::spawn(move || monolib::run(&args.address)); std::thread::spawn(move || monolib::run(&args.address));
while monolib::get_metadata().is_none() {} while monolib::get_metadata().is_none() {}
let mut md = monolib::get_metadata().unwrap(); let mut md = monolib::get_metadata().unwrap();
let seconds = md.length / md.sample_rate as u64 / 2;
let mut track_start = Instant::now(); let mut track_start = Instant::now();
let mut seconds_past = 0; let mut seconds_past = 0;
let mut msg_len = format!( crossterm::execute!(
"Playing: {} - {} - {} ({}:{:02})", stdout(),
md.artist, Print(format!(
md.album, "Playing: {} - {} - {} ({}:{:02})",
md.title, md.artist,
seconds / 60, md.album,
seconds % 60 md.title,
md.track_length_secs / 60,
md.track_length_secs % 60
))
) )
.len(); .unwrap();
print!(
"Playing: {} - {} - {} ({}:{:02})",
md.artist,
md.album,
md.title,
seconds / 60,
seconds % 60
);
flush();
loop { loop {
if monolib::get_metadata().unwrap() != md { if monolib::get_metadata().unwrap() != md {
md = monolib::get_metadata().unwrap(); md = monolib::get_metadata().unwrap();
let seconds = md.length / md.sample_rate as u64 / 2; crossterm::execute!(stdout(), Clear(ClearType::CurrentLine), MoveToColumn(0)).unwrap();
delete_chars(msg_len, args.no_backspace);
msg_len = format!(
"Playing: {} - {} - {} ({}:{:02})",
md.artist,
md.album,
md.title,
seconds / 60,
seconds % 60
)
.len();
print!( print!(
"Playing: {} - {} - {} (0:00 / {}:{:02})", "Playing: {} - {} - {} (0:00 / {}:{:02})",
md.artist, md.artist,
md.album, md.album,
md.title, md.title,
seconds / 60, md.track_length_secs / 60,
seconds % 60 md.track_length_secs % 60
); );
flush();
track_start = Instant::now(); track_start = Instant::now();
seconds_past = 0; seconds_past = 0;
} }
if (Instant::now() - track_start).as_secs() > seconds_past && !args.no_backspace { if (Instant::now() - track_start).as_secs() > seconds_past && !args.no_backspace {
seconds_past = (Instant::now() - track_start).as_secs(); seconds_past = (Instant::now() - track_start).as_secs();
msg_len = format!( crossterm::execute!(stdout(), Clear(ClearType::CurrentLine), MoveToColumn(0)).unwrap();
"Playing: {} - {} - {} ({}:{:02} / {}:{:02})", crossterm::execute!(
md.artist, stdout(),
md.album, Print(format!(
md.title, "Playing: {} - {} - {} ({}:{:02} / {}:{:02})",
seconds_past / 60, md.artist,
seconds_past % 60, md.album,
seconds / 60, md.title,
seconds % 60 seconds_past / 60,
seconds_past % 60,
md.track_length_secs / 60,
md.track_length_secs % 60
))
) )
.len(); .unwrap();
delete_chars(msg_len, args.no_backspace);
print!(
"Playing: {} - {} - {} ({}:{:02} / {}:{:02})",
md.artist,
md.album,
md.title,
seconds_past / 60,
seconds_past % 60,
seconds / 60,
seconds % 60
);
flush();
} }
std::thread::sleep(Duration::from_secs_f32(0.05)) std::thread::sleep(Duration::from_secs_f32(0.25))
} }
} }

View file

@ -1,6 +1,6 @@
[package] [package]
name = "monolib" name = "monolib"
version = "0.2.2" version = "0.3.0"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
description = "A library implementing the lonely radio audio streaming protocol" description = "A library implementing the lonely radio audio streaming protocol"

View file

@ -26,10 +26,7 @@ use std::net::TcpStream;
use std::sync::RwLock; use std::sync::RwLock;
use std::time::Instant; use std::time::Instant;
// How many samples to cache before playing in samples (both channels) SHOULD BE EVEN const CACHE_SIZE: usize = 50;
const BUFFER_SIZE: usize = 4800;
// How many buffers to cache
const CACHE_SIZE: usize = 160;
static SINK: RwLock<Option<Sink>> = RwLock::new(None); static SINK: RwLock<Option<Sink>> = RwLock::new(None);
static MD: RwLock<Option<Metadata>> = RwLock::new(None); static MD: RwLock<Option<Metadata>> = RwLock::new(None);
@ -48,8 +45,13 @@ pub enum State {
/// Track metadata /// Track metadata
#[derive(Deserialize, Clone, Debug, PartialEq)] #[derive(Deserialize, Clone, Debug, PartialEq)]
pub struct Metadata { pub struct Metadata {
/// In samples, length / (sample_rate * 2 (channels)) = length in seconds /// Fragment length
pub length: u64, pub length: u64,
/// Total track length
pub track_length_secs: u64,
pub track_length_frac: f32,
pub channels: u16,
// Yep, no more interpolation
pub sample_rate: u32, pub sample_rate: u32,
pub title: String, pub title: String,
pub album: String, pub album: String,
@ -147,9 +149,8 @@ pub fn run(server: &str) {
drop(sink); drop(sink);
let mut buffer = [0u8; 2]; let mut buffer = [0u8; 2];
let mut samples = [0f32; BUFFER_SIZE]; let mut samples = Vec::with_capacity(8192);
loop { loop {
let mut index = 0usize;
let recv_md: Metadata = let recv_md: Metadata =
rmp_serde::from_read(&stream).expect("Failed to parse track metadata"); rmp_serde::from_read(&stream).expect("Failed to parse track metadata");
@ -168,35 +169,35 @@ pub fn run(server: &str) {
if stream.read_exact(&mut buffer).is_err() { if stream.read_exact(&mut buffer).is_err() {
return; return;
}; };
samples.push(byteorder::LittleEndian::read_i16(&buffer[..2]) as f32 / 32768.0);
samples[index] = byteorder::LittleEndian::read_i16(&buffer[..2]) as f32 / 32768.0; }
index += 1; // Sink's thread is detached from main thread, so we need to synchronize with it
// Why we should synchronize with it?
if index == BUFFER_SIZE { // Let's say, that if we don't synchronize with it, we would have
// Sink's thread is detached from main thread, so we need to synchronize with it // a lot (no upper limit, actualy) of buffered sound, waiting for playing in sink
// Why we should synchronize with it? let sink = SINK.read().unwrap();
// Let's say, that if we don't synchronize with it, we would have if let Some(sink) = sink.as_ref() {
// a lot (no upper limit, actualy) of buffered sound, waiting for playing in sink while sink.len() >= CACHE_SIZE {
let sink = SINK.read().unwrap(); // Sleeping exactly one buffer and watching for reset signal
if let Some(sink) = sink.as_ref() { if watching_sleep(
while sink.len() >= CACHE_SIZE { if sink.len() > 2 {
// Sleeping exactly one buffer and watching for reset signal sink.len() as f32 - 2.0
if watching_sleep( } else {
if sink.len() > 2 { 0.25
sink.len() as f32 - 2.0 } * recv_md.length as f32
} else { / recv_md.sample_rate as f32
0.5 / 2.0,
} * BUFFER_SIZE as f32 / recv_md.sample_rate as f32 ) {
/ 2.0, _stop();
) { return;
_stop();
return;
}
}
sink.append(SamplesBuffer::new(2, recv_md.sample_rate, samples.as_slice()));
index = 0;
} }
} }
sink.append(SamplesBuffer::new(
recv_md.channels,
recv_md.sample_rate,
samples.as_slice(),
));
samples.clear();
} }
} }
} }

152
src/decode.rs Normal file
View file

@ -0,0 +1,152 @@
use std::path::{Path, PathBuf};
use clap::Parser;
use symphonia::core::audio::SampleBuffer;
use symphonia::core::codecs::CODEC_TYPE_NULL;
use symphonia::core::io::MediaSourceStream;
use symphonia::core::probe::Hint;
use symphonia::core::units::Time;
use crate::Args;
pub async fn get_meta(file_path: &Path) -> (u16, u32, Time) {
let file = Box::new(std::fs::File::open(file_path).unwrap());
let mut hint = Hint::new();
hint.with_extension(file_path.extension().unwrap().to_str().unwrap());
let probed = symphonia::default::get_probe()
.format(
&hint,
MediaSourceStream::new(file, Default::default()),
&Default::default(),
&Default::default(),
)
.expect("unsupported format");
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
.expect("no supported audio tracks");
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &Default::default())
.expect("unsupported codec");
let track_id = track.id;
let mut channels = 2u16;
let mut sample_rate = 0;
let track_length =
track.codec_params.time_base.unwrap().calc_time(track.codec_params.n_frames.unwrap());
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
_ => break,
};
if packet.track_id() != track_id {
continue;
}
match decoder.decode(&packet) {
Ok(decoded) => {
channels = decoded.spec().channels.count().try_into().unwrap();
sample_rate = decoded.spec().rate;
break;
}
_ => {
// Handling any error as track skip
continue;
}
}
}
let args = Args::parse();
(
channels,
if sample_rate > args.max_samplerate {
args.max_samplerate
} else {
sample_rate
},
track_length,
)
}
/// Getting samples
pub async fn decode_file(file_path: PathBuf, tx: tokio::sync::mpsc::Sender<Vec<i16>>) {
let args = Args::parse();
let file = Box::new(std::fs::File::open(&file_path).unwrap());
let mut hint = Hint::new();
hint.with_extension(file_path.extension().unwrap().to_str().unwrap());
let probed = symphonia::default::get_probe()
.format(
&hint,
MediaSourceStream::new(file, Default::default()),
&Default::default(),
&Default::default(),
)
.expect("unsupported format");
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
.expect("no supported audio tracks");
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &Default::default())
.expect("unsupported codec");
let track_id = track.id;
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
_ => break,
};
if packet.track_id() != track_id {
continue;
}
match decoder.decode(&packet) {
Ok(decoded) => {
if decoded.spec().rate > args.max_samplerate {
let spec = *decoded.spec();
let mut byte_buf =
SampleBuffer::<f32>::new(decoded.capacity() as u64, *decoded.spec());
byte_buf.copy_interleaved_ref(decoded);
tx.send(
samplerate::convert(
spec.rate,
args.max_samplerate,
spec.channels.count(),
samplerate::ConverterType::Linear,
byte_buf.samples(),
)
.unwrap()
.iter()
.map(|x| (*x * 32767.0) as i16)
.collect(),
)
.await
.unwrap();
} else {
let mut byte_buf =
SampleBuffer::<i16>::new(decoded.capacity() as u64, *decoded.spec());
byte_buf.copy_interleaved_ref(decoded);
tx.send(byte_buf.samples().to_vec()).await.unwrap();
}
continue;
}
_ => {
// Handling any error as track skip
continue;
}
}
}
}

View file

@ -1,21 +1,26 @@
mod decode;
use std::path::Path; use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use chrono::Local; use chrono::Local;
use clap::Parser; use clap::Parser;
use futures_util::pin_mut;
use lofty::Accessor; use lofty::Accessor;
use lofty::TaggedFileExt; use lofty::TaggedFileExt;
use rand::prelude::*; use rand::prelude::*;
use serde::Serialize; use serde::Serialize;
use symphonia::core::audio::SampleBuffer;
use symphonia::core::codecs::CODEC_TYPE_NULL;
use symphonia::core::io::MediaSourceStream;
use symphonia::core::probe::Hint;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use walkdir::DirEntry; use walkdir::DirEntry;
use crate::decode::decode_file;
use crate::decode::get_meta;
#[derive(Parser)] #[derive(Parser)]
struct Args { struct Args {
dir: PathBuf, dir: PathBuf,
@ -27,13 +32,19 @@ struct Args {
#[arg(short, long)] #[arg(short, long)]
war: bool, war: bool,
#[arg(short, long, default_value = "96000")]
max_samplerate: u32,
} }
#[derive(Serialize)] #[derive(Serialize, Clone)]
struct SentMetadata { struct SentMetadata {
// In bytes, we need to read next track metadata /// Fragment length
length: u64, length: u64,
// Yep, no more interpolation /// Total track length
track_length_secs: u64,
track_length_frac: f32,
channels: u16,
sample_rate: u32, sample_rate: u32,
title: String, title: String,
album: String, album: String,
@ -41,29 +52,34 @@ struct SentMetadata {
} }
async fn stream_samples( async fn stream_samples(
track_samples: Vec<i16>, samples_stream: impl Stream<Item = Vec<i16>>,
war: bool, war: bool,
md: SentMetadata, md: SentMetadata,
s: &mut TcpStream, s: &mut TcpStream,
) -> bool { ) -> bool {
if s.write_all(rmp_serde::to_vec(&md).unwrap().as_slice()).await.is_err() { pin_mut!(samples_stream);
return true;
}
for sample in track_samples { while let Some(samples) = samples_stream.next().await {
if s.write_all( let mut md = md.clone();
&(if war { md.length = samples.len() as u64;
sample.signum() * 32767 if s.write_all(rmp_serde::to_vec(&md).unwrap().as_slice()).await.is_err() {
} else {
sample
}
.to_le_bytes()),
)
.await
.is_err()
{
return true; return true;
}; }
for sample in samples {
if s.write_all(
&(if war {
sample.signum() * 32767
} else {
sample
}
.to_le_bytes()),
)
.await
.is_err()
{
return true;
};
}
} }
false false
} }
@ -103,20 +119,21 @@ fn track_valid(track: &Path) -> bool {
async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) { async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
let args = Args::parse(); let args = Args::parse();
s.set_nodelay(true).unwrap();
loop { loop {
let track = tracklist.choose(&mut thread_rng()).unwrap(); let track = tracklist.choose(&mut thread_rng()).unwrap().clone();
let mut title = String::new(); let mut title = String::new();
let mut artist = String::new(); let mut artist = String::new();
let mut album = String::new(); let mut album = String::new();
let mut file = std::fs::File::open(track).unwrap(); let mut file = std::fs::File::open(&track).unwrap();
let tagged = lofty::read_from(&mut file).unwrap(); let tagged = lofty::read_from(&mut file).unwrap();
if let Some(id3v2) = tagged.primary_tag() { if let Some(id3v2) = tagged.primary_tag() {
title = title =
id3v2.title().unwrap_or(track.file_stem().unwrap().to_string_lossy()).to_string(); id3v2.title().unwrap_or(track.file_stem().unwrap().to_string_lossy()).to_string();
album = id3v2.album().unwrap_or("[No tag]".into()).to_string(); album = id3v2.album().unwrap_or("[No tag]".into()).to_string();
artist = id3v2.artist().unwrap_or("[No tag]".into()).to_string() artist = id3v2.artist().unwrap_or("[No tag]".into()).to_string();
}; };
let track_message = format!("{} - {} - {}", &artist, &album, &title); let track_message = format!("{} - {} - {}", &artist, &album, &title);
eprintln!( eprintln!(
@ -145,74 +162,32 @@ async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
} }
); );
} }
let (channels, sample_rate, time) = get_meta(track.as_path()).await;
let file = Box::new(std::fs::File::open(track).unwrap()); let (tx, mut rx) = mpsc::channel::<Vec<i16>>(8192);
let mut hint = Hint::new(); tokio::spawn(decode_file(track, tx));
hint.with_extension(track.extension().unwrap().to_str().unwrap()); let stream = async_stream::stream! {
while let Some(item) = rx.recv().await {
let probed = symphonia::default::get_probe() yield item;
.format(
&hint,
MediaSourceStream::new(file, Default::default()),
&Default::default(),
&Default::default(),
)
.expect("unsupported format");
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
.expect("no supported audio tracks");
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &Default::default())
.expect("unsupported codec");
let track_id = track.id;
let mut sample_rate = 0;
let mut samples = vec![];
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
_ => break,
};
while !format.metadata().is_latest() {
format.metadata().pop();
} }
if packet.track_id() != track_id {
continue;
}
match decoder.decode(&packet) {
Ok(decoded) => {
sample_rate = decoded.spec().rate;
let mut byte_buf =
SampleBuffer::<i16>::new(decoded.capacity() as u64, *decoded.spec());
byte_buf.copy_interleaved_ref(decoded);
samples.append(&mut byte_buf.samples_mut().to_vec());
continue;
}
_ => {
// Handling any error as track skip
continue;
}
}
}
let md = SentMetadata {
length: samples.len() as u64,
sample_rate,
title,
album,
artist,
}; };
if !samples.is_empty() { if stream_samples(
if stream_samples(samples, args.war, md, &mut s).await { stream,
break; args.war,
} SentMetadata {
} album,
artist,
title,
length: 0,
track_length_frac: time.frac as f32,
track_length_secs: time.seconds,
sample_rate,
channels,
},
&mut s,
)
.await
{
return;
};
} }
} }