diff options
Diffstat (limited to 'mumd/src/audio/output.rs')
| -rw-r--r-- | mumd/src/audio/output.rs | 108 |
1 files changed, 82 insertions, 26 deletions
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 421d395..797cf84 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,22 +1,29 @@ use crate::network::VoiceStreamType; +use log::*; use cpal::{OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque, hash_map::Entry}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; +type ClientStreamKey = Option<(VoiceStreamType, u32)>; +type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>; + pub struct ClientStream { - buffer: VecDeque<f32>, //TODO ring buffer? + buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer? opus_decoder: opus::Decoder, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { + let mut buffer = HashMap::new(); + //None is the system audio effects + buffer.insert(None, VecDeque::new()); Self { - buffer: VecDeque::new(), + buffer, opus_decoder: opus::Decoder::new( sample_rate, match channels { @@ -29,7 +36,7 @@ impl ClientStream { } } - pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode @@ -38,13 +45,67 @@ impl ClientStream { .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.buffer.extend(out); + self.extend(client, &out); } _ => { unimplemented!("Payload type not supported"); } } } + + pub fn consume<'a, F>(&mut self, consumer: F) + where + F: FnOnce(ConsumerInput) { + let iter = self.buffer.iter_mut(); + consumer(iter); + } + + pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { + match self.buffer.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().extend(values.iter().copied()); + } + Entry::Vacant(_) => { + match key { + None => warn!("Can't find session None"), + Some(key) => warn!("Can't find session id {}", key.1), + } + } + } + } + + pub fn add_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(_) => { + warn!("Session id {} already exists", session_id); + } + Entry::Vacant(entry) => { + entry.insert(VecDeque::new()); + } + } + } + } + + pub fn remove_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(entry) => { + entry.remove(); + } + Entry::Vacant(_) => { + warn!( + "Tried to remove session id {} that doesn't exist", + session_id + ); + } + } + } + } + + pub fn clear_clients(&mut self) { + self.buffer.retain(|k , _| k.is_none()); + } } pub trait SaturatingAdd { @@ -74,8 +135,7 @@ impl SaturatingAdd for u16 { } pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( - effect_sound: Arc<Mutex<VecDeque<f32>>>, - user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>, + user_bufs: Arc<Mutex<ClientStream>>, output_volume_receiver: watch::Receiver<f32>, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { @@ -86,27 +146,23 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> let volume = *output_volume_receiver.borrow(); - let mut effects_sound = effect_sound.lock().unwrap(); let mut user_bufs = user_bufs.lock().unwrap(); - for ((_, id), client_stream) in &mut *user_bufs { - let (user_volume, muted) = user_volumes - .lock() - .unwrap() - .get(id) - .cloned() - .unwrap_or((1.0, false)); - for sample in data.iter_mut() { - let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume; - if !muted { - *sample = sample.saturating_add(Sample::from(&s)); + let user_volumes = user_volumes.lock().unwrap(); + let mut saturating_add = |input: ConsumerInput| { + for (k, v) in input { + let (user_volume, muted) = match k { + Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), + None => (1.0, false), + }; + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.pop_front().unwrap_or(0.0) * volume * user_volume), + )); + } } } - } - - for sample in data.iter_mut() { - *sample = sample.saturating_add(Sample::from( - &(effects_sound.pop_front().unwrap_or(0.0) * volume), - )); - } + }; + user_bufs.consume(&mut saturating_add); } } |
