diff options
| author | Rubens Brandao <git@rubens.io> | 2021-04-08 21:14:48 +0200 |
|---|---|---|
| committer | Rubens Brandao <git@rubens.io> | 2021-04-08 21:14:48 +0200 |
| commit | 7fed8f81222de570d864487605e42b5cbb023218 (patch) | |
| tree | 03ff7f0e85ce6c356d09ff47f4d782110310657c /mumd | |
| parent | 12554b2f6cd89ad3cd3721bbc790d7772a21c3ae (diff) | |
| download | mum-7fed8f81222de570d864487605e42b5cbb023218.tar.gz | |
Move audio decode logic to ClientStream
This way is possible to deduplicate the opus::Decoder used by audio output.
The audio effects and client network streams are unified in only one place,
allowing the Audio Output device to consume the Samples with only one
call.
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/src/audio.rs | 71 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 108 |
2 files changed, 96 insertions, 83 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 5a839bc..814050b 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -3,7 +3,7 @@ pub mod output; mod noise_gate; use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; -use crate::audio::output::SaturatingAdd; +use crate::audio::output::ClientStream; use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; use crate::error::{AudioError, AudioStream}; use crate::network::VoiceStreamType; @@ -20,7 +20,7 @@ use mumble_protocol::Serverbound; use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use mumlib::config::SoundEffect; use std::borrow::Cow; -use std::collections::{hash_map::Entry, HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::fmt::Debug; use std::fs::File; @@ -126,10 +126,9 @@ pub struct AudioOutput { user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, - client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>, + client_streams: Arc<Mutex<ClientStream>>, sounds: HashMap<NotificationEvents, Vec<f32>>, - play_sounds: Arc<Mutex<VecDeque<f32>>>, } impl AudioOutput { @@ -159,14 +158,12 @@ impl AudioOutput { let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume); - let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); - let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); + let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, output::curry_callback::<f32>( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -176,7 +173,6 @@ impl AudioOutput { SampleFormat::I16 => output_device.build_output_stream( &output_config, output::curry_callback::<i16>( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -186,7 +182,6 @@ impl AudioOutput { SampleFormat::U16 => output_device.build_output_stream( &output_config, output::curry_callback::<u16>( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -204,7 +199,6 @@ impl AudioOutput { sounds: HashMap::new(), volume_sender: output_volume_sender, user_volumes, - play_sounds, }; res.load_sound_effects(&[]); Ok(res) @@ -265,52 +259,23 @@ impl AudioOutput { } pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) { - match self.client_streams.lock().unwrap().entry((stream_type, session_id)) { - Entry::Occupied(mut entry) => { - entry - .get_mut() - .decode_packet(payload, self.config.channels as usize); - } - Entry::Vacant(_) => { - warn!("Can't find session id {}", session_id); - } - } + self.client_streams.lock().unwrap().decode_packet( + Some((stream_type, session_id)), + payload, + self.config.channels as usize, + ); } pub fn add_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(_) => { - warn!("Session id {} already exists", session_id); - } - Entry::Vacant(entry) => { - entry.insert(output::ClientStream::new( - self.config.sample_rate.0, - self.config.channels, - )); - } - } - } + self.client_streams.lock().unwrap().add_client(session_id); } pub fn remove_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(entry) => { - entry.remove(); - } - Entry::Vacant(_) => { - warn!( - "Tried to remove session id {} that doesn't exist", - session_id - ); - } - } - } + self.client_streams.lock().unwrap().remove_client(session_id); } - pub fn clear_clients(&mut self) { - self.client_streams.lock().unwrap().clear(); + pub fn clear_clients(&self) { + self.client_streams.lock().unwrap().clear_clients(); } pub fn set_volume(&self, output_volume: f32) { @@ -341,15 +306,7 @@ impl AudioOutput { pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - - let mut play_sounds = self.play_sounds.lock().unwrap(); - - for (val, e) in play_sounds.iter_mut().zip(samples.iter()) { - *val = val.saturating_add(*e); - } - - let l = play_sounds.len(); - play_sounds.extend(samples.iter().skip(l)); + self.client_streams.lock().unwrap().extend(None, samples); } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 421d395..797cf84 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,22 +1,29 @@ use crate::network::VoiceStreamType; +use log::*; use cpal::{OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque, hash_map::Entry}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; +type ClientStreamKey = Option<(VoiceStreamType, u32)>; +type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>; + pub struct ClientStream { - buffer: VecDeque<f32>, //TODO ring buffer? + buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer? opus_decoder: opus::Decoder, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { + let mut buffer = HashMap::new(); + //None is the system audio effects + buffer.insert(None, VecDeque::new()); Self { - buffer: VecDeque::new(), + buffer, opus_decoder: opus::Decoder::new( sample_rate, match channels { @@ -29,7 +36,7 @@ impl ClientStream { } } - pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode @@ -38,13 +45,67 @@ impl ClientStream { .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.buffer.extend(out); + self.extend(client, &out); } _ => { unimplemented!("Payload type not supported"); } } } + + pub fn consume<'a, F>(&mut self, consumer: F) + where + F: FnOnce(ConsumerInput) { + let iter = self.buffer.iter_mut(); + consumer(iter); + } + + pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { + match self.buffer.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().extend(values.iter().copied()); + } + Entry::Vacant(_) => { + match key { + None => warn!("Can't find session None"), + Some(key) => warn!("Can't find session id {}", key.1), + } + } + } + } + + pub fn add_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(_) => { + warn!("Session id {} already exists", session_id); + } + Entry::Vacant(entry) => { + entry.insert(VecDeque::new()); + } + } + } + } + + pub fn remove_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(entry) => { + entry.remove(); + } + Entry::Vacant(_) => { + warn!( + "Tried to remove session id {} that doesn't exist", + session_id + ); + } + } + } + } + + pub fn clear_clients(&mut self) { + self.buffer.retain(|k , _| k.is_none()); + } } pub trait SaturatingAdd { @@ -74,8 +135,7 @@ impl SaturatingAdd for u16 { } pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( - effect_sound: Arc<Mutex<VecDeque<f32>>>, - user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>, + user_bufs: Arc<Mutex<ClientStream>>, output_volume_receiver: watch::Receiver<f32>, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { @@ -86,27 +146,23 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> let volume = *output_volume_receiver.borrow(); - let mut effects_sound = effect_sound.lock().unwrap(); let mut user_bufs = user_bufs.lock().unwrap(); - for ((_, id), client_stream) in &mut *user_bufs { - let (user_volume, muted) = user_volumes - .lock() - .unwrap() - .get(id) - .cloned() - .unwrap_or((1.0, false)); - for sample in data.iter_mut() { - let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume; - if !muted { - *sample = sample.saturating_add(Sample::from(&s)); + let user_volumes = user_volumes.lock().unwrap(); + let mut saturating_add = |input: ConsumerInput| { + for (k, v) in input { + let (user_volume, muted) = match k { + Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), + None => (1.0, false), + }; + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.pop_front().unwrap_or(0.0) * volume * user_volume), + )); + } } } - } - - for sample in data.iter_mut() { - *sample = sample.saturating_add(Sample::from( - &(effects_sound.pop_front().unwrap_or(0.0) * volume), - )); - } + }; + user_bufs.consume(&mut saturating_add); } } |
