From c27074573f4025f4487f18557c57f8c14b8652da Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sun, 13 Jun 2021 01:42:31 +0200 Subject: rework audio decoding --- mumd/src/audio.rs | 2 +- mumd/src/audio/output.rs | 96 +++++++++++++++++++++++++++++++++--------------- 2 files changed, 67 insertions(+), 31 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 2e20583..beff9cd 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -235,7 +235,7 @@ impl AudioOutput { pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - self.client_streams.lock().unwrap().extend(None, samples); + self.client_streams.lock().unwrap().add_sound_effect(samples); } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index a2f6bcc..d74e6e1 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -2,8 +2,10 @@ use crate::audio::SAMPLE_RATE; use crate::error::{AudioError, AudioStream}; use crate::network::VoiceStreamType; +use bytes::Bytes; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; +use dasp_ring_buffer::Bounded; use log::*; use mumble_protocol::voice::VoicePacketPayload; use std::collections::{HashMap, VecDeque}; @@ -13,11 +15,57 @@ use tokio::sync::watch; type ClientStreamKey = (VoiceStreamType, u32); +pub struct ClientAudioData { + buf: Bounded>, + output_channels: opus::Channels, + //we need two since a client can hypothetically send both mono + //and stereo packets, and we can't switch a decoder on the fly + //to reuse it + mono_decoder: opus::Decoder, + stereo_decoder: opus::Decoder, +} + +impl ClientAudioData { + pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self { + Self { + mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(), + stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(), + output_channels, + buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), //buffer 1 s of audio + } + } + + pub fn store_packet(&mut self, bytes: Bytes) { + let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap(); + let (decoder, channels) = match packet_channels { + opus::Channels::Mono => (&mut self.mono_decoder, 1), + opus::Channels::Stereo => (&mut self.stereo_decoder, 2), + }; + let mut out: Vec = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode + let parsed = decoder + .decode_float(&bytes, &mut out, false) + .expect("Error decoding"); + out.truncate(parsed); + match (packet_channels, self.output_channels) { + (opus::Channels::Mono, opus::Channels::Mono) | (opus::Channels::Stereo, opus::Channels::Stereo) => for sample in out { + self.buf.push(sample); + }, + (opus::Channels::Mono, opus::Channels::Stereo) => for sample in out { + self.buf.push(sample); + self.buf.push(sample); + }, + (opus::Channels::Stereo, opus::Channels::Mono) => for sample in out.into_iter().step_by(2) { + self.buf.push(sample); + }, + } + } +} + pub struct ClientStream { - buffer_clients: HashMap, opus::Decoder)>, //TODO ring buffer? + buffer_clients: HashMap, buffer_effects: VecDeque, sample_rate: u32, - channels: opus::Channels, + output_channels: opus::Channels, } impl ClientStream { @@ -31,29 +79,21 @@ impl ClientStream { buffer_clients: HashMap::new(), buffer_effects: VecDeque::new(), sample_rate, - channels, + output_channels: channels, } } - fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque, opus::Decoder) { - let sample_rate = self.sample_rate; - let channels = self.channels; - self.buffer_clients.entry(client).or_insert_with(|| { - let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap(); - (VecDeque::new(), opus_decoder) - }) + fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData { + self.buffer_clients.entry(client).or_insert( + ClientAudioData::new(self.sample_rate, self.output_channels) + ) } pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode - let (buffer, decoder) = self.get_client(client); - let parsed = decoder - .decode_float(&bytes, &mut out, false) - .expect("Error decoding"); - out.truncate(parsed); - buffer.extend(&out); + debug!("{:?} {:?}", self.output_channels, opus::packet::get_nb_channels(&bytes)); + self.get_client(client).store_packet(bytes); } _ => { unimplemented!("Payload type not supported"); @@ -61,12 +101,8 @@ impl ClientStream { } } - pub fn extend(&mut self, client: Option, values: &[f32]) { - let buffer = match client { - Some(x) => &mut self.get_client(x).0, - None => &mut self.buffer_effects, - }; - buffer.extend(values.iter().copied()); + pub fn add_sound_effect(&mut self, values: &[f32]) { + self.buffer_effects.extend(values.iter().copied()); } } @@ -148,7 +184,7 @@ impl DefaultAudioOutputDevice { let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, - curry_callback::( + callback::( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -157,7 +193,7 @@ impl DefaultAudioOutputDevice { ), SampleFormat::I16 => output_device.build_output_stream( &output_config, - curry_callback::( + callback::( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -166,7 +202,7 @@ impl DefaultAudioOutputDevice { ), SampleFormat::U16 => output_device.build_output_stream( &output_config, - curry_callback::( + callback::( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -211,7 +247,7 @@ impl AudioOutputDevice for DefaultAudioOutputDevice { } } -pub fn curry_callback( +pub fn callback( user_bufs: Arc>, output_volume_receiver: watch::Receiver, user_volumes: Arc>>, @@ -227,10 +263,10 @@ pub fn curry_callback let user_volumes = user_volumes.lock().unwrap(); for (k, v) in user_bufs.buffer_clients.iter_mut() { let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); - for sample in data.iter_mut() { - if !muted { + if !muted { + for sample in data.iter_mut() { *sample = sample.saturating_add(Sample::from( - &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume), + &(v.buf.pop().unwrap_or(0.0) * volume * user_volume), )); } } -- cgit v1.2.1 From 3d14ef37acb4060937ba0463f74976ef8e4ede79 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sun, 13 Jun 2021 01:50:43 +0200 Subject: remove unecessary debug print --- mumd/src/audio/output.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index d74e6e1..f313f8f 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -92,7 +92,6 @@ impl ClientStream { pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - debug!("{:?} {:?}", self.output_channels, opus::packet::get_nb_channels(&bytes)); self.get_client(client).store_packet(bytes); } _ => { -- cgit v1.2.1 From d2ee549d248af9988bcbd011a86aaa37e4db15ba Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Mon, 14 Jun 2021 22:21:24 +0200 Subject: rework transformer trait --- mumd/src/audio/transformers.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mumd/src/audio/transformers.rs b/mumd/src/audio/transformers.rs index 25e28b8..91cf3ac 100644 --- a/mumd/src/audio/transformers.rs +++ b/mumd/src/audio/transformers.rs @@ -2,7 +2,7 @@ pub trait Transformer { /// Do the transform. Returning `None` is interpreted as "the buffer is unwanted". /// The implementor is free to modify the buffer however it wants to. - fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]>; + fn transform<'a>(&mut self, buf: (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])>; } /// A struct representing a noise gate transform. @@ -25,7 +25,7 @@ impl NoiseGate { } impl Transformer for NoiseGate { - fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]> { + fn transform<'a>(&mut self, (channels, buf): (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])> { const MUTE_PERCENTAGE: f32 = 0.1; let max = buf.iter().map(|e| e.abs()).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(); @@ -43,7 +43,7 @@ impl Transformer for NoiseGate { if self.open == 0 { None } else { - Some(buf) + Some((channels, buf)) } } } -- cgit v1.2.1 From ffdcb556f5602ac23c7c7660bb3b034459b3adc0 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Mon, 14 Jun 2021 23:55:07 +0200 Subject: update input code to use new transformer trait --- mumd/src/audio/input.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index a1227e3..25d15d5 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -29,8 +29,8 @@ pub fn callback( buffer.extend(data.by_ref().take(buffer_size - buffer.len())); let encoded = transformers .iter_mut() - .try_fold(&mut buffer[..], |acc, e| e.transform(acc)) - .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap()); + .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| e.transform(acc)) + .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap()); if let Some(encoded) = encoded { if let Err(e) = input_sender.try_send(encoded) { -- cgit v1.2.1 From ee7a35251ef7a24eb4c8d8f892f6a73a8f69f01e Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sat, 19 Jun 2021 16:41:18 +0200 Subject: create callbacks in the right way --- mumd/src/audio/output.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index f313f8f..2177fb2 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -9,6 +9,7 @@ use dasp_ring_buffer::Bounded; use log::*; use mumble_protocol::voice::VoicePacketPayload; use std::collections::{HashMap, VecDeque}; +use std::iter; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; @@ -250,6 +251,7 @@ pub fn callback( user_bufs: Arc>, output_volume_receiver: watch::Receiver, user_volumes: Arc>>, + output_channels: opus::Channels, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { move |data: &mut [T], _info: &OutputCallbackInfo| { for sample in data.iter_mut() { @@ -263,9 +265,9 @@ pub fn callback( for (k, v) in user_bufs.buffer_clients.iter_mut() { let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); if !muted { - for sample in data.iter_mut() { + for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) { *sample = sample.saturating_add(Sample::from( - &(v.buf.pop().unwrap_or(0.0) * volume * user_volume), + &(val * volume * user_volume), )); } } -- cgit v1.2.1 From 4a1d75e0f5f960c59dfba153212aca4830c28237 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sat, 19 Jun 2021 17:23:25 +0200 Subject: make it compile --- mumd/src/audio/output.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 2177fb2..2015435 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -180,6 +180,14 @@ impl DefaultAudioOutputDevice { let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); + let output_channels = match output_config.channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => { + warn!("Trying to output to an unsupported number of channels ({}), defaulting to mono", output_config.channels); + opus::Channels::Mono + } + }; let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( @@ -188,6 +196,7 @@ impl DefaultAudioOutputDevice { Arc::clone(&client_streams), output_volume_receiver, user_volumes, + output_channels, ), err_fn, ), @@ -197,6 +206,7 @@ impl DefaultAudioOutputDevice { Arc::clone(&client_streams), output_volume_receiver, user_volumes, + output_channels, ), err_fn, ), @@ -206,6 +216,7 @@ impl DefaultAudioOutputDevice { Arc::clone(&client_streams), output_volume_receiver, user_volumes, + output_channels, ), err_fn, ), -- cgit v1.2.1 From 37f1089cb5f2468c0659f7d83c7530b35eedeaec Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sat, 19 Jun 2021 17:48:21 +0200 Subject: fix compiler warning --- mumd/src/audio/output.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 2015435..cdb7b8e 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -180,14 +180,6 @@ impl DefaultAudioOutputDevice { let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); - let output_channels = match output_config.channels { - 1 => opus::Channels::Mono, - 2 => opus::Channels::Stereo, - _ => { - warn!("Trying to output to an unsupported number of channels ({}), defaulting to mono", output_config.channels); - opus::Channels::Mono - } - }; let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( @@ -196,7 +188,6 @@ impl DefaultAudioOutputDevice { Arc::clone(&client_streams), output_volume_receiver, user_volumes, - output_channels, ), err_fn, ), @@ -206,7 +197,6 @@ impl DefaultAudioOutputDevice { Arc::clone(&client_streams), output_volume_receiver, user_volumes, - output_channels, ), err_fn, ), @@ -216,7 +206,6 @@ impl DefaultAudioOutputDevice { Arc::clone(&client_streams), output_volume_receiver, user_volumes, - output_channels, ), err_fn, ), @@ -262,7 +251,6 @@ pub fn callback( user_bufs: Arc>, output_volume_receiver: watch::Receiver, user_volumes: Arc>>, - output_channels: opus::Channels, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { move |data: &mut [T], _info: &OutputCallbackInfo| { for sample in data.iter_mut() { -- cgit v1.2.1