0.4.0: XOR encryption + memory optimization

- Optional XOR encryption is now available in lonelyradio and monolib
- lonelyradio was optimized to use less TCP calls and less memory copies
- New program: monoloader - music downloader for lonelyradio
- New monolib function - get_track() for downloading track (as samples),
  not playing
- lonelyradio and monolib are using extensible Writer and Reader enums
  to provide ability to use lonelyradio with different transport
  protocols or encryption
- Add lonelyradio_types crate to share types

Signed-off-by: Ivan Bushchik <ivabus@ivabus.dev>
This commit is contained in:
Ivan Bushchik 2024-05-16 08:28:18 +03:00
parent 05b65e25ed
commit 39cc35e16d
No known key found for this signature in database
GPG key ID: 2F16FBF3262E090C
16 changed files with 1074 additions and 343 deletions

818
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,10 +1,16 @@
[workspace]
members = ["monoclient", "monolib"]
members = [
"lonelyradio_types",
"monoclient",
"monolib",
"monoloader",
"platform/gtk",
]
[package]
name = "lonelyradio"
description = "TCP radio for lonely ones"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
license = "MIT"
authors = ["Ivan Bushchik <ivabus@ivabus.dev>"]
@ -36,6 +42,8 @@ async-stream = "0.3.5"
tokio-stream = { version = "0.1.15", features = ["sync"] }
futures-util = "0.3.30"
samplerate = "0.2.4"
lonelyradio_types = { path = "./lonelyradio_types" }
once_cell = "1.19.0"
[profile.release]
opt-level = 3

View file

@ -21,11 +21,13 @@ cargo build -r
## Run
```
lonelyradio [-a <ADDRESS:PORT>] <MUSIC_FOLDER> [-p] [-w]
lonelyradio <MUSIC_FOLDER> [-a <ADDRESS:PORT>] [-p] [-w] [-m|--max-samplerate M]
```
All files (recursively) will be shuffled and played back. Public log will be displayed to stdout, private to stderr.
`-m|--max-samplerate M` will resample tracks which samplerate exceeds M to M
### Clients
[monoclient](./monoclient) is a recommended CLI client for lonelyradio that uses [monolib](./monolib)

View file

@ -0,0 +1,9 @@
[package]
name = "lonelyradio_types"
version = "0.4.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version = "1.0.197", features = ["derive"] }

View file

@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub enum Message {
T(TrackMetadata),
F(FragmentMetadata),
}
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct TrackMetadata {
pub track_length_secs: u64,
pub track_length_frac: f32,
pub channels: u16,
pub sample_rate: u32,
pub title: String,
pub album: String,
pub artist: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct FragmentMetadata {
// In samples
pub length: u64,
}
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct SessionSettings {
pub gzip: bool,
}

View file

@ -1,6 +1,6 @@
[package]
name = "monoclient"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
license = "MIT"

View file

@ -3,6 +3,7 @@ use crossterm::cursor::MoveToColumn;
use crossterm::style::Print;
use crossterm::terminal::{Clear, ClearType};
use std::io::{stdout, IsTerminal};
use std::path::PathBuf;
use std::time::{Duration, Instant};
#[derive(Parser)]
@ -13,12 +14,20 @@ struct Args {
/// Do not use backspace control char
#[arg(short)]
no_backspace: bool,
#[arg(long)]
xor_key_file: Option<PathBuf>,
}
fn main() {
let mut args = Args::parse();
args.no_backspace |= !std::io::stdout().is_terminal();
std::thread::spawn(move || monolib::run(&args.address));
std::thread::spawn(move || {
monolib::run(
&args.address,
args.xor_key_file.map(|key| std::fs::read(key).expect("Failed to read preshared key")),
)
});
while monolib::get_metadata().is_none() {}
let mut md = monolib::get_metadata().unwrap();
let mut track_start = Instant::now();
@ -35,9 +44,13 @@ fn main() {
))
)
.unwrap();
let mut track_length = md.track_length_secs as f64 + md.track_length_frac as f64;
let mut next_md = md.clone();
loop {
if monolib::get_metadata().unwrap() != md {
md = monolib::get_metadata().unwrap();
if monolib::get_metadata().unwrap() != md
&& track_length <= (Instant::now() - track_start).as_secs_f64()
{
md = next_md.clone();
crossterm::execute!(stdout(), Clear(ClearType::CurrentLine), MoveToColumn(0)).unwrap();
print!(
"Playing: {} - {} - {} (0:00 / {}:{:02})",
@ -49,6 +62,9 @@ fn main() {
);
track_start = Instant::now();
seconds_past = 0;
track_length = md.track_length_secs as f64 + md.track_length_frac as f64
} else if next_md == md {
next_md = monolib::get_metadata().unwrap();
}
if (Instant::now() - track_start).as_secs() > seconds_past && !args.no_backspace {
seconds_past = (Instant::now() - track_start).as_secs();

View file

@ -1,6 +1,6 @@
[package]
name = "monolib"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
license = "MIT"
description = "A library implementing the lonely radio audio streaming protocol"
@ -15,4 +15,4 @@ crate-type = ["staticlib", "cdylib", "rlib"]
rodio = { version = "0.17.3", default-features = false }
byteorder = "1.5.0"
rmp-serde = "1.1.2"
serde = { version = "1.0.197", features = ["derive"] }
lonelyradio_types = { path = "../lonelyradio_types" }

View file

@ -7,10 +7,13 @@ use std::ffi::{CStr, CString};
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn c_start(server: *const c_char) {
let serv = unsafe { CStr::from_ptr(server) };
run(match serv.to_str() {
run(
match serv.to_str() {
Ok(s) => s,
_ => "",
})
},
None,
)
}
#[no_mangle]
@ -69,7 +72,7 @@ pub extern "C" fn c_get_metadata_title() -> *mut c_char {
pub extern "C" fn c_get_metadata_length() -> *mut c_float {
let md = MD.read().unwrap();
match md.as_ref() {
Some(md) => &mut (md.length as c_float / md.sample_rate as c_float),
Some(md) => &mut (md.track_length_secs as c_float + md.track_length_frac as c_float),
None => &mut 0.0,
}
}

View file

@ -16,20 +16,21 @@
/// Functions, providing C-like API
pub mod c;
mod reader;
use byteorder::ByteOrder;
use byteorder::{LittleEndian, ReadBytesExt};
use lonelyradio_types::{Message, TrackMetadata};
use rodio::buffer::SamplesBuffer;
use rodio::{OutputStream, Sink};
use serde::Deserialize;
use std::io::Read;
use std::io::BufReader;
use std::net::TcpStream;
use std::sync::RwLock;
use std::time::Instant;
const CACHE_SIZE: usize = 500;
const CACHE_SIZE: usize = 32;
static SINK: RwLock<Option<Sink>> = RwLock::new(None);
static MD: RwLock<Option<Metadata>> = RwLock::new(None);
static MD: RwLock<Option<TrackMetadata>> = RwLock::new(None);
static STATE: RwLock<State> = RwLock::new(State::NotStarted);
/// Player state
@ -42,22 +43,6 @@ pub enum State {
Paused = 3,
}
/// Track metadata
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub struct Metadata {
/// Fragment length
pub length: u64,
/// Total track length
pub track_length_secs: u64,
pub track_length_frac: f32,
pub channels: u16,
// Yep, no more interpolation
pub sample_rate: u32,
pub title: String,
pub album: String,
pub artist: String,
}
/// Play/pauses playback
pub fn toggle() {
let mut state = crate::STATE.write().unwrap();
@ -81,6 +66,9 @@ pub fn toggle() {
/// Stops playback
pub fn stop() {
let mut state = STATE.write().unwrap();
if *state == State::NotStarted {
return;
}
*state = State::Resetting;
let sink = SINK.read().unwrap();
@ -99,7 +87,7 @@ pub fn get_state() -> State {
*STATE.read().unwrap()
}
pub fn get_metadata() -> Option<Metadata> {
pub fn get_metadata() -> Option<TrackMetadata> {
MD.read().unwrap().clone()
}
@ -130,8 +118,37 @@ fn watching_sleep(dur: f32) -> bool {
false
}
/// Download track as samples
pub fn get_track(server: &str, xor_key: Option<Vec<u8>>) -> Option<(TrackMetadata, Vec<i16>)> {
let mut stream = BufReader::new(match xor_key {
Some(k) => reader::Reader::XorEncrypted(TcpStream::connect(server).unwrap(), k, 0),
None => reader::Reader::Unencrypted(TcpStream::connect(server).unwrap()),
});
let mut samples = vec![];
let mut md: Option<TrackMetadata> = None;
loop {
let recv_md: Message = rmp_serde::from_read(&mut stream).expect("Failed to parse message");
match recv_md {
Message::T(tmd) => {
if md.is_some() {
break;
}
md = Some(tmd);
}
Message::F(fmd) => {
let mut buf = vec![0; fmd.length as usize];
stream.read_i16_into::<LittleEndian>(&mut buf).unwrap();
samples.append(&mut buf);
}
}
}
md.map(|md| (md, samples))
}
/// Starts playing at "server:port"
pub fn run(server: &str) {
pub fn run(server: &str, xor_key: Option<Vec<u8>>) {
let mut state = STATE.write().unwrap();
if *state == State::Playing || *state == State::Paused {
return;
@ -139,7 +156,11 @@ pub fn run(server: &str) {
*state = State::Playing;
drop(state);
let mut stream = TcpStream::connect(server).unwrap();
let mut stream = BufReader::new(match xor_key {
Some(k) => reader::Reader::XorEncrypted(TcpStream::connect(server).unwrap(), k, 0),
None => reader::Reader::Unencrypted(TcpStream::connect(server).unwrap()),
});
let mut sink = SINK.write().unwrap();
let (_stream, stream_handle) = OutputStream::try_default().unwrap();
@ -148,16 +169,15 @@ pub fn run(server: &str) {
*sink = Some(audio_sink);
drop(sink);
let mut buffer = [0u8; 2];
let mut samples = Vec::with_capacity(8192);
loop {
let recv_md: Metadata =
rmp_serde::from_read(&stream).expect("Failed to parse track metadata");
let recv_md: Message = rmp_serde::from_read(&mut stream).expect("Failed to parse message");
match recv_md {
Message::T(tmd) => {
let mut md = MD.write().unwrap();
*md = Some(recv_md.clone());
drop(md);
for _ in 0..recv_md.length {
*md = Some(tmd.clone());
}
Message::F(fmd) => {
while *STATE.read().unwrap() == State::Paused {
std::thread::sleep(std::time::Duration::from_secs_f32(0.25))
}
@ -165,17 +185,21 @@ pub fn run(server: &str) {
_stop();
return;
}
if stream.read_exact(&mut buffer).is_err() {
let mut samples_i16 = vec![0; fmd.length as usize];
if stream.read_i16_into::<LittleEndian>(&mut samples_i16).is_err() {
return;
};
samples.push(byteorder::LittleEndian::read_i16(&buffer[..2]) as f32 / 32768.0);
}
samples.append(
&mut samples_i16.iter().map(|sample| *sample as f32 / 32767.0).collect(),
);
// Sink's thread is detached from main thread, so we need to synchronize with it
// Why we should synchronize with it?
// Let's say, that if we don't synchronize with it, we would have
// a lot (no upper limit, actualy) of buffered sound, waiting for playing in sink
let sink = SINK.read().unwrap();
let md = MD.read().unwrap();
let md = md.as_ref().unwrap();
if let Some(sink) = sink.as_ref() {
while sink.len() >= CACHE_SIZE {
// Sleeping exactly one buffer and watching for reset signal
@ -184,20 +208,21 @@ pub fn run(server: &str) {
sink.len() as f32 - 2.0
} else {
0.25
} * recv_md.length as f32
/ recv_md.sample_rate as f32
/ 2.0,
} * fmd.length as f32 / md.sample_rate as f32
/ 4.0,
) {
_stop();
return;
}
}
sink.append(SamplesBuffer::new(
recv_md.channels,
recv_md.sample_rate,
md.channels,
md.sample_rate,
samples.as_slice(),
));
samples.clear();
}
}
}
}
}

25
monolib/src/reader.rs Normal file
View file

@ -0,0 +1,25 @@
use std::{io, net::TcpStream};
pub(crate) enum Reader {
Unencrypted(TcpStream),
XorEncrypted(TcpStream, Vec<u8>, u64),
}
impl io::Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
Self::Unencrypted(s) => s.read(buf),
Self::XorEncrypted(s, key, n) => {
let out = s.read(buf);
if let Ok(i) = &out {
for k in buf.iter_mut().take(*i) {
*k ^= key[*n as usize];
*n += 1;
*n %= key.len() as u64;
}
}
out
}
}
}
}

11
monoloader/Cargo.toml Normal file
View file

@ -0,0 +1,11 @@
[package]
name = "monoloader"
version = "0.4.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
monolib = { path = "../monolib" }
clap = { version = "4.4.18", features = ["derive"] }
hound = "3.5.1"

38
monoloader/src/main.rs Normal file
View file

@ -0,0 +1,38 @@
use clap::Parser;
use std::path::PathBuf;
#[derive(Parser)]
struct Args {
/// Remote address
address: String,
#[arg(long)]
xor_key_file: Option<PathBuf>,
}
fn main() {
let args = Args::parse();
let (md, samples) = monolib::get_track(
&args.address,
args.xor_key_file.map(|key| std::fs::read(key).expect("Failed to read preshared key")),
)
.unwrap();
println!(
"Downloaded: {} - {} - {} ({} MB)",
md.artist,
md.album,
md.title,
samples.len() as f32 * 2.0 / 1024.0 / 1024.0
);
let spec = hound::WavSpec {
channels: md.channels,
sample_rate: md.sample_rate,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer =
hound::WavWriter::create(format!("{} - {}.wav", md.artist, md.title), spec).unwrap();
let mut writer_i16 = writer.get_i16_writer(samples.len() as u32);
samples.iter().for_each(|s| writer_i16.write_sample(*s));
writer_i16.flush().unwrap();
}

View file

@ -1,6 +1,8 @@
use std::path::{Path, PathBuf};
use async_stream::stream;
use clap::Parser;
use futures_util::Stream;
use symphonia::core::audio::SampleBuffer;
use symphonia::core::codecs::CODEC_TYPE_NULL;
use symphonia::core::io::MediaSourceStream;
@ -75,7 +77,7 @@ pub async fn get_meta(file_path: &Path) -> (u16, u32, Time) {
}
/// Getting samples
pub async fn decode_file(file_path: PathBuf, tx: tokio::sync::mpsc::Sender<Vec<i16>>) {
pub fn decode_file_stream(file_path: PathBuf) -> impl Stream<Item = Vec<i16>> {
let args = Args::parse();
let file = Box::new(std::fs::File::open(&file_path).unwrap());
let mut hint = Hint::new();
@ -102,6 +104,7 @@ pub async fn decode_file(file_path: PathBuf, tx: tokio::sync::mpsc::Sender<Vec<i
.make(&track.codec_params, &Default::default())
.expect("unsupported codec");
let track_id = track.id;
stream! {
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
@ -120,8 +123,9 @@ pub async fn decode_file(file_path: PathBuf, tx: tokio::sync::mpsc::Sender<Vec<i
SampleBuffer::<f32>::new(decoded.capacity() as u64, *decoded.spec());
byte_buf.copy_interleaved_ref(decoded);
tx.send(
samplerate::convert(
// About Samplerate struct:
// We are downsampling, not upsampling, so we should be fine
yield (samplerate::convert(
spec.rate,
args.max_samplerate,
spec.channels.count(),
@ -130,16 +134,14 @@ pub async fn decode_file(file_path: PathBuf, tx: tokio::sync::mpsc::Sender<Vec<i
)
.unwrap()
.iter()
.map(|x| (*x * 32767.0) as i16)
.collect(),
)
.await
.unwrap();
.map(|x| (*x * 32768.0) as i16)
.collect());
} else {
let mut byte_buf =
SampleBuffer::<i16>::new(decoded.capacity() as u64, *decoded.spec());
byte_buf.copy_interleaved_ref(decoded);
tx.send(byte_buf.samples().to_vec()).await.unwrap();
yield (byte_buf.samples().to_vec());
}
continue;
}
@ -150,3 +152,4 @@ pub async fn decode_file(file_path: PathBuf, tx: tokio::sync::mpsc::Sender<Vec<i
}
}
}
}

View file

@ -1,4 +1,5 @@
mod decode;
mod writer;
use std::path::Path;
use std::path::PathBuf;
@ -9,16 +10,17 @@ use clap::Parser;
use futures_util::pin_mut;
use lofty::Accessor;
use lofty::TaggedFileExt;
use lonelyradio_types::{FragmentMetadata, Message, TrackMetadata};
use once_cell::sync::Lazy;
use rand::prelude::*;
use serde::Serialize;
use tokio::io::AsyncWriteExt;
use std::io::Write;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use walkdir::DirEntry;
use writer::Writer;
use crate::decode::decode_file;
use crate::decode::decode_file_stream;
use crate::decode::get_meta;
#[derive(Parser)]
@ -35,50 +37,58 @@ struct Args {
#[arg(short, long, default_value = "96000")]
max_samplerate: u32,
#[arg(long)]
xor_key_file: Option<PathBuf>,
}
#[derive(Serialize, Clone)]
struct SentMetadata {
/// Fragment length
length: u64,
/// Total track length
track_length_secs: u64,
track_length_frac: f32,
channels: u16,
sample_rate: u32,
title: String,
album: String,
artist: String,
static KEY: Lazy<Option<Arc<Vec<u8>>>> = Lazy::new(|| {
let args = Args::parse();
if let Some(path) = args.xor_key_file {
let key = std::fs::read(path).expect("Failed to read preshared key");
Some(Arc::new(key))
} else {
None
}
});
async fn stream_samples(
async fn stream_track(
samples_stream: impl Stream<Item = Vec<i16>>,
war: bool,
md: SentMetadata,
s: &mut TcpStream,
md: TrackMetadata,
s: &mut Writer,
) -> bool {
pin_mut!(samples_stream);
while let Some(samples) = samples_stream.next().await {
let mut md = md.clone();
md.length = samples.len() as u64;
if s.write_all(rmp_serde::to_vec(&md).unwrap().as_slice()).await.is_err() {
return true;
}
for sample in samples {
if s.write_all(
&(if war {
sample.signum() * 32767
} else {
sample
}
.to_le_bytes()),
)
.await
.is_err()
{
if s.write_all(rmp_serde::to_vec(&Message::T(md)).unwrap().as_slice()).is_err() {
return true;
};
while let Some(mut _samples) = samples_stream.next().await {
let md = Message::F(FragmentMetadata {
length: _samples.len() as u64,
});
if s.write_all(rmp_serde::to_vec(&md).unwrap().as_slice()).is_err() {
return true;
}
if war {
_samples.iter_mut().for_each(|sample| {
*sample = sample.signum() * 32767;
});
}
// Launching lonelyradio on the router moment
if cfg!(target_endian = "big") {
_samples.iter_mut().for_each(|sample| {
*sample = sample.to_le();
});
}
// Sowwy about that
let (_, samples, _) = unsafe { _samples.align_to::<u8>() };
if s.write_all(samples).is_err() {
return true;
}
}
false
@ -117,10 +127,24 @@ fn track_valid(track: &Path) -> bool {
true
}
async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
async fn stream(s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
let args = Args::parse();
s.set_nodelay(true).unwrap();
let s = s.into_std().unwrap();
s.set_nonblocking(false).unwrap();
let mut s = if args.xor_key_file.is_some() {
Writer::XorEncrypted(
s,
match &*KEY {
Some(a) => a.clone(),
_ => {
unreachable!()
}
},
0,
)
} else {
Writer::Unencrypted(s)
};
loop {
let track = tracklist.choose(&mut thread_rng()).unwrap().clone();
@ -163,23 +187,16 @@ async fn stream(mut s: TcpStream, tracklist: Arc<Vec<PathBuf>>) {
);
}
let (channels, sample_rate, time) = get_meta(track.as_path()).await;
let (tx, mut rx) = mpsc::channel::<Vec<i16>>(8192);
tokio::spawn(decode_file(track, tx));
let stream = async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
if stream_samples(
let stream = decode_file_stream(track);
if stream_track(
stream,
args.war,
SentMetadata {
TrackMetadata {
track_length_frac: time.frac as f32,
track_length_secs: time.seconds,
album,
artist,
title,
length: 0,
track_length_frac: time.frac as f32,
track_length_secs: time.seconds,
sample_rate,
channels,
},

59
src/writer.rs Normal file
View file

@ -0,0 +1,59 @@
use std::{
borrow::BorrowMut,
io,
net::{SocketAddr, TcpStream},
sync::Arc,
};
pub(crate) enum Writer {
Unencrypted(TcpStream),
XorEncrypted(TcpStream, Arc<Vec<u8>>, u64),
}
impl io::Write for Writer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Self::Unencrypted(s) => s.write(buf),
Self::XorEncrypted(s, key, n) => {
for mut k in buf.iter().copied() {
k ^= key[*n as usize];
*n += 1;
*n %= key.len() as u64;
s.write_all(&[k])?;
}
Ok(buf.len())
}
}
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
match self {
Self::Unencrypted(s) => s.write_all(buf),
Self::XorEncrypted(s, key, n) => s.write_all(
&buf.iter()
.borrow_mut()
.copied()
.map(|mut k| {
k ^= key[*n as usize];
*n += 1;
*n %= key.len() as u64;
k
})
// I don't like it
.collect::<Vec<u8>>(),
),
}
}
fn flush(&mut self) -> io::Result<()> {
match self {
Self::XorEncrypted(s, _, _) | Self::Unencrypted(s) => s.flush(),
}
}
}
impl Writer {
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
match self {
Self::XorEncrypted(s, _, _) | Self::Unencrypted(s) => s.peer_addr(),
}
}
}