diff options
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/audio.rs | 29 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 19 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 117 | ||||
| -rw-r--r-- | mumd/src/audio/transformers.rs | 6 | ||||
| -rw-r--r-- | mumd/src/state/channel.rs | 13 |
5 files changed, 140 insertions, 44 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index fa22188..6860741 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,3 +1,7 @@ +//! All things audio. +//! +//! Audio is handled mostly as signals from [dasp_signal]. Input/output is handled by [cpal]. + pub mod input; pub mod output; pub mod transformers; @@ -27,8 +31,11 @@ use strum::IntoEnumIterator; use strum_macros::EnumIter; use tokio::sync::watch; +/// The sample rate used internally. const SAMPLE_RATE: u32 = 48000; +/// All types of notifications that can be shown. Each notification can be bound to its own audio +/// file. #[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, EnumIter)] pub enum NotificationEvents { ServerConnect, @@ -65,9 +72,12 @@ impl TryFrom<&str> for NotificationEvents { } } +/// Input audio state. Input audio is picked up from an [AudioInputDevice] (e.g. +/// a microphone) and sent over the network. pub struct AudioInput { device: DefaultAudioInputDevice, + /// Outgoing voice packets that should be sent over the network. channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, } @@ -122,12 +132,20 @@ impl Debug for AudioInput { } #[derive(Debug)] +/// Audio output state. The audio is received from each client over the network, +/// decoded, merged and finally played to an [AudioOutputDevice] (e.g. speaker, +/// headphones, ...). pub struct AudioOutput { device: DefaultAudioOutputDevice, + /// The volume and mute-status of a user ID. user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, + /// The client stream per user ID. A separate stream is kept for UDP and TCP. + /// + /// Shared with [DefaultAudioOutputDevice]. client_streams: Arc<Mutex<ClientStream>>, + /// Which sound effect should be played on an event. sounds: HashMap<NotificationEvents, Vec<f32>>, } @@ -150,6 +168,7 @@ impl AudioOutput { Ok(res) } + /// Loads sound effects, getting unspecified effects from [get_default_sfx]. 
pub fn load_sound_effects(&mut self, sound_effects: &[SoundEffect]) { let overrides: HashMap<_, _> = sound_effects .iter() @@ -163,6 +182,7 @@ impl AudioOutput { }) .collect(); + // This makes sure that every [NotificationEvent] is present in [self.sounds]. self.sounds = NotificationEvents::iter() .map(|event| { let bytes = overrides @@ -205,6 +225,7 @@ impl AudioOutput { .collect(); } + /// Decodes a voice packet. pub fn decode_packet_payload( &self, stream_type: VoiceStreamType, @@ -217,10 +238,12 @@ impl AudioOutput { .decode_packet((stream_type, session_id), payload); } + /// Sets the volume of the output device. pub fn set_volume(&self, output_volume: f32) { self.device.set_volume(output_volume); } + /// Sets the incoming volume of a user. pub fn set_user_volume(&self, id: u32, volume: f32) { match self.user_volumes.lock().unwrap().entry(id) { Entry::Occupied(mut entry) => { @@ -232,6 +255,7 @@ impl AudioOutput { } } + /// Mutes another user. pub fn set_mute(&self, id: u32, mute: bool) { match self.user_volumes.lock().unwrap().entry(id) { Entry::Occupied(mut entry) => { @@ -243,12 +267,14 @@ impl AudioOutput { } } + /// Queues a sound effect. pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - self.client_streams.lock().unwrap().extend(None, samples); + self.client_streams.lock().unwrap().add_sound_effect(samples); } } +/// Reads a sound effect from disk. // moo fn get_sfx(file: &str) -> Cow<'static, [u8]> { let mut buf: Vec<u8> = Vec::new(); @@ -261,6 +287,7 @@ fn get_sfx(file: &str) -> Cow<'static, [u8]> { } } +/// Gets the default sound effect. fn get_default_sfx() -> Cow<'static, [u8]> { Cow::from(include_bytes!("fallback_sfx.wav").as_ref()) } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 88efa62..4dfc465 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,3 +1,4 @@ +//! Listens to the microphone and sends it to the networking. 
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; use log::*; @@ -9,6 +10,7 @@ use crate::audio::transformers::{NoiseGate, Transformer}; use crate::error::{AudioError, AudioStream}; use crate::state::StatePhase; +/// Generates a callback that receives [Sample]s and sends them as floats to a [futures_channel::mpsc::Sender]. pub fn callback<T: Sample>( mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>, mut transformers: Vec<Box<dyn Transformer + Send + 'static>>, @@ -30,8 +32,8 @@ pub fn callback<T: Sample>( buffer.extend(data.by_ref().take(buffer_size - buffer.len())); let encoded = transformers .iter_mut() - .try_fold(&mut buffer[..], |acc, e| e.transform(acc)) - .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap()); + .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| e.transform(acc)) + .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap()); if let Some(encoded) = encoded { if let Err(e) = input_sender.try_send(encoded) { @@ -44,11 +46,19 @@ pub fn callback<T: Sample>( } } +/// Something that can listen to audio and send it somewhere. +/// +/// One sample is assumed to be an encoded opus frame. See [opus::Encoder]. pub trait AudioInputDevice { + /// Starts the device. fn play(&self) -> Result<(), AudioError>; + /// Stops the device. fn pause(&self) -> Result<(), AudioError>; + /// Sets the input volume of the device. fn set_volume(&self, volume: f32); + /// Returns a receiver to this device's values. fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>; + /// The amount of channels this device has. fn num_channels(&self) -> usize; } @@ -60,6 +70,7 @@ pub struct DefaultAudioInputDevice { } impl DefaultAudioInputDevice { + /// Initializes the default audio input. 
pub fn new( input_volume: f32, phase_watcher: watch::Receiver<StatePhase>, @@ -163,17 +174,21 @@ impl AudioInputDevice for DefaultAudioInputDevice { .play() .map_err(AudioError::InputPlayError) } + fn pause(&self) -> Result<(), AudioError> { self.stream .pause() .map_err(AudioError::InputPauseError) } + fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } + fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> { self.sample_receiver.take() } + fn num_channels(&self) -> usize { self.channels as usize } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 7487af2..6cec6fc 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,25 +1,79 @@ +//! Receives audio packets from the networking and plays them. + use crate::audio::SAMPLE_RATE; use crate::error::{AudioError, AudioStream}; use crate::network::VoiceStreamType; +use bytes::Bytes; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; +use dasp_ring_buffer::Bounded; use log::*; use mumble_protocol::voice::VoicePacketPayload; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; +use std::iter; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; type ClientStreamKey = (VoiceStreamType, u32); +/// State for decoding audio received from another user. +#[derive(Debug)] +pub struct ClientAudioData { + buf: Bounded<Vec<f32>>, + output_channels: opus::Channels, + // We need both since a client can hypothetically send both mono + // and stereo packets, and we can't switch a decoder on the fly + // to reuse it. 
+ mono_decoder: opus::Decoder, + stereo_decoder: opus::Decoder, +} + +impl ClientAudioData { + pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self { + Self { + mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(), + stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(), + output_channels, + buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), //buffer 1 s of audio + } + } + + pub fn store_packet(&mut self, bytes: Bytes) { + let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap(); + let (decoder, channels) = match packet_channels { + opus::Channels::Mono => (&mut self.mono_decoder, 1), + opus::Channels::Stereo => (&mut self.stereo_decoder, 2), + }; + let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode + let parsed = decoder + .decode_float(&bytes, &mut out, false) + .expect("Error decoding"); + out.truncate(parsed); + match (packet_channels, self.output_channels) { + (opus::Channels::Mono, opus::Channels::Mono) | (opus::Channels::Stereo, opus::Channels::Stereo) => for sample in out { + self.buf.push(sample); + }, + (opus::Channels::Mono, opus::Channels::Stereo) => for sample in out { + self.buf.push(sample); + self.buf.push(sample); + }, + (opus::Channels::Stereo, opus::Channels::Mono) => for sample in out.into_iter().step_by(2) { + self.buf.push(sample); + }, + } + } +} + +/// Collected state for client opus decoders and sound effects. #[derive(Debug)] pub struct ClientStream { - buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer? 
+ buffer_clients: HashMap<ClientStreamKey, ClientAudioData>, buffer_effects: VecDeque<f32>, sample_rate: u32, - channels: opus::Channels, + output_channels: opus::Channels, } impl ClientStream { @@ -33,29 +87,21 @@ impl ClientStream { buffer_clients: HashMap::new(), buffer_effects: VecDeque::new(), sample_rate, - channels, + output_channels: channels, } } - fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) { - let sample_rate = self.sample_rate; - let channels = self.channels; - self.buffer_clients.entry(client).or_insert_with(|| { - let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap(); - (VecDeque::new(), opus_decoder) - }) + fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData { + self.buffer_clients.entry(client).or_insert( + ClientAudioData::new(self.sample_rate, self.output_channels) + ) } + /// Decodes a voice packet. pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode - let (buffer, decoder) = self.get_client(client); - let parsed = decoder - .decode_float(&bytes, &mut out, false) - .expect("Error decoding"); - out.truncate(parsed); - buffer.extend(&out); + self.get_client(client).store_packet(bytes); } _ => { unimplemented!("Payload type not supported"); @@ -63,16 +109,19 @@ impl ClientStream { } } - pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) { - let buffer = match client { - Some(x) => &mut self.get_client(x).0, - None => &mut self.buffer_effects, - }; - buffer.extend(values.iter().copied()); + /// Extends the sound effect buffer queue with some received values. 
+ pub fn add_sound_effect(&mut self, values: &[f32]) { + self.buffer_effects.extend(values.iter().copied()); } } +/// Adds two values in some saturating way. +/// +/// Since we support [f32], [i16] and [u16] we need some way of adding two values +/// without peaking above/below the edge values. This trait ensures that we can +/// use all three primitive types as a generic parameter. pub trait SaturatingAdd { + /// Adds two values in some saturating way. See trait documentation. fn saturating_add(self, rhs: Self) -> Self; } @@ -106,14 +155,20 @@ pub trait AudioOutputDevice { fn client_streams(&self) -> Arc<Mutex<ClientStream>>; } +/// The default audio output device, as determined by [cpal]. pub struct DefaultAudioOutputDevice { config: StreamConfig, stream: cpal::Stream, + /// The client stream per user ID. A separate stream is kept for UDP and TCP. + /// + /// Shared with [super::AudioOutput]. client_streams: Arc<Mutex<ClientStream>>, + /// Output volume configuration. volume_sender: watch::Sender<f32>, } impl DefaultAudioOutputDevice { + /// Initializes the default audio output. 
pub fn new( output_volume: f32, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, @@ -150,7 +205,7 @@ impl DefaultAudioOutputDevice { let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, - curry_callback::<f32>( + callback::<f32>( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -159,7 +214,7 @@ impl DefaultAudioOutputDevice { ), SampleFormat::I16 => output_device.build_output_stream( &output_config, - curry_callback::<i16>( + callback::<i16>( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -168,7 +223,7 @@ impl DefaultAudioOutputDevice { ), SampleFormat::U16 => output_device.build_output_stream( &output_config, - curry_callback::<u16>( + callback::<u16>( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -213,7 +268,9 @@ impl AudioOutputDevice for DefaultAudioOutputDevice { } } -pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( +/// Returns a function that fills a buffer with audio from client streams +/// modified according to some audio configuration. 
+pub fn callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( user_bufs: Arc<Mutex<ClientStream>>, output_volume_receiver: watch::Receiver<f32>, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, @@ -229,10 +286,10 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> let user_volumes = user_volumes.lock().unwrap(); for (k, v) in user_bufs.buffer_clients.iter_mut() { let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); - for sample in data.iter_mut() { - if !muted { + if !muted { + for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) { *sample = sample.saturating_add(Sample::from( - &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume), + &(val * volume * user_volume), )); } } diff --git a/mumd/src/audio/transformers.rs b/mumd/src/audio/transformers.rs index 74d751a..21a71b5 100644 --- a/mumd/src/audio/transformers.rs +++ b/mumd/src/audio/transformers.rs @@ -2,7 +2,7 @@ pub trait Transformer { /// Do the transform. Returning `None` is interpreted as "the buffer is unwanted". /// The implementor is free to modify the buffer however it wants to. - fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]>; + fn transform<'a>(&mut self, buf: (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])>; } /// A struct representing a noise gate transform. 
@@ -26,7 +26,7 @@ impl NoiseGate { } impl Transformer for NoiseGate { - fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]> { + fn transform<'a>(&mut self, (channels, buf): (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])> { const MUTE_PERCENTAGE: f32 = 0.1; let max = buf.iter().map(|e| e.abs()).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(); @@ -44,7 +44,7 @@ impl Transformer for NoiseGate { if self.open == 0 { None } else { - Some(buf) + Some((channels, buf)) } } } diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs index 6995e1e..2ed05c5 100644 --- a/mumd/src/state/channel.rs +++ b/mumd/src/state/channel.rs @@ -169,13 +169,10 @@ pub fn into_channel( impl From<&Channel> for mumlib::state::Channel { fn from(channel: &Channel) -> Self { - mumlib::state::Channel { - description: channel.description.clone(), - links: Vec::new(), - max_users: channel.max_users, - name: channel.name.clone(), - children: Vec::new(), - users: Vec::new(), - } + mumlib::state::Channel::new( + channel.name.clone(), + channel.description.clone(), + channel.max_users, + ) } } |
