Diffstat (limited to 'mumd')
-rw-r--r--  mumd/Cargo.toml                   1
-rw-r--r--  mumd/src/audio.rs                29
-rw-r--r--  mumd/src/audio/input.rs          19
-rw-r--r--  mumd/src/audio/output.rs        117
-rw-r--r--  mumd/src/audio/transformers.rs    6
-rw-r--r--  mumd/src/state/channel.rs        13
6 files changed, 141 insertions, 44 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml
index f6474b7..28ff2ce 100644
--- a/mumd/Cargo.toml
+++ b/mumd/Cargo.toml
@@ -23,6 +23,7 @@ cpal = "0.13"
bytes = "1.0"
dasp_interpolate = { version = "0.11", features = ["linear"] }
dasp_signal = "0.11"
+dasp_ring_buffer = "0.11"
futures-util = { version = "0.3", features = ["sink"]}
futures-channel = "0.3"
hound = "3.4"
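The only manifest change is the new dasp_ring_buffer dependency, which backs the per-client buffers added in output.rs below. A minimal sketch of the three calls the patch relies on (Bounded::from_full, push and drain); the buffer length here is arbitrary and only for illustration:

```rust
use dasp_ring_buffer::Bounded;

fn main() {
    // Start from a full buffer of silence, like the 1 s buffer in ClientAudioData.
    let mut buf = Bounded::from_full(vec![0.0f32; 8]);

    // Pushing onto a full buffer evicts (and returns) the oldest sample.
    for s in [0.1, 0.2, 0.3] {
        let _ = buf.push(s);
    }

    // drain() removes and yields the buffered samples, oldest first.
    let samples: Vec<f32> = buf.drain().collect();
    println!("{:?}", samples);
}
```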
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index fa22188..6860741 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,3 +1,7 @@
+//! All things audio.
+//!
+//! Audio is handled mostly as signals from [dasp_signal]. Input/output is handled by [cpal].
+
pub mod input;
pub mod output;
pub mod transformers;
@@ -27,8 +31,11 @@ use strum::IntoEnumIterator;
use strum_macros::EnumIter;
use tokio::sync::watch;
+/// The sample rate used internally.
const SAMPLE_RATE: u32 = 48000;
+/// All types of notifications that can be shown. Each notification can be bound to its own audio
+/// file.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, EnumIter)]
pub enum NotificationEvents {
ServerConnect,
@@ -65,9 +72,12 @@ impl TryFrom<&str> for NotificationEvents {
}
}
+/// Input audio state. Input audio is picked up from an [AudioInputDevice] (e.g.
+/// a microphone) and sent over the network.
pub struct AudioInput {
device: DefaultAudioInputDevice,
+ /// Outgoing voice packets that should be sent over the network.
channel_receiver:
Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
}
@@ -122,12 +132,20 @@ impl Debug for AudioInput {
}
#[derive(Debug)]
+/// Audio output state. The audio is received from each client over the network,
+/// decoded, merged and finally played to an [AudioOutputDevice] (e.g. speaker,
+/// headphones, ...).
pub struct AudioOutput {
device: DefaultAudioOutputDevice,
+    /// The volume and mute status of each user, keyed by user ID.
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
+ /// The client stream per user ID. A separate stream is kept for UDP and TCP.
+ ///
+ /// Shared with [DefaultAudioOutputDevice].
client_streams: Arc<Mutex<ClientStream>>,
+ /// Which sound effect should be played on an event.
sounds: HashMap<NotificationEvents, Vec<f32>>,
}
@@ -150,6 +168,7 @@ impl AudioOutput {
Ok(res)
}
+ /// Loads sound effects, getting unspecified effects from [get_default_sfx].
pub fn load_sound_effects(&mut self, sound_effects: &[SoundEffect]) {
let overrides: HashMap<_, _> = sound_effects
.iter()
@@ -163,6 +182,7 @@ impl AudioOutput {
})
.collect();
+        // This makes sure that every [NotificationEvents] variant is present in [self.sounds].
self.sounds = NotificationEvents::iter()
.map(|event| {
let bytes = overrides
@@ -205,6 +225,7 @@ impl AudioOutput {
.collect();
}
+ /// Decodes a voice packet.
pub fn decode_packet_payload(
&self,
stream_type: VoiceStreamType,
@@ -217,10 +238,12 @@ impl AudioOutput {
.decode_packet((stream_type, session_id), payload);
}
+ /// Sets the volume of the output device.
pub fn set_volume(&self, output_volume: f32) {
self.device.set_volume(output_volume);
}
+ /// Sets the incoming volume of a user.
pub fn set_user_volume(&self, id: u32, volume: f32) {
match self.user_volumes.lock().unwrap().entry(id) {
Entry::Occupied(mut entry) => {
@@ -232,6 +255,7 @@ impl AudioOutput {
}
}
+    /// Mutes or unmutes another user.
pub fn set_mute(&self, id: u32, mute: bool) {
match self.user_volumes.lock().unwrap().entry(id) {
Entry::Occupied(mut entry) => {
@@ -243,12 +267,14 @@ impl AudioOutput {
}
}
+ /// Queues a sound effect.
pub fn play_effect(&self, effect: NotificationEvents) {
let samples = self.sounds.get(&effect).unwrap();
- self.client_streams.lock().unwrap().extend(None, samples);
+ self.client_streams.lock().unwrap().add_sound_effect(samples);
}
}
+/// Reads a sound effect from disk.
// moo
fn get_sfx(file: &str) -> Cow<'static, [u8]> {
let mut buf: Vec<u8> = Vec::new();
@@ -261,6 +287,7 @@ fn get_sfx(file: &str) -> Cow<'static, [u8]> {
}
}
+/// Gets the default sound effect.
fn get_default_sfx() -> Cow<'static, [u8]> {
Cow::from(include_bytes!("fallback_sfx.wav").as_ref())
}
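For orientation, a hedged sketch of the AudioOutput surface documented above; configure_peer is a made-up caller, not part of mumd:

```rust
/// Hypothetical caller exercising the per-user settings and the sound
/// effect queue touched by this patch.
fn configure_peer(output: &AudioOutput, peer_id: u32) {
    // Per-user volume/mute is stored in user_volumes and applied by the
    // output callback when mixing.
    output.set_user_volume(peer_id, 0.5);
    output.set_mute(peer_id, false);
    // Sound effects are queued onto the shared ClientStream and mixed in
    // alongside client audio.
    output.play_effect(NotificationEvents::ServerConnect);
}
```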
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index 88efa62..4dfc465 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -1,3 +1,4 @@
+//! Listens to the microphone and sends the audio to the network layer.
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
use log::*;
@@ -9,6 +10,7 @@ use crate::audio::transformers::{NoiseGate, Transformer};
use crate::error::{AudioError, AudioStream};
use crate::state::StatePhase;
+/// Generates a callback that receives [Sample]s, converts them to floats, opus-encodes them
+/// and sends the encoded frames to a [futures_channel::mpsc::Sender].
pub fn callback<T: Sample>(
mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>,
mut transformers: Vec<Box<dyn Transformer + Send + 'static>>,
@@ -30,8 +32,8 @@ pub fn callback<T: Sample>(
buffer.extend(data.by_ref().take(buffer_size - buffer.len()));
let encoded = transformers
.iter_mut()
- .try_fold(&mut buffer[..], |acc, e| e.transform(acc))
- .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap());
+ .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| e.transform(acc))
+ .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap());
if let Some(encoded) = encoded {
if let Err(e) = input_sender.try_send(encoded) {
@@ -44,11 +46,19 @@ pub fn callback<T: Sample>(
}
}
+/// Something that can listen to audio and send it somewhere.
+///
+/// Each item sent by the device is assumed to be one encoded opus frame. See [opus::Encoder].
pub trait AudioInputDevice {
+ /// Starts the device.
fn play(&self) -> Result<(), AudioError>;
+ /// Stops the device.
fn pause(&self) -> Result<(), AudioError>;
+ /// Sets the input volume of the device.
fn set_volume(&self, volume: f32);
+    /// Returns a receiver for this device's encoded audio frames.
fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>;
+    /// The number of channels this device has.
fn num_channels(&self) -> usize;
}
@@ -60,6 +70,7 @@ pub struct DefaultAudioInputDevice {
}
impl DefaultAudioInputDevice {
+ /// Initializes the default audio input.
pub fn new(
input_volume: f32,
phase_watcher: watch::Receiver<StatePhase>,
@@ -163,17 +174,21 @@ impl AudioInputDevice for DefaultAudioInputDevice {
.play()
.map_err(AudioError::InputPlayError)
}
+
fn pause(&self) -> Result<(), AudioError> {
self.stream
.pause()
.map_err(AudioError::InputPauseError)
}
+
fn set_volume(&self, volume: f32) {
self.volume_sender.send(volume).unwrap();
}
+
fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> {
self.sample_receiver.take()
}
+
fn num_channels(&self) -> usize {
self.channels as usize
}
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index 7487af2..6cec6fc 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,25 +1,79 @@
+//! Receives audio packets from the network layer and plays them.
+
use crate::audio::SAMPLE_RATE;
use crate::error::{AudioError, AudioStream};
use crate::network::VoiceStreamType;
+use bytes::Bytes;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
+use dasp_ring_buffer::Bounded;
use log::*;
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
+use std::iter;
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
type ClientStreamKey = (VoiceStreamType, u32);
+/// State for decoding audio received from another user.
+#[derive(Debug)]
+pub struct ClientAudioData {
+ buf: Bounded<Vec<f32>>,
+ output_channels: opus::Channels,
+ // We need both since a client can hypothetically send both mono
+ // and stereo packets, and we can't switch a decoder on the fly
+ // to reuse it.
+ mono_decoder: opus::Decoder,
+ stereo_decoder: opus::Decoder,
+}
+
+impl ClientAudioData {
+ pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self {
+ Self {
+ mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(),
+ stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(),
+ output_channels,
+ buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), //buffer 1 s of audio
+ }
+ }
+
+ pub fn store_packet(&mut self, bytes: Bytes) {
+ let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap();
+ let (decoder, channels) = match packet_channels {
+ opus::Channels::Mono => (&mut self.mono_decoder, 1),
+ opus::Channels::Stereo => (&mut self.stereo_decoder, 2),
+ };
+        let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; // 720 is the largest packet size we expect to receive and decode
+ let parsed = decoder
+ .decode_float(&bytes, &mut out, false)
+ .expect("Error decoding");
+ out.truncate(parsed);
+ match (packet_channels, self.output_channels) {
+ (opus::Channels::Mono, opus::Channels::Mono) | (opus::Channels::Stereo, opus::Channels::Stereo) => for sample in out {
+ self.buf.push(sample);
+ },
+ (opus::Channels::Mono, opus::Channels::Stereo) => for sample in out {
+ self.buf.push(sample);
+ self.buf.push(sample);
+ },
+ (opus::Channels::Stereo, opus::Channels::Mono) => for sample in out.into_iter().step_by(2) {
+ self.buf.push(sample);
+ },
+ }
+ }
+}
+
+/// Collected state for client opus decoders and sound effects.
#[derive(Debug)]
pub struct ClientStream {
- buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer?
+ buffer_clients: HashMap<ClientStreamKey, ClientAudioData>,
buffer_effects: VecDeque<f32>,
sample_rate: u32,
- channels: opus::Channels,
+ output_channels: opus::Channels,
}
impl ClientStream {
@@ -33,29 +87,21 @@ impl ClientStream {
buffer_clients: HashMap::new(),
buffer_effects: VecDeque::new(),
sample_rate,
- channels,
+ output_channels: channels,
}
}
- fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) {
- let sample_rate = self.sample_rate;
- let channels = self.channels;
- self.buffer_clients.entry(client).or_insert_with(|| {
- let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap();
- (VecDeque::new(), opus_decoder)
- })
+ fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData {
+ self.buffer_clients.entry(client).or_insert(
+ ClientAudioData::new(self.sample_rate, self.output_channels)
+ )
}
+ /// Decodes a voice packet.
pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode
- let (buffer, decoder) = self.get_client(client);
- let parsed = decoder
- .decode_float(&bytes, &mut out, false)
- .expect("Error decoding");
- out.truncate(parsed);
- buffer.extend(&out);
+ self.get_client(client).store_packet(bytes);
}
_ => {
unimplemented!("Payload type not supported");
@@ -63,16 +109,19 @@ impl ClientStream {
}
}
- pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) {
- let buffer = match client {
- Some(x) => &mut self.get_client(x).0,
- None => &mut self.buffer_effects,
- };
- buffer.extend(values.iter().copied());
+    /// Extends the sound effect buffer with the given samples.
+ pub fn add_sound_effect(&mut self, values: &[f32]) {
+ self.buffer_effects.extend(values.iter().copied());
}
}
+/// Adds two values, saturating at the type's bounds.
+///
+/// Since we support [f32], [i16] and [u16] we need some way of adding two values
+/// without clipping past the type's minimum/maximum values. This trait lets us
+/// use all three primitive types as a generic parameter.
pub trait SaturatingAdd {
+    /// Adds two values, saturating at the type's bounds. See the trait documentation.
fn saturating_add(self, rhs: Self) -> Self;
}
@@ -106,14 +155,20 @@ pub trait AudioOutputDevice {
fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
}
+/// The default audio output device, as determined by [cpal].
pub struct DefaultAudioOutputDevice {
config: StreamConfig,
stream: cpal::Stream,
+ /// The client stream per user ID. A separate stream is kept for UDP and TCP.
+ ///
+ /// Shared with [super::AudioOutput].
client_streams: Arc<Mutex<ClientStream>>,
+ /// Output volume configuration.
volume_sender: watch::Sender<f32>,
}
impl DefaultAudioOutputDevice {
+ /// Initializes the default audio output.
pub fn new(
output_volume: f32,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
@@ -150,7 +205,7 @@ impl DefaultAudioOutputDevice {
let output_stream = match output_supported_sample_format {
SampleFormat::F32 => output_device.build_output_stream(
&output_config,
- curry_callback::<f32>(
+ callback::<f32>(
Arc::clone(&client_streams),
output_volume_receiver,
user_volumes,
@@ -159,7 +214,7 @@ impl DefaultAudioOutputDevice {
),
SampleFormat::I16 => output_device.build_output_stream(
&output_config,
- curry_callback::<i16>(
+ callback::<i16>(
Arc::clone(&client_streams),
output_volume_receiver,
user_volumes,
@@ -168,7 +223,7 @@ impl DefaultAudioOutputDevice {
),
SampleFormat::U16 => output_device.build_output_stream(
&output_config,
- curry_callback::<u16>(
+ callback::<u16>(
Arc::clone(&client_streams),
output_volume_receiver,
user_volumes,
@@ -213,7 +268,9 @@ impl AudioOutputDevice for DefaultAudioOutputDevice {
}
}
-pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
+/// Returns a function that fills an output buffer with audio mixed from the
+/// client streams, scaled by the output volume and per-user volume/mute settings.
+pub fn callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
user_bufs: Arc<Mutex<ClientStream>>,
output_volume_receiver: watch::Receiver<f32>,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
@@ -229,10 +286,10 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
let user_volumes = user_volumes.lock().unwrap();
for (k, v) in user_bufs.buffer_clients.iter_mut() {
let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
- for sample in data.iter_mut() {
- if !muted {
+ if !muted {
+ for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) {
*sample = sample.saturating_add(Sample::from(
- &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume),
+ &(val * volume * user_volume),
));
}
}
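The core of the new ClientAudioData::store_packet is the channel conversion between what a client sent and what the output device expects. A free-function restatement of that match, purely as an illustration (remap_channels does not exist in the patch):

```rust
/// Hypothetical helper mirroring the channel handling in
/// ClientAudioData::store_packet.
fn remap_channels(samples: &[f32], packet: opus::Channels, output: opus::Channels) -> Vec<f32> {
    match (packet, output) {
        // Same layout on both sides: pass the samples through unchanged.
        (opus::Channels::Mono, opus::Channels::Mono)
        | (opus::Channels::Stereo, opus::Channels::Stereo) => samples.to_vec(),
        // Mono packet, stereo output: duplicate every sample into both channels.
        (opus::Channels::Mono, opus::Channels::Stereo) => {
            samples.iter().flat_map(|&s| [s, s]).collect()
        }
        // Stereo packet, mono output: keep only the left channel.
        (opus::Channels::Stereo, opus::Channels::Mono) => {
            samples.iter().copied().step_by(2).collect()
        }
    }
}
```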
diff --git a/mumd/src/audio/transformers.rs b/mumd/src/audio/transformers.rs
index 74d751a..21a71b5 100644
--- a/mumd/src/audio/transformers.rs
+++ b/mumd/src/audio/transformers.rs
@@ -2,7 +2,7 @@
pub trait Transformer {
/// Do the transform. Returning `None` is interpreted as "the buffer is unwanted".
/// The implementor is free to modify the buffer however it wants to.
- fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]>;
+ fn transform<'a>(&mut self, buf: (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])>;
}
/// A struct representing a noise gate transform.
@@ -26,7 +26,7 @@ impl NoiseGate {
}
impl Transformer for NoiseGate {
- fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]> {
+ fn transform<'a>(&mut self, (channels, buf): (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])> {
const MUTE_PERCENTAGE: f32 = 0.1;
let max = buf.iter().map(|e| e.abs()).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap();
@@ -44,7 +44,7 @@ impl Transformer for NoiseGate {
if self.open == 0 {
None
} else {
- Some(buf)
+ Some((channels, buf))
}
}
}
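With the signature change, a transformer now receives and returns the channel layout alongside the buffer. A minimal no-op implementor as a hedged sketch of the new contract, assuming it lives next to the trait in transformers.rs (PassThrough is not part of the patch):

```rust
/// Hypothetical transformer that changes nothing: it keeps the buffer and
/// reports the channel layout unchanged.
struct PassThrough;

impl Transformer for PassThrough {
    fn transform<'a>(
        &mut self,
        (channels, buf): (opus::Channels, &'a mut [f32]),
    ) -> Option<(opus::Channels, &'a mut [f32])> {
        // Returning Some keeps the buffer; None would drop it, as NoiseGate
        // does while the gate is closed.
        Some((channels, buf))
    }
}
```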
diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs
index 6995e1e..2ed05c5 100644
--- a/mumd/src/state/channel.rs
+++ b/mumd/src/state/channel.rs
@@ -169,13 +169,10 @@ pub fn into_channel(
impl From<&Channel> for mumlib::state::Channel {
fn from(channel: &Channel) -> Self {
- mumlib::state::Channel {
- description: channel.description.clone(),
- links: Vec::new(),
- max_users: channel.max_users,
- name: channel.name.clone(),
- children: Vec::new(),
- users: Vec::new(),
- }
+ mumlib::state::Channel::new(
+ channel.name.clone(),
+ channel.description.clone(),
+ channel.max_users,
+ )
}
}