author     Rubens Brandao <git@rubens.io>  2021-04-10 19:43:21 +0200
committer  Rubens Brandao <git@rubens.io>  2021-04-10 19:43:21 +0200
commit     727710ae7e3ac8c35d66e0431682a2a90f2bd3a4 (patch)
tree       6d815dada110803aa787d116197d9aa0094821f0 /mumd/src/audio/output.rs
parent     3caae1e9e17524cd2fdedc39c075ceda231cf0e1 (diff)
Restore multiple decoders
Diffstat (limited to 'mumd/src/audio/output.rs')
-rw-r--r--  mumd/src/audio/output.rs  115
1 file changed, 58 insertions(+), 57 deletions(-)
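The commit restores one Opus decoder per incoming voice stream: the `Option`-keyed buffer map (where `None` meant system sound effects) and the single shared `opus::Decoder` are replaced by a `buffer_clients` map holding a `(VecDeque<f32>, opus::Decoder)` pair per `(VoiceStreamType, u32)` key, created lazily on first use, with a separate `buffer_effects` queue for effects. The sketch below illustrates only that lazy `entry(...).or_insert_with(...)` pattern and is not code from this repository: `Decoder` is a stand-in for `opus::Decoder`, the key is simplified to a bare session id, and `main` is a hypothetical usage example.

```rust
use std::collections::{HashMap, VecDeque};

// Stand-in for opus::Decoder; only the shape of the lazy-creation pattern matters here.
struct Decoder {
    _sample_rate: u32,
    _channels: u16,
}

impl Decoder {
    fn new(sample_rate: u32, channels: u16) -> Self {
        Decoder { _sample_rate: sample_rate, _channels: channels }
    }
}

// Simplified stand-in for ClientStreamKey = (VoiceStreamType, u32): a bare session id.
type ClientKey = u32;

struct ClientStream {
    // One decode buffer and one decoder per client, created on first use.
    buffer_clients: HashMap<ClientKey, (VecDeque<f32>, Decoder)>,
    // Shared queue for system sound effects, which need no decoder.
    buffer_effects: VecDeque<f32>,
    sample_rate: u32,
    channels: u16,
}

impl ClientStream {
    fn get_client(&mut self, client: ClientKey) -> &mut (VecDeque<f32>, Decoder) {
        // Copy the fields first so the closure does not borrow `self` a second time.
        let (sample_rate, channels) = (self.sample_rate, self.channels);
        self.buffer_clients
            .entry(client)
            .or_insert_with(|| (VecDeque::new(), Decoder::new(sample_rate, channels)))
    }
}

fn main() {
    let mut stream = ClientStream {
        buffer_clients: HashMap::new(),
        buffer_effects: VecDeque::new(),
        sample_rate: 48_000,
        channels: 2,
    };
    // First access for session 7 creates its buffer and decoder; later accesses reuse them.
    stream.get_client(7).0.extend([0.1f32, 0.2, 0.3]);
    stream.get_client(7).0.extend([0.4f32]);
    assert_eq!(stream.buffer_clients.len(), 1);
    // Sound effects bypass the per-client map entirely.
    stream.buffer_effects.extend([0.0f32; 4]);
}
```

Creating decoders on demand gives every speaker its own Opus state, which is what the single shared decoder in the previous revision could not provide.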
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index df9b2e2..658c1c8 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -6,49 +6,57 @@ use log::*;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample};
use mumble_protocol::voice::VoicePacketPayload;
-use opus::Channels;
use std::collections::{HashMap, VecDeque};
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
-type ClientStreamKey = Option<(VoiceStreamType, u32)>;
-type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque<f32>>;
+type ClientStreamKey = (VoiceStreamType, u32);
pub struct ClientStream {
- buffer: HashMap<ClientStreamKey, VecDeque<f32>>, //TODO ring buffer?
- opus_decoder: opus::Decoder,
+ buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer?
+ buffer_effects: VecDeque<f32>,
+ sample_rate: u32,
+ channels: opus::Channels,
}
impl ClientStream {
pub fn new(sample_rate: u32, channels: u16) -> Self {
- let mut buffer = HashMap::new();
- //None is the system audio effects
- buffer.insert(None, VecDeque::new());
+ let channels = match channels {
+ 1 => opus::Channels::Mono,
+ 2 => opus::Channels::Stereo,
+ _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
+ };
Self {
- buffer,
- opus_decoder: opus::Decoder::new(
- sample_rate,
- match channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
- },
- )
- .unwrap(),
+ buffer_clients: HashMap::new(),
+ buffer_effects: VecDeque::new(),
+ sample_rate,
+ channels,
}
}
- pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) {
+ fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) {
+ let sample_rate = self.sample_rate;
+ let channels = self.channels;
+ self.buffer_clients.entry(client).or_insert_with(|| {
+ let opus_decoder = opus::Decoder::new(
+ sample_rate,
+ channels
+ ).unwrap();
+ (VecDeque::new(), opus_decoder)
+ })
+ }
+
+ pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
- let parsed = self
- .opus_decoder
+ let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; // 720 is the largest packet size we expect to decode
+ let (buffer, decoder) = self.get_client(client);
+ let parsed = decoder
.decode_float(&bytes, &mut out, false)
.expect("Error decoding");
out.truncate(parsed);
- self.extend(client, &out);
+ buffer.extend(&out);
}
_ => {
unimplemented!("Payload type not supported");
@@ -56,18 +64,12 @@ impl ClientStream {
}
}
- pub fn consume<'a, F>(&mut self, consumer: F)
- where
- F: FnOnce(ConsumerInput) {
- let iter = self.buffer.iter_mut();
- consumer(iter);
- //remove empty Vec
- self.buffer.retain(|_, v| !v.is_empty());
- }
-
- pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) {
- let entry = self.buffer.entry(key).or_insert(VecDeque::new());
- entry.extend(values.iter().copied());
+ pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) {
+ let buffer = match client {
+ Some(x) => &mut self.get_client(x).0,
+ None => &mut self.buffer_effects,
+ };
+ buffer.extend(values.iter().copied());
}
}
@@ -101,13 +103,13 @@ pub trait AudioOutputDevice {
fn play(&self) -> Result<(), AudioError>;
fn pause(&self) -> Result<(), AudioError>;
fn set_volume(&self, volume: f32);
- fn get_num_channels(&self) -> usize;
- fn get_client_streams(&self) -> Arc<Mutex<ClientStream>>;
+ fn num_channels(&self) -> usize;
+ fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
}
pub struct DefaultAudioOutputDevice {
config: StreamConfig,
- _stream: cpal::Stream,
+ stream: cpal::Stream,
client_streams: Arc<Mutex<ClientStream>>,
volume_sender: watch::Sender<f32>,
}
@@ -176,7 +178,7 @@ impl DefaultAudioOutputDevice {
Ok(Self {
config: output_config,
- _stream: output_stream,
+ stream: output_stream,
volume_sender: output_volume_sender,
client_streams,
})
@@ -185,22 +187,22 @@ impl DefaultAudioOutputDevice {
impl AudioOutputDevice for DefaultAudioOutputDevice {
fn play(&self) -> Result<(), AudioError> {
- self._stream.play().map_err(|e| AudioError::OutputPlayError(e))
+ self.stream.play().map_err(|e| AudioError::OutputPlayError(e))
}
fn pause(&self) -> Result<(), AudioError> {
- self._stream.pause().map_err(|e| AudioError::OutputPauseError(e))
+ self.stream.pause().map_err(|e| AudioError::OutputPauseError(e))
}
fn set_volume(&self, volume: f32) {
self.volume_sender.send(volume).unwrap();
}
- fn get_num_channels(&self) -> usize {
+ fn num_channels(&self) -> usize {
self.config.channels as usize
}
- fn get_client_streams(&self) -> Arc<Mutex<ClientStream>> {
+ fn client_streams(&self) -> Arc<Mutex<ClientStream>> {
Arc::clone(&self.client_streams)
}
}
@@ -219,21 +221,20 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
let mut user_bufs = user_bufs.lock().unwrap();
let user_volumes = user_volumes.lock().unwrap();
- let mut saturating_add = |input: ConsumerInput| {
- for (k, v) in input {
- let (user_volume, muted) = match k {
- Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)),
- None => (1.0, false),
- };
- for sample in data.iter_mut() {
- if !muted {
- *sample = sample.saturating_add(Sample::from(
- &(v.pop_front().unwrap_or(0.0) * volume * user_volume),
- ));
- }
+ for (k, v) in user_bufs.buffer_clients.iter_mut() {
+ let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
+ for sample in data.iter_mut() {
+ if !muted {
+ *sample = sample.saturating_add(Sample::from(
+ &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume),
+ ));
}
}
- };
- user_bufs.consume(&mut saturating_add);
+ }
+ for sample in data.iter_mut() {
+ *sample = sample.saturating_add(Sample::from(
+ &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume),
+ ));
+ }
}
}
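With the `consume` closure and the `ConsumerInput` alias gone, the output callback walks `buffer_clients` directly, applies the per-user `(volume, muted)` setting looked up by session id, and then mixes the shared `buffer_effects` queue on top, saturating on every addition. The following sketch shows the same mixing shape under simplifying assumptions: samples are plain `f32`, the map key is a bare session id rather than `(VoiceStreamType, u32)`, `mix_output` is a hypothetical free function, and `clamp` stands in for cpal's `Sample::saturating_add`.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical helper mirroring the rewritten output callback: mix every
/// per-client buffer, then the shared effects buffer, into `data`.
fn mix_output(
    data: &mut [f32],
    buffer_clients: &mut HashMap<u32, VecDeque<f32>>,
    buffer_effects: &mut VecDeque<f32>,
    user_volumes: &HashMap<u32, (f32, bool)>,
    volume: f32,
) {
    for (session_id, buf) in buffer_clients.iter_mut() {
        // Unknown users default to full volume and not muted, as in the diff.
        let (user_volume, muted) = user_volumes.get(session_id).copied().unwrap_or((1.0, false));
        if muted {
            continue;
        }
        for sample in data.iter_mut() {
            // An exhausted buffer contributes silence instead of blocking.
            let mixed = *sample + buf.pop_front().unwrap_or(0.0) * volume * user_volume;
            *sample = mixed.clamp(-1.0, 1.0); // stands in for Sample::saturating_add
        }
    }
    // System sound effects are mixed last, scaled only by the output volume.
    for sample in data.iter_mut() {
        let mixed = *sample + buffer_effects.pop_front().unwrap_or(0.0) * volume;
        *sample = mixed.clamp(-1.0, 1.0);
    }
}

fn main() {
    let mut data = vec![0.0f32; 4];
    let mut clients = HashMap::from([(7u32, VecDeque::from(vec![0.5f32; 4]))]);
    let mut effects = VecDeque::from(vec![0.25f32; 4]);
    let volumes = HashMap::from([(7u32, (0.5f32, false))]);
    mix_output(&mut data, &mut clients, &mut effects, &volumes, 1.0);
    assert_eq!(data, vec![0.5f32; 4]); // 0.5 * 0.5 from the client plus 0.25 of effects
}
```

Popping with `unwrap_or(0.0)` treats an empty buffer as silence, so a client that underruns simply drops out of the mix without stalling the audio callback.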