Diffstat (limited to 'mumd/src/audio/output.rs')
| -rw-r--r-- | mumd/src/audio/output.rs | 134 |
1 file changed, 102 insertions, 32 deletions
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index a2f6bcc..6cec6fc 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,23 +1,79 @@
+//! Receives audio packets from the networking and plays them.
+
 use crate::audio::SAMPLE_RATE;
 use crate::error::{AudioError, AudioStream};
 use crate::network::VoiceStreamType;
 
+use bytes::Bytes;
 use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
 use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
+use dasp_ring_buffer::Bounded;
 use log::*;
 use mumble_protocol::voice::VoicePacketPayload;
 use std::collections::{HashMap, VecDeque};
+use std::fmt::Debug;
+use std::iter;
 use std::ops::AddAssign;
 use std::sync::{Arc, Mutex};
 use tokio::sync::watch;
 
 type ClientStreamKey = (VoiceStreamType, u32);
 
+/// State for decoding audio received from another user.
+#[derive(Debug)]
+pub struct ClientAudioData {
+    buf: Bounded<Vec<f32>>,
+    output_channels: opus::Channels,
+    // We need both since a client can hypothetically send both mono
+    // and stereo packets, and we can't switch a decoder on the fly
+    // to reuse it.
+    mono_decoder: opus::Decoder,
+    stereo_decoder: opus::Decoder,
+}
+
+impl ClientAudioData {
+    pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self {
+        Self {
+            mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(),
+            stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(),
+            output_channels,
+            buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), //buffer 1 s of audio
+        }
+    }
+
+    pub fn store_packet(&mut self, bytes: Bytes) {
+        let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap();
+        let (decoder, channels) = match packet_channels {
+            opus::Channels::Mono => (&mut self.mono_decoder, 1),
+            opus::Channels::Stereo => (&mut self.stereo_decoder, 2),
+        };
+        let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
+        let parsed = decoder
+            .decode_float(&bytes, &mut out, false)
+            .expect("Error decoding");
+        out.truncate(parsed);
+        match (packet_channels, self.output_channels) {
+            (opus::Channels::Mono, opus::Channels::Mono) | (opus::Channels::Stereo, opus::Channels::Stereo) => for sample in out {
+                self.buf.push(sample);
+            },
+            (opus::Channels::Mono, opus::Channels::Stereo) => for sample in out {
+                self.buf.push(sample);
+                self.buf.push(sample);
+            },
+            (opus::Channels::Stereo, opus::Channels::Mono) => for sample in out.into_iter().step_by(2) {
+                self.buf.push(sample);
+            },
+        }
+    }
+}
+
+/// Collected state for client opus decoders and sound effects.
+#[derive(Debug)]
 pub struct ClientStream {
-    buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer?
+    buffer_clients: HashMap<ClientStreamKey, ClientAudioData>,
     buffer_effects: VecDeque<f32>,
     sample_rate: u32,
-    channels: opus::Channels,
+    output_channels: opus::Channels,
 }
 
 impl ClientStream {
@@ -31,29 +87,21 @@ impl ClientStream {
             buffer_clients: HashMap::new(),
             buffer_effects: VecDeque::new(),
             sample_rate,
-            channels,
+            output_channels: channels,
         }
     }
 
-    fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) {
-        let sample_rate = self.sample_rate;
-        let channels = self.channels;
-        self.buffer_clients.entry(client).or_insert_with(|| {
-            let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap();
-            (VecDeque::new(), opus_decoder)
-        })
+    fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData {
+        self.buffer_clients.entry(client).or_insert(
+            ClientAudioData::new(self.sample_rate, self.output_channels)
+        )
     }
 
+    /// Decodes a voice packet.
     pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
         match payload {
             VoicePacketPayload::Opus(bytes, _eot) => {
-                let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode
-                let (buffer, decoder) = self.get_client(client);
-                let parsed = decoder
-                    .decode_float(&bytes, &mut out, false)
-                    .expect("Error decoding");
-                out.truncate(parsed);
-                buffer.extend(&out);
+                self.get_client(client).store_packet(bytes);
             }
             _ => {
                 unimplemented!("Payload type not supported");
@@ -61,16 +109,19 @@ impl ClientStream {
         }
     }
 
-    pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) {
-        let buffer = match client {
-            Some(x) => &mut self.get_client(x).0,
-            None => &mut self.buffer_effects,
-        };
-        buffer.extend(values.iter().copied());
+    /// Extends the sound effect buffer queue with some received values.
+    pub fn add_sound_effect(&mut self, values: &[f32]) {
+        self.buffer_effects.extend(values.iter().copied());
     }
 }
 
+/// Adds two values in some saturating way.
+///
+/// Since we support [f32], [i16] and [u16] we need some way of adding two values
+/// without peaking above/below the edge values. This trait ensures that we can
+/// use all three primitive types as a generic parameter.
 pub trait SaturatingAdd {
+    /// Adds two values in some saturating way. See trait documentation.
     fn saturating_add(self, rhs: Self) -> Self;
 }
 
@@ -104,14 +155,20 @@ pub trait AudioOutputDevice {
     fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
 }
 
+/// The default audio output device, as determined by [cpal].
 pub struct DefaultAudioOutputDevice {
     config: StreamConfig,
     stream: cpal::Stream,
+    /// The client stream per user ID. A separate stream is kept for UDP and TCP.
+    ///
+    /// Shared with [super::AudioOutput].
     client_streams: Arc<Mutex<ClientStream>>,
+    /// Output volume configuration.
     volume_sender: watch::Sender<f32>,
 }
 
 impl DefaultAudioOutputDevice {
+    /// Initializes the default audio output.
     pub fn new(
         output_volume: f32,
         user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
@@ -148,7 +205,7 @@ impl DefaultAudioOutputDevice {
         let output_stream = match output_supported_sample_format {
             SampleFormat::F32 => output_device.build_output_stream(
                 &output_config,
-                curry_callback::<f32>(
+                callback::<f32>(
                     Arc::clone(&client_streams),
                     output_volume_receiver,
                     user_volumes,
@@ -157,7 +214,7 @@ impl DefaultAudioOutputDevice {
             ),
             SampleFormat::I16 => output_device.build_output_stream(
                 &output_config,
-                curry_callback::<i16>(
+                callback::<i16>(
                     Arc::clone(&client_streams),
                     output_volume_receiver,
                     user_volumes,
@@ -166,7 +223,7 @@ impl DefaultAudioOutputDevice {
             ),
             SampleFormat::U16 => output_device.build_output_stream(
                 &output_config,
-                curry_callback::<u16>(
+                callback::<u16>(
                     Arc::clone(&client_streams),
                     output_volume_receiver,
                     user_volumes,
@@ -189,13 +246,13 @@ impl AudioOutputDevice for DefaultAudioOutputDevice {
     fn play(&self) -> Result<(), AudioError> {
         self.stream
             .play()
-            .map_err(|e| AudioError::OutputPlayError(e))
+            .map_err(AudioError::OutputPlayError)
     }
 
     fn pause(&self) -> Result<(), AudioError> {
         self.stream
             .pause()
-            .map_err(|e| AudioError::OutputPauseError(e))
+            .map_err(AudioError::OutputPauseError)
    }
 
     fn set_volume(&self, volume: f32) {
@@ -211,7 +268,9 @@ impl AudioOutputDevice for DefaultAudioOutputDevice {
     }
 }
 
-pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
+/// Returns a function that fills a buffer with audio from client streams
+/// modified according to some audio configuration.
+pub fn callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
     user_bufs: Arc<Mutex<ClientStream>>,
     output_volume_receiver: watch::Receiver<f32>,
     user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
@@ -227,10 +286,10 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
         let user_volumes = user_volumes.lock().unwrap();
         for (k, v) in user_bufs.buffer_clients.iter_mut() {
             let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
-            for sample in data.iter_mut() {
-                if !muted {
+            if !muted {
+                for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) {
                     *sample = sample.saturating_add(Sample::from(
-                        &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume),
+                        &(val * volume * user_volume),
                     ));
                 }
             }
@@ -242,3 +301,14 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
         }
     }
 }
+
+impl Debug for DefaultAudioOutputDevice {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DefaultAudioInputDevice")
+            .field("client_streams", &self.client_streams)
+            .field("config", &self.config)
+            .field("volume_sender", &self.volume_sender)
+            .field("stream", &"cpal::Stream")
+            .finish()
+    }
+}
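
The channel handling in ClientAudioData::store_packet is the core of the new decoding path: mono packets played on a stereo output are duplicated sample by sample, while stereo packets played on a mono output keep only the left channel. The following standalone sketch illustrates that conversion; the helper convert_channels is hypothetical and not part of this commit, which instead pushes samples directly into the bounded ring buffer to avoid an intermediate allocation.

    // Hypothetical helper mirroring the (packet_channels, output_channels)
    // match in ClientAudioData::store_packet.
    fn convert_channels(samples: &[f32], packet_channels: usize, output_channels: usize) -> Vec<f32> {
        match (packet_channels, output_channels) {
            // Mono packet, stereo output: duplicate every sample.
            (1, 2) => samples.iter().flat_map(|&s| [s, s]).collect(),
            // Stereo packet, mono output: keep only the left channel.
            (2, 1) => samples.iter().step_by(2).copied().collect(),
            // Matching layouts: pass the samples through unchanged.
            _ => samples.to_vec(),
        }
    }

    fn main() {
        assert_eq!(convert_channels(&[0.5, -0.5], 1, 2), vec![0.5, 0.5, -0.5, -0.5]);
        assert_eq!(convert_channels(&[0.1, 0.2, 0.3, 0.4], 2, 1), vec![0.1, 0.3]);
    }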
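The SaturatingAdd implementations for the three supported sample types sit outside the changed hunks, so they do not appear in this diff. A minimal sketch of what such impls can look like, assuming a clamp to the nominal float range and the standard library's saturating integer addition; the u16 impl is omitted here because cpal's u16 samples are offset around a 32768 midpoint and need re-centering before adding.

    // Sketch only; the crate's actual impls are not shown in this diff.
    // The trait as declared in output.rs above.
    pub trait SaturatingAdd {
        fn saturating_add(self, rhs: Self) -> Self;
    }

    impl SaturatingAdd for f32 {
        fn saturating_add(self, rhs: Self) -> Self {
            // Clamp to the nominal [-1.0, 1.0] sample range.
            (self + rhs).clamp(-1.0, 1.0)
        }
    }

    impl SaturatingAdd for i16 {
        fn saturating_add(self, rhs: Self) -> Self {
            // Defer to the built-in saturating integer addition.
            i16::saturating_add(self, rhs)
        }
    }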
