diff options
| author | Rubens Brandao <git@rubens.io> | 2021-04-10 19:43:21 +0200 |
|---|---|---|
| committer | Rubens Brandao <git@rubens.io> | 2021-04-10 19:43:21 +0200 |
| commit | 727710ae7e3ac8c35d66e0431682a2a90f2bd3a4 (patch) | |
| tree | 6d815dada110803aa787d116197d9aa0094821f0 /mumd/src/audio | |
| parent | 3caae1e9e17524cd2fdedc39c075ceda231cf0e1 (diff) | |
| download | mum-727710ae7e3ac8c35d66e0431682a2a90f2bd3a4.tar.gz | |
Restore multiple decoders
Diffstat (limited to 'mumd/src/audio')
| -rw-r--r-- | mumd/src/audio/input.rs | 12 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 115 |
2 files changed, 64 insertions, 63 deletions
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index f4e9c4c..4a1ed3d 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -33,11 +33,11 @@ pub trait AudioInputDevice { fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32>; - fn get_num_channels(&self) -> usize; + fn num_channels(&self) -> usize; } pub struct DefaultAudioInputDevice { - _stream: cpal::Stream, + stream: cpal::Stream, sample_receiver: Option<futures_channel::mpsc::Receiver<f32>>, volume_sender: watch::Sender<f32>, channels: u16, @@ -105,7 +105,7 @@ impl DefaultAudioInputDevice { .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; let res = Self { - _stream: input_stream, + stream: input_stream, sample_receiver: Some(sample_receiver), volume_sender, channels: input_config.channels, @@ -116,10 +116,10 @@ impl DefaultAudioInputDevice { impl AudioInputDevice for DefaultAudioInputDevice { fn play(&self) -> Result<(), AudioError> { - self._stream.play().map_err(|e| AudioError::InputPlayError(e)) + self.stream.play().map_err(|e| AudioError::InputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self._stream.pause().map_err(|e| AudioError::InputPauseError(e)) + self.stream.pause().map_err(|e| AudioError::InputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); @@ -128,7 +128,7 @@ impl AudioInputDevice for DefaultAudioInputDevice { let ret = self.sample_receiver.take(); ret.unwrap() } - fn get_num_channels(&self) -> usize { + fn num_channels(&self) -> usize { self.channels as usize } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index df9b2e2..658c1c8 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -6,49 +6,57 @@ use log::*; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; -use opus::Channels; use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; -type ClientStreamKey = Option<(VoiceStreamType, u32)>; -type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>; +type ClientStreamKey = (VoiceStreamType, u32); pub struct ClientStream { - buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer? - opus_decoder: opus::Decoder, + buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer? + buffer_effects: VecDeque<f32>, + sample_rate: u32, + channels: opus::Channels, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { - let mut buffer = HashMap::new(); - //None is the system audio effects - buffer.insert(None, VecDeque::new()); + let channels = match channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), + }; Self { - buffer, - opus_decoder: opus::Decoder::new( - sample_rate, - match channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), - }, - ) - .unwrap(), + buffer_clients: HashMap::new(), + buffer_effects: VecDeque::new(), + sample_rate, + channels, } } - pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { + fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) { + let sample_rate = self.sample_rate; + let channels = self.channels; + self.buffer_clients.entry(client).or_insert_with(|| { + let opus_decoder = opus::Decoder::new( + sample_rate, + channels + ).unwrap(); + (VecDeque::new(), opus_decoder) + }) + } + + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode - let parsed = self - .opus_decoder + let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode + let (buffer, decoder) = self.get_client(client); + let parsed = decoder .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.extend(client, &out); + buffer.extend(&out); } _ => { unimplemented!("Payload type not supported"); @@ -56,18 +64,12 @@ impl ClientStream { } } - pub fn consume<'a, F>(&mut self, consumer: F) - where - F: FnOnce(ConsumerInput) { - let iter = self.buffer.iter_mut(); - consumer(iter); - //remove empty Vec - self.buffer.retain(|_, v| !v.is_empty()); - } - - pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { - let entry = self.buffer.entry(key).or_insert(VecDeque::new()); - entry.extend(values.iter().copied()); + pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) { + let buffer = match client { + Some(x) => &mut self.get_client(x).0, + None => &mut self.buffer_effects, + }; + buffer.extend(values.iter().copied()); } } @@ -101,13 +103,13 @@ pub trait AudioOutputDevice { fn play(&self) -> Result<(), AudioError>; fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); - fn get_num_channels(&self) -> usize; - fn get_client_streams(&self) -> Arc<Mutex<ClientStream>>; + fn num_channels(&self) -> usize; + fn client_streams(&self) -> Arc<Mutex<ClientStream>>; } pub struct DefaultAudioOutputDevice { config: StreamConfig, - _stream: cpal::Stream, + stream: cpal::Stream, client_streams: Arc<Mutex<ClientStream>>, volume_sender: watch::Sender<f32>, } @@ -176,7 +178,7 @@ impl DefaultAudioOutputDevice { Ok(Self { config: output_config, - _stream: output_stream, + stream: output_stream, volume_sender: output_volume_sender, client_streams, }) @@ -185,22 +187,22 @@ impl DefaultAudioOutputDevice { impl AudioOutputDevice for DefaultAudioOutputDevice { fn play(&self) -> Result<(), AudioError> { - self._stream.play().map_err(|e| AudioError::OutputPlayError(e)) + self.stream.play().map_err(|e| AudioError::OutputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self._stream.pause().map_err(|e| AudioError::OutputPauseError(e)) + self.stream.pause().map_err(|e| AudioError::OutputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } - fn get_num_channels(&self) -> usize { + fn num_channels(&self) -> usize { self.config.channels as usize } - fn get_client_streams(&self) -> Arc<Mutex<ClientStream>> { + fn client_streams(&self) -> Arc<Mutex<ClientStream>> { Arc::clone(&self.client_streams) } } @@ -219,21 +221,20 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> let mut user_bufs = user_bufs.lock().unwrap(); let user_volumes = user_volumes.lock().unwrap(); - let mut saturating_add = |input: ConsumerInput| { - for (k, v) in input { - let (user_volume, muted) = match k { - Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), - None => (1.0, false), - }; - for sample in data.iter_mut() { - if !muted { - *sample = sample.saturating_add(Sample::from( - &(v.pop_front().unwrap_or(0.0) * volume * user_volume), - )); - } + for (k, v) in user_bufs.buffer_clients.iter_mut() { + let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume), + )); } } - }; - user_bufs.consume(&mut saturating_add); + } + for sample in data.iter_mut() { + *sample = sample.saturating_add(Sample::from( + &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume), + )); + } } } |
