aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio
diff options
context:
space:
mode:
authorRubens Brandao <git@rubens.io>2021-04-08 21:14:48 +0200
committerRubens Brandao <git@rubens.io>2021-04-08 21:14:48 +0200
commit7fed8f81222de570d864487605e42b5cbb023218 (patch)
tree03ff7f0e85ce6c356d09ff47f4d782110310657c /mumd/src/audio
parent12554b2f6cd89ad3cd3721bbc790d7772a21c3ae (diff)
downloadmum-7fed8f81222de570d864487605e42b5cbb023218.tar.gz
Move audio decode logic to ClientStream
This way is possible to deduplicate the opus::Decoder used by audio output. The audio effects and client network streams are unified in only one place, allowing the Audio Output device to consume the Samples with only one call.
Diffstat (limited to 'mumd/src/audio')
-rw-r--r--mumd/src/audio/output.rs108
1 files changed, 82 insertions, 26 deletions
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index 421d395..797cf84 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,22 +1,29 @@
use crate::network::VoiceStreamType;
+use log::*;
use cpal::{OutputCallbackInfo, Sample};
use mumble_protocol::voice::VoicePacketPayload;
use opus::Channels;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, VecDeque, hash_map::Entry};
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
+type ClientStreamKey = Option<(VoiceStreamType, u32)>;
+type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>;
+
pub struct ClientStream {
- buffer: VecDeque<f32>, //TODO ring buffer?
+ buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer?
opus_decoder: opus::Decoder,
}
impl ClientStream {
pub fn new(sample_rate: u32, channels: u16) -> Self {
+ let mut buffer = HashMap::new();
+ //None is the system audio effects
+ buffer.insert(None, VecDeque::new());
Self {
- buffer: VecDeque::new(),
+ buffer,
opus_decoder: opus::Decoder::new(
sample_rate,
match channels {
@@ -29,7 +36,7 @@ impl ClientStream {
}
}
- pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
+ pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
@@ -38,13 +45,67 @@ impl ClientStream {
.decode_float(&bytes, &mut out, false)
.expect("Error decoding");
out.truncate(parsed);
- self.buffer.extend(out);
+ self.extend(client, &out);
}
_ => {
unimplemented!("Payload type not supported");
}
}
}
+
+ pub fn consume<'a, F>(&mut self, consumer: F)
+ where
+ F: FnOnce(ConsumerInput) {
+ let iter = self.buffer.iter_mut();
+ consumer(iter);
+ }
+
+ pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) {
+ match self.buffer.entry(key) {
+ Entry::Occupied(mut entry) => {
+ entry.get_mut().extend(values.iter().copied());
+ }
+ Entry::Vacant(_) => {
+ match key {
+ None => warn!("Can't find session None"),
+ Some(key) => warn!("Can't find session id {}", key.1),
+ }
+ }
+ }
+ }
+
+ pub fn add_client(&mut self, session_id: u32) {
+ for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
+ match self.buffer.entry(Some((*stream_type, session_id))) {
+ Entry::Occupied(_) => {
+ warn!("Session id {} already exists", session_id);
+ }
+ Entry::Vacant(entry) => {
+ entry.insert(VecDeque::new());
+ }
+ }
+ }
+ }
+
+ pub fn remove_client(&mut self, session_id: u32) {
+ for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
+ match self.buffer.entry(Some((*stream_type, session_id))) {
+ Entry::Occupied(entry) => {
+ entry.remove();
+ }
+ Entry::Vacant(_) => {
+ warn!(
+ "Tried to remove session id {} that doesn't exist",
+ session_id
+ );
+ }
+ }
+ }
+ }
+
+ pub fn clear_clients(&mut self) {
+ self.buffer.retain(|k , _| k.is_none());
+ }
}
pub trait SaturatingAdd {
@@ -74,8 +135,7 @@ impl SaturatingAdd for u16 {
}
pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
- effect_sound: Arc<Mutex<VecDeque<f32>>>,
- user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>,
+ user_bufs: Arc<Mutex<ClientStream>>,
output_volume_receiver: watch::Receiver<f32>,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
@@ -86,27 +146,23 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
let volume = *output_volume_receiver.borrow();
- let mut effects_sound = effect_sound.lock().unwrap();
let mut user_bufs = user_bufs.lock().unwrap();
- for ((_, id), client_stream) in &mut *user_bufs {
- let (user_volume, muted) = user_volumes
- .lock()
- .unwrap()
- .get(id)
- .cloned()
- .unwrap_or((1.0, false));
- for sample in data.iter_mut() {
- let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume;
- if !muted {
- *sample = sample.saturating_add(Sample::from(&s));
+ let user_volumes = user_volumes.lock().unwrap();
+ let mut saturating_add = |input: ConsumerInput| {
+ for (k, v) in input {
+ let (user_volume, muted) = match k {
+ Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)),
+ None => (1.0, false),
+ };
+ for sample in data.iter_mut() {
+ if !muted {
+ *sample = sample.saturating_add(Sample::from(
+ &(v.pop_front().unwrap_or(0.0) * volume * user_volume),
+ ));
+ }
}
}
- }
-
- for sample in data.iter_mut() {
- *sample = sample.saturating_add(Sample::from(
- &(effects_sound.pop_front().unwrap_or(0.0) * volume),
- ));
- }
+ };
+ user_bufs.consume(&mut saturating_add);
}
}