aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
Diffstat (limited to 'mumd')
-rw-r--r--mumd/src/audio.rs71
-rw-r--r--mumd/src/audio/output.rs108
2 files changed, 96 insertions, 83 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 5a839bc..814050b 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -3,7 +3,7 @@ pub mod output;
mod noise_gate;
use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice};
-use crate::audio::output::SaturatingAdd;
+use crate::audio::output::ClientStream;
use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt};
use crate::error::{AudioError, AudioStream};
use crate::network::VoiceStreamType;
@@ -20,7 +20,7 @@ use mumble_protocol::Serverbound;
use mumble_protocol::voice::{VoicePacketPayload, VoicePacket};
use mumlib::config::SoundEffect;
use std::borrow::Cow;
-use std::collections::{hash_map::Entry, HashMap, VecDeque};
+use std::collections::{hash_map::Entry, HashMap};
use std::convert::TryFrom;
use std::fmt::Debug;
use std::fs::File;
@@ -126,10 +126,9 @@ pub struct AudioOutput {
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
- client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>,
+ client_streams: Arc<Mutex<ClientStream>>,
sounds: HashMap<NotificationEvents, Vec<f32>>,
- play_sounds: Arc<Mutex<VecDeque<f32>>>,
}
impl AudioOutput {
@@ -159,14 +158,12 @@ impl AudioOutput {
let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new()));
let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);
- let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new()));
- let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels)));
let output_stream = match output_supported_sample_format {
SampleFormat::F32 => output_device.build_output_stream(
&output_config,
output::curry_callback::<f32>(
- Arc::clone(&play_sounds),
Arc::clone(&client_streams),
output_volume_receiver,
Arc::clone(&user_volumes),
@@ -176,7 +173,6 @@ impl AudioOutput {
SampleFormat::I16 => output_device.build_output_stream(
&output_config,
output::curry_callback::<i16>(
- Arc::clone(&play_sounds),
Arc::clone(&client_streams),
output_volume_receiver,
Arc::clone(&user_volumes),
@@ -186,7 +182,6 @@ impl AudioOutput {
SampleFormat::U16 => output_device.build_output_stream(
&output_config,
output::curry_callback::<u16>(
- Arc::clone(&play_sounds),
Arc::clone(&client_streams),
output_volume_receiver,
Arc::clone(&user_volumes),
@@ -204,7 +199,6 @@ impl AudioOutput {
sounds: HashMap::new(),
volume_sender: output_volume_sender,
user_volumes,
- play_sounds,
};
res.load_sound_effects(&[]);
Ok(res)
@@ -265,52 +259,23 @@ impl AudioOutput {
}
pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) {
- match self.client_streams.lock().unwrap().entry((stream_type, session_id)) {
- Entry::Occupied(mut entry) => {
- entry
- .get_mut()
- .decode_packet(payload, self.config.channels as usize);
- }
- Entry::Vacant(_) => {
- warn!("Can't find session id {}", session_id);
- }
- }
+ self.client_streams.lock().unwrap().decode_packet(
+ Some((stream_type, session_id)),
+ payload,
+ self.config.channels as usize,
+ );
}
pub fn add_client(&self, session_id: u32) {
- for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
- match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) {
- Entry::Occupied(_) => {
- warn!("Session id {} already exists", session_id);
- }
- Entry::Vacant(entry) => {
- entry.insert(output::ClientStream::new(
- self.config.sample_rate.0,
- self.config.channels,
- ));
- }
- }
- }
+ self.client_streams.lock().unwrap().add_client(session_id);
}
pub fn remove_client(&self, session_id: u32) {
- for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
- match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) {
- Entry::Occupied(entry) => {
- entry.remove();
- }
- Entry::Vacant(_) => {
- warn!(
- "Tried to remove session id {} that doesn't exist",
- session_id
- );
- }
- }
- }
+ self.client_streams.lock().unwrap().remove_client(session_id);
}
- pub fn clear_clients(&mut self) {
- self.client_streams.lock().unwrap().clear();
+ pub fn clear_clients(&self) {
+ self.client_streams.lock().unwrap().clear_clients();
}
pub fn set_volume(&self, output_volume: f32) {
@@ -341,15 +306,7 @@ impl AudioOutput {
pub fn play_effect(&self, effect: NotificationEvents) {
let samples = self.sounds.get(&effect).unwrap();
-
- let mut play_sounds = self.play_sounds.lock().unwrap();
-
- for (val, e) in play_sounds.iter_mut().zip(samples.iter()) {
- *val = val.saturating_add(*e);
- }
-
- let l = play_sounds.len();
- play_sounds.extend(samples.iter().skip(l));
+ self.client_streams.lock().unwrap().extend(None, samples);
}
}
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index 421d395..797cf84 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,22 +1,29 @@
use crate::network::VoiceStreamType;
+use log::*;
use cpal::{OutputCallbackInfo, Sample};
use mumble_protocol::voice::VoicePacketPayload;
use opus::Channels;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, VecDeque, hash_map::Entry};
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
+type ClientStreamKey = Option<(VoiceStreamType, u32)>;
+type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>;
+
pub struct ClientStream {
- buffer: VecDeque<f32>, //TODO ring buffer?
+ buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer?
opus_decoder: opus::Decoder,
}
impl ClientStream {
pub fn new(sample_rate: u32, channels: u16) -> Self {
+ let mut buffer = HashMap::new();
+ //None is the system audio effects
+ buffer.insert(None, VecDeque::new());
Self {
- buffer: VecDeque::new(),
+ buffer,
opus_decoder: opus::Decoder::new(
sample_rate,
match channels {
@@ -29,7 +36,7 @@ impl ClientStream {
}
}
- pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
+ pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
@@ -38,13 +45,67 @@ impl ClientStream {
.decode_float(&bytes, &mut out, false)
.expect("Error decoding");
out.truncate(parsed);
- self.buffer.extend(out);
+ self.extend(client, &out);
}
_ => {
unimplemented!("Payload type not supported");
}
}
}
+
+ pub fn consume<'a, F>(&mut self, consumer: F)
+ where
+ F: FnOnce(ConsumerInput) {
+ let iter = self.buffer.iter_mut();
+ consumer(iter);
+ }
+
+ pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) {
+ match self.buffer.entry(key) {
+ Entry::Occupied(mut entry) => {
+ entry.get_mut().extend(values.iter().copied());
+ }
+ Entry::Vacant(_) => {
+ match key {
+ None => warn!("Can't find session None"),
+ Some(key) => warn!("Can't find session id {}", key.1),
+ }
+ }
+ }
+ }
+
+ pub fn add_client(&mut self, session_id: u32) {
+ for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
+ match self.buffer.entry(Some((*stream_type, session_id))) {
+ Entry::Occupied(_) => {
+ warn!("Session id {} already exists", session_id);
+ }
+ Entry::Vacant(entry) => {
+ entry.insert(VecDeque::new());
+ }
+ }
+ }
+ }
+
+ pub fn remove_client(&mut self, session_id: u32) {
+ for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
+ match self.buffer.entry(Some((*stream_type, session_id))) {
+ Entry::Occupied(entry) => {
+ entry.remove();
+ }
+ Entry::Vacant(_) => {
+ warn!(
+ "Tried to remove session id {} that doesn't exist",
+ session_id
+ );
+ }
+ }
+ }
+ }
+
+ pub fn clear_clients(&mut self) {
+ self.buffer.retain(|k , _| k.is_none());
+ }
}
pub trait SaturatingAdd {
@@ -74,8 +135,7 @@ impl SaturatingAdd for u16 {
}
pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
- effect_sound: Arc<Mutex<VecDeque<f32>>>,
- user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>,
+ user_bufs: Arc<Mutex<ClientStream>>,
output_volume_receiver: watch::Receiver<f32>,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
@@ -86,27 +146,23 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
let volume = *output_volume_receiver.borrow();
- let mut effects_sound = effect_sound.lock().unwrap();
let mut user_bufs = user_bufs.lock().unwrap();
- for ((_, id), client_stream) in &mut *user_bufs {
- let (user_volume, muted) = user_volumes
- .lock()
- .unwrap()
- .get(id)
- .cloned()
- .unwrap_or((1.0, false));
- for sample in data.iter_mut() {
- let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume;
- if !muted {
- *sample = sample.saturating_add(Sample::from(&s));
+ let user_volumes = user_volumes.lock().unwrap();
+ let mut saturating_add = |input: ConsumerInput| {
+ for (k, v) in input {
+ let (user_volume, muted) = match k {
+ Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)),
+ None => (1.0, false),
+ };
+ for sample in data.iter_mut() {
+ if !muted {
+ *sample = sample.saturating_add(Sample::from(
+ &(v.pop_front().unwrap_or(0.0) * volume * user_volume),
+ ));
+ }
}
}
- }
-
- for sample in data.iter_mut() {
- *sample = sample.saturating_add(Sample::from(
- &(effects_sound.pop_front().unwrap_or(0.0) * volume),
- ));
- }
+ };
+ user_bufs.consume(&mut saturating_add);
}
}