diff options
| -rw-r--r-- | mumd/src/audio.rs | 71 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 108 |
2 files changed, 96 insertions, 83 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 5a839bc..814050b 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -3,7 +3,7 @@ pub mod output; mod noise_gate; use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; -use crate::audio::output::SaturatingAdd; +use crate::audio::output::ClientStream; use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; use crate::error::{AudioError, AudioStream}; use crate::network::VoiceStreamType; @@ -20,7 +20,7 @@ use mumble_protocol::Serverbound; use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use mumlib::config::SoundEffect; use std::borrow::Cow; -use std::collections::{hash_map::Entry, HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::fmt::Debug; use std::fs::File; @@ -126,10 +126,9 @@ pub struct AudioOutput { user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, - client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>, + client_streams: Arc<Mutex<ClientStream>>, sounds: HashMap<NotificationEvents, Vec<f32>>, - play_sounds: Arc<Mutex<VecDeque<f32>>>, } impl AudioOutput { @@ -159,14 +158,12 @@ impl AudioOutput { let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume); - let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); - let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); + let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, output::curry_callback::<f32>( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -176,7 +173,6 @@ impl AudioOutput { SampleFormat::I16 => output_device.build_output_stream( &output_config, output::curry_callback::<i16>( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -186,7 +182,6 @@ impl AudioOutput { SampleFormat::U16 => output_device.build_output_stream( &output_config, output::curry_callback::<u16>( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -204,7 +199,6 @@ impl AudioOutput { sounds: HashMap::new(), volume_sender: output_volume_sender, user_volumes, - play_sounds, }; res.load_sound_effects(&[]); Ok(res) @@ -265,52 +259,23 @@ impl AudioOutput { } pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) { - match self.client_streams.lock().unwrap().entry((stream_type, session_id)) { - Entry::Occupied(mut entry) => { - entry - .get_mut() - .decode_packet(payload, self.config.channels as usize); - } - Entry::Vacant(_) => { - warn!("Can't find session id {}", session_id); - } - } + self.client_streams.lock().unwrap().decode_packet( + Some((stream_type, session_id)), + payload, + self.config.channels as usize, + ); } pub fn add_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(_) => { - warn!("Session id {} already exists", session_id); - } - Entry::Vacant(entry) => { - entry.insert(output::ClientStream::new( - self.config.sample_rate.0, - self.config.channels, - )); - } - } - } + self.client_streams.lock().unwrap().add_client(session_id); } pub fn remove_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(entry) => { - entry.remove(); - } - Entry::Vacant(_) => { - warn!( - "Tried to remove session id {} that doesn't exist", - session_id - ); - } - } - } + self.client_streams.lock().unwrap().remove_client(session_id); } - pub fn clear_clients(&mut self) { - self.client_streams.lock().unwrap().clear(); + pub fn clear_clients(&self) { + self.client_streams.lock().unwrap().clear_clients(); } pub fn set_volume(&self, output_volume: f32) { @@ -341,15 +306,7 @@ impl AudioOutput { pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - - let mut play_sounds = self.play_sounds.lock().unwrap(); - - for (val, e) in play_sounds.iter_mut().zip(samples.iter()) { - *val = val.saturating_add(*e); - } - - let l = play_sounds.len(); - play_sounds.extend(samples.iter().skip(l)); + self.client_streams.lock().unwrap().extend(None, samples); } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 421d395..797cf84 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,22 +1,29 @@ use crate::network::VoiceStreamType; +use log::*; use cpal::{OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque, hash_map::Entry}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; +type ClientStreamKey = Option<(VoiceStreamType, u32)>; +type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>; + pub struct ClientStream { - buffer: VecDeque<f32>, //TODO ring buffer? + buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer? opus_decoder: opus::Decoder, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { + let mut buffer = HashMap::new(); + //None is the system audio effects + buffer.insert(None, VecDeque::new()); Self { - buffer: VecDeque::new(), + buffer, opus_decoder: opus::Decoder::new( sample_rate, match channels { @@ -29,7 +36,7 @@ impl ClientStream { } } - pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode @@ -38,13 +45,67 @@ impl ClientStream { .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.buffer.extend(out); + self.extend(client, &out); } _ => { unimplemented!("Payload type not supported"); } } } + + pub fn consume<'a, F>(&mut self, consumer: F) + where + F: FnOnce(ConsumerInput) { + let iter = self.buffer.iter_mut(); + consumer(iter); + } + + pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { + match self.buffer.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().extend(values.iter().copied()); + } + Entry::Vacant(_) => { + match key { + None => warn!("Can't find session None"), + Some(key) => warn!("Can't find session id {}", key.1), + } + } + } + } + + pub fn add_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(_) => { + warn!("Session id {} already exists", session_id); + } + Entry::Vacant(entry) => { + entry.insert(VecDeque::new()); + } + } + } + } + + pub fn remove_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(entry) => { + entry.remove(); + } + Entry::Vacant(_) => { + warn!( + "Tried to remove session id {} that doesn't exist", + session_id + ); + } + } + } + } + + pub fn clear_clients(&mut self) { + self.buffer.retain(|k , _| k.is_none()); + } } pub trait SaturatingAdd { @@ -74,8 +135,7 @@ impl SaturatingAdd for u16 { } pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( - effect_sound: Arc<Mutex<VecDeque<f32>>>, - user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>, + user_bufs: Arc<Mutex<ClientStream>>, output_volume_receiver: watch::Receiver<f32>, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { @@ -86,27 +146,23 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> let volume = *output_volume_receiver.borrow(); - let mut effects_sound = effect_sound.lock().unwrap(); let mut user_bufs = user_bufs.lock().unwrap(); - for ((_, id), client_stream) in &mut *user_bufs { - let (user_volume, muted) = user_volumes - .lock() - .unwrap() - .get(id) - .cloned() - .unwrap_or((1.0, false)); - for sample in data.iter_mut() { - let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume; - if !muted { - *sample = sample.saturating_add(Sample::from(&s)); + let user_volumes = user_volumes.lock().unwrap(); + let mut saturating_add = |input: ConsumerInput| { + for (k, v) in input { + let (user_volume, muted) = match k { + Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), + None => (1.0, false), + }; + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.pop_front().unwrap_or(0.0) * volume * user_volume), + )); + } } } - } - - for sample in data.iter_mut() { - *sample = sample.saturating_add(Sample::from( - &(effects_sound.pop_front().unwrap_or(0.0) * volume), - )); - } + }; + user_bufs.consume(&mut saturating_add); } } |
