aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio/output.rs
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src/audio/output.rs')
-rw-r--r--mumd/src/audio/output.rs108
1 files changed, 82 insertions, 26 deletions
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index 421d395..797cf84 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,22 +1,29 @@
use crate::network::VoiceStreamType;
+use log::*;
use cpal::{OutputCallbackInfo, Sample};
use mumble_protocol::voice::VoicePacketPayload;
use opus::Channels;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, VecDeque, hash_map::Entry};
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
+type ClientStreamKey = Option<(VoiceStreamType, u32)>;
+type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>;
+
pub struct ClientStream {
- buffer: VecDeque<f32>, //TODO ring buffer?
+ buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer?
opus_decoder: opus::Decoder,
}
impl ClientStream {
pub fn new(sample_rate: u32, channels: u16) -> Self {
+ let mut buffer = HashMap::new();
+ //None is the system audio effects
+ buffer.insert(None, VecDeque::new());
Self {
- buffer: VecDeque::new(),
+ buffer,
opus_decoder: opus::Decoder::new(
sample_rate,
match channels {
@@ -29,7 +36,7 @@ impl ClientStream {
}
}
- pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
+ pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
@@ -38,13 +45,67 @@ impl ClientStream {
.decode_float(&bytes, &mut out, false)
.expect("Error decoding");
out.truncate(parsed);
- self.buffer.extend(out);
+ self.extend(client, &out);
}
_ => {
unimplemented!("Payload type not supported");
}
}
}
+
+ pub fn consume<'a, F>(&mut self, consumer: F)
+ where
+ F: FnOnce(ConsumerInput) {
+ let iter = self.buffer.iter_mut();
+ consumer(iter);
+ }
+
+ pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) {
+ match self.buffer.entry(key) {
+ Entry::Occupied(mut entry) => {
+ entry.get_mut().extend(values.iter().copied());
+ }
+ Entry::Vacant(_) => {
+ match key {
+ None => warn!("Can't find session None"),
+ Some(key) => warn!("Can't find session id {}", key.1),
+ }
+ }
+ }
+ }
+
+ pub fn add_client(&mut self, session_id: u32) {
+ for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
+ match self.buffer.entry(Some((*stream_type, session_id))) {
+ Entry::Occupied(_) => {
+ warn!("Session id {} already exists", session_id);
+ }
+ Entry::Vacant(entry) => {
+ entry.insert(VecDeque::new());
+ }
+ }
+ }
+ }
+
+ pub fn remove_client(&mut self, session_id: u32) {
+ for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
+ match self.buffer.entry(Some((*stream_type, session_id))) {
+ Entry::Occupied(entry) => {
+ entry.remove();
+ }
+ Entry::Vacant(_) => {
+ warn!(
+ "Tried to remove session id {} that doesn't exist",
+ session_id
+ );
+ }
+ }
+ }
+ }
+
+ pub fn clear_clients(&mut self) {
+ self.buffer.retain(|k , _| k.is_none());
+ }
}
pub trait SaturatingAdd {
@@ -74,8 +135,7 @@ impl SaturatingAdd for u16 {
}
pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
- effect_sound: Arc<Mutex<VecDeque<f32>>>,
- user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>,
+ user_bufs: Arc<Mutex<ClientStream>>,
output_volume_receiver: watch::Receiver<f32>,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
@@ -86,27 +146,23 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
let volume = *output_volume_receiver.borrow();
- let mut effects_sound = effect_sound.lock().unwrap();
let mut user_bufs = user_bufs.lock().unwrap();
- for ((_, id), client_stream) in &mut *user_bufs {
- let (user_volume, muted) = user_volumes
- .lock()
- .unwrap()
- .get(id)
- .cloned()
- .unwrap_or((1.0, false));
- for sample in data.iter_mut() {
- let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume;
- if !muted {
- *sample = sample.saturating_add(Sample::from(&s));
+ let user_volumes = user_volumes.lock().unwrap();
+ let mut saturating_add = |input: ConsumerInput| {
+ for (k, v) in input {
+ let (user_volume, muted) = match k {
+ Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)),
+ None => (1.0, false),
+ };
+ for sample in data.iter_mut() {
+ if !muted {
+ *sample = sample.saturating_add(Sample::from(
+ &(v.pop_front().unwrap_or(0.0) * volume * user_volume),
+ ));
+ }
}
}
- }
-
- for sample in data.iter_mut() {
- *sample = sample.saturating_add(Sample::from(
- &(effects_sound.pop_front().unwrap_or(0.0) * volume),
- ));
- }
+ };
+ user_bufs.consume(&mut saturating_add);
}
}