From 12554b2f6cd89ad3cd3721bbc790d7772a21c3ae Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Wed, 7 Apr 2021 09:29:23 -0300 Subject: Create a trait and default implementation for device audio input --- mumd/src/audio/input.rs | 110 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) (limited to 'mumd/src/audio') diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 176747d..f4e9c4c 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,8 +1,11 @@ -use cpal::{InputCallbackInfo, Sample}; +use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use tokio::sync::watch; use log::*; use crate::state::StatePhase; +use crate::audio::SAMPLE_RATE; +use crate::error::{AudioError, AudioStream}; pub fn callback( mut input_sender: futures_channel::mpsc::Sender, @@ -24,3 +27,108 @@ pub fn callback( } } } + +pub trait AudioInputDevice { + fn play(&self) -> Result<(), AudioError>; + fn pause(&self) -> Result<(), AudioError>; + fn set_volume(&self, volume: f32); + fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver; + fn get_num_channels(&self) -> usize; +} + +pub struct DefaultAudioInputDevice { + _stream: cpal::Stream, + sample_receiver: Option>, + volume_sender: watch::Sender, + channels: u16, +} + +impl DefaultAudioInputDevice { + pub fn new(input_volume: f32, phase_watcher: watch::Receiver) -> Result { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + + let input_device = host + .default_input_device() + .ok_or(AudioError::NoDevice(AudioStream::Input))?; + let input_supported_config = input_device + .supported_input_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))? + .with_sample_rate(sample_rate); + let input_supported_sample_format = input_supported_config.sample_format(); + let input_config: StreamConfig = input_supported_config.into(); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); + + let (volume_sender, input_volume_receiver) = watch::channel::(input_volume); + + let input_stream = match input_supported_sample_format { + SampleFormat::F32 => input_device.build_input_stream( + &input_config, + callback::( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + SampleFormat::I16 => input_device.build_input_stream( + &input_config, + callback::( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + SampleFormat::U16 => input_device.build_input_stream( + &input_config, + callback::( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; + + let res = Self { + _stream: input_stream, + sample_receiver: Some(sample_receiver), + volume_sender, + channels: input_config.channels, + }; + Ok(res) + } +} + +impl AudioInputDevice for DefaultAudioInputDevice { + fn play(&self) -> Result<(), AudioError> { + self._stream.play().map_err(|e| AudioError::InputPlayError(e)) + } + fn pause(&self) -> Result<(), AudioError> { + self._stream.pause().map_err(|e| AudioError::InputPauseError(e)) + } + fn set_volume(&self, volume: f32) { + self.volume_sender.send(volume).unwrap(); + } + fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver { + let ret = self.sample_receiver.take(); + ret.unwrap() + } + fn get_num_channels(&self) -> usize { + self.channels as usize + } +} -- cgit v1.2.1 From 7fed8f81222de570d864487605e42b5cbb023218 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Thu, 8 Apr 2021 16:14:48 -0300 Subject: Move audio decode logic to ClientStream This way is possible to deduplicate the opus::Decoder used by audio output. The audio effects and client network streams are unified in only one place, allowing the Audio Output device to consume the Samples with only one call. --- mumd/src/audio/output.rs | 108 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 26 deletions(-) (limited to 'mumd/src/audio') diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 421d395..797cf84 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,22 +1,29 @@ use crate::network::VoiceStreamType; +use log::*; use cpal::{OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque, hash_map::Entry}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; +type ClientStreamKey = Option<(VoiceStreamType, u32)>; +type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque>; + pub struct ClientStream { - buffer: VecDeque, //TODO ring buffer? + buffer: HashMap>, //TODO ring buffer? opus_decoder: opus::Decoder, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { + let mut buffer = HashMap::new(); + //None is the system audio effects + buffer.insert(None, VecDeque::new()); Self { - buffer: VecDeque::new(), + buffer, opus_decoder: opus::Decoder::new( sample_rate, match channels { @@ -29,7 +36,7 @@ impl ClientStream { } } - pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { let mut out: Vec = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode @@ -38,13 +45,67 @@ impl ClientStream { .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.buffer.extend(out); + self.extend(client, &out); } _ => { unimplemented!("Payload type not supported"); } } } + + pub fn consume<'a, F>(&mut self, consumer: F) + where + F: FnOnce(ConsumerInput) { + let iter = self.buffer.iter_mut(); + consumer(iter); + } + + pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { + match self.buffer.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().extend(values.iter().copied()); + } + Entry::Vacant(_) => { + match key { + None => warn!("Can't find session None"), + Some(key) => warn!("Can't find session id {}", key.1), + } + } + } + } + + pub fn add_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(_) => { + warn!("Session id {} already exists", session_id); + } + Entry::Vacant(entry) => { + entry.insert(VecDeque::new()); + } + } + } + } + + pub fn remove_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(entry) => { + entry.remove(); + } + Entry::Vacant(_) => { + warn!( + "Tried to remove session id {} that doesn't exist", + session_id + ); + } + } + } + } + + pub fn clear_clients(&mut self) { + self.buffer.retain(|k , _| k.is_none()); + } } pub trait SaturatingAdd { @@ -74,8 +135,7 @@ impl SaturatingAdd for u16 { } pub fn curry_callback( - effect_sound: Arc>>, - user_bufs: Arc>>, + user_bufs: Arc>, output_volume_receiver: watch::Receiver, user_volumes: Arc>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { @@ -86,27 +146,23 @@ pub fn curry_callback let volume = *output_volume_receiver.borrow(); - let mut effects_sound = effect_sound.lock().unwrap(); let mut user_bufs = user_bufs.lock().unwrap(); - for ((_, id), client_stream) in &mut *user_bufs { - let (user_volume, muted) = user_volumes - .lock() - .unwrap() - .get(id) - .cloned() - .unwrap_or((1.0, false)); - for sample in data.iter_mut() { - let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume; - if !muted { - *sample = sample.saturating_add(Sample::from(&s)); + let user_volumes = user_volumes.lock().unwrap(); + let mut saturating_add = |input: ConsumerInput| { + for (k, v) in input { + let (user_volume, muted) = match k { + Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), + None => (1.0, false), + }; + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.pop_front().unwrap_or(0.0) * volume * user_volume), + )); + } } } - } - - for sample in data.iter_mut() { - *sample = sample.saturating_add(Sample::from( - &(effects_sound.pop_front().unwrap_or(0.0) * volume), - )); - } + }; + user_bufs.consume(&mut saturating_add); } } -- cgit v1.2.1 From 07d06b6946e23ecffbf5549376cf464013222274 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Thu, 8 Apr 2021 17:14:29 -0300 Subject: Create a trait and default device audio output Also removed add/remove/clear client from audio interface, it is done on demand now. --- mumd/src/audio/output.rs | 165 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 118 insertions(+), 47 deletions(-) (limited to 'mumd/src/audio') diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 797cf84..3664be8 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,10 +1,13 @@ use crate::network::VoiceStreamType; -use log::*; +use crate::audio::SAMPLE_RATE; +use crate::error::{AudioError, AudioStream}; -use cpal::{OutputCallbackInfo, Sample}; +use log::*; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; -use std::collections::{HashMap, VecDeque, hash_map::Entry}; +use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; @@ -58,53 +61,13 @@ impl ClientStream { F: FnOnce(ConsumerInput) { let iter = self.buffer.iter_mut(); consumer(iter); + //remove empty Vec + self.buffer.retain(|_, v| v.is_empty()); } pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { - match self.buffer.entry(key) { - Entry::Occupied(mut entry) => { - entry.get_mut().extend(values.iter().copied()); - } - Entry::Vacant(_) => { - match key { - None => warn!("Can't find session None"), - Some(key) => warn!("Can't find session id {}", key.1), - } - } - } - } - - pub fn add_client(&mut self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.buffer.entry(Some((*stream_type, session_id))) { - Entry::Occupied(_) => { - warn!("Session id {} already exists", session_id); - } - Entry::Vacant(entry) => { - entry.insert(VecDeque::new()); - } - } - } - } - - pub fn remove_client(&mut self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.buffer.entry(Some((*stream_type, session_id))) { - Entry::Occupied(entry) => { - entry.remove(); - } - Entry::Vacant(_) => { - warn!( - "Tried to remove session id {} that doesn't exist", - session_id - ); - } - } - } - } - - pub fn clear_clients(&mut self) { - self.buffer.retain(|k , _| k.is_none()); + let entry = self.buffer.entry(key).or_insert(VecDeque::new()); + entry.extend(values.iter().copied()); } } @@ -134,6 +97,114 @@ impl SaturatingAdd for u16 { } } +pub trait AudioOutputDevice { + fn play(&self) -> Result<(), AudioError>; + fn pause(&self) -> Result<(), AudioError>; + fn set_volume(&self, volume: f32); + fn get_num_channels(&self) -> usize; + fn get_client_streams(&self) -> Arc>; +} + +pub struct DefaultAudioOutputDevice { + config: StreamConfig, + _stream: cpal::Stream, + client_streams: Arc>, + volume_sender: watch::Sender, +} + +impl DefaultAudioOutputDevice { + pub fn new( + output_volume: f32, + user_volumes: Arc>>, + ) -> Result { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + let output_device = host + .default_output_device() + .ok_or(AudioError::NoDevice(AudioStream::Output))?; + let output_supported_config = output_device + .supported_output_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? + .with_sample_rate(sample_rate); + let output_supported_sample_format = output_supported_config.sample_format(); + let output_config: StreamConfig = output_supported_config.into(); + let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); + + let output_stream = match output_supported_sample_format { + SampleFormat::F32 => output_device.build_output_stream( + &output_config, + curry_callback::( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + SampleFormat::I16 => output_device.build_output_stream( + &output_config, + curry_callback::( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + SampleFormat::U16 => output_device.build_output_stream( + &output_config, + curry_callback::( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + + Ok(Self { + config: output_config, + _stream: output_stream, + volume_sender: output_volume_sender, + client_streams, + }) + } +} + +impl AudioOutputDevice for DefaultAudioOutputDevice { + fn play(&self) -> Result<(), AudioError> { + self._stream.play().map_err(|e| AudioError::OutputPlayError(e)) + } + + fn pause(&self) -> Result<(), AudioError> { + self._stream.pause().map_err(|e| AudioError::OutputPauseError(e)) + } + + fn set_volume(&self, volume: f32) { + self.volume_sender.send(volume).unwrap(); + } + + fn get_num_channels(&self) -> usize { + self.config.channels as usize + } + + fn get_client_streams(&self) -> Arc> { + Arc::clone(&self.client_streams) + } +} + pub fn curry_callback( user_bufs: Arc>, output_volume_receiver: watch::Receiver, -- cgit v1.2.1 From 3caae1e9e17524cd2fdedc39c075ceda231cf0e1 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Sat, 10 Apr 2021 07:59:29 -0300 Subject: Fix audio output being cut Where did this `!` go? My keyboard have a problem? --- mumd/src/audio/output.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'mumd/src/audio') diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 3664be8..df9b2e2 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -62,7 +62,7 @@ impl ClientStream { let iter = self.buffer.iter_mut(); consumer(iter); //remove empty Vec - self.buffer.retain(|_, v| v.is_empty()); + self.buffer.retain(|_, v| !v.is_empty()); } pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { -- cgit v1.2.1 From 727710ae7e3ac8c35d66e0431682a2a90f2bd3a4 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Sat, 10 Apr 2021 14:43:21 -0300 Subject: Restore multiple decoders --- mumd/src/audio/input.rs | 12 ++--- mumd/src/audio/output.rs | 115 ++++++++++++++++++++++++----------------------- 2 files changed, 64 insertions(+), 63 deletions(-) (limited to 'mumd/src/audio') diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index f4e9c4c..4a1ed3d 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -33,11 +33,11 @@ pub trait AudioInputDevice { fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver; - fn get_num_channels(&self) -> usize; + fn num_channels(&self) -> usize; } pub struct DefaultAudioInputDevice { - _stream: cpal::Stream, + stream: cpal::Stream, sample_receiver: Option>, volume_sender: watch::Sender, channels: u16, @@ -105,7 +105,7 @@ impl DefaultAudioInputDevice { .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; let res = Self { - _stream: input_stream, + stream: input_stream, sample_receiver: Some(sample_receiver), volume_sender, channels: input_config.channels, @@ -116,10 +116,10 @@ impl DefaultAudioInputDevice { impl AudioInputDevice for DefaultAudioInputDevice { fn play(&self) -> Result<(), AudioError> { - self._stream.play().map_err(|e| AudioError::InputPlayError(e)) + self.stream.play().map_err(|e| AudioError::InputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self._stream.pause().map_err(|e| AudioError::InputPauseError(e)) + self.stream.pause().map_err(|e| AudioError::InputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); @@ -128,7 +128,7 @@ impl AudioInputDevice for DefaultAudioInputDevice { let ret = self.sample_receiver.take(); ret.unwrap() } - fn get_num_channels(&self) -> usize { + fn num_channels(&self) -> usize { self.channels as usize } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index df9b2e2..658c1c8 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -6,49 +6,57 @@ use log::*; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; -use opus::Channels; use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; -type ClientStreamKey = Option<(VoiceStreamType, u32)>; -type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque>; +type ClientStreamKey = (VoiceStreamType, u32); pub struct ClientStream { - buffer: HashMap>, //TODO ring buffer? - opus_decoder: opus::Decoder, + buffer_clients: HashMap, opus::Decoder)>, //TODO ring buffer? + buffer_effects: VecDeque, + sample_rate: u32, + channels: opus::Channels, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { - let mut buffer = HashMap::new(); - //None is the system audio effects - buffer.insert(None, VecDeque::new()); + let channels = match channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), + }; Self { - buffer, - opus_decoder: opus::Decoder::new( - sample_rate, - match channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), - }, - ) - .unwrap(), + buffer_clients: HashMap::new(), + buffer_effects: VecDeque::new(), + sample_rate, + channels, } } - pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { + fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque, opus::Decoder) { + let sample_rate = self.sample_rate; + let channels = self.channels; + self.buffer_clients.entry(client).or_insert_with(|| { + let opus_decoder = opus::Decoder::new( + sample_rate, + channels + ).unwrap(); + (VecDeque::new(), opus_decoder) + }) + } + + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode - let parsed = self - .opus_decoder + let mut out: Vec = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode + let (buffer, decoder) = self.get_client(client); + let parsed = decoder .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.extend(client, &out); + buffer.extend(&out); } _ => { unimplemented!("Payload type not supported"); @@ -56,18 +64,12 @@ impl ClientStream { } } - pub fn consume<'a, F>(&mut self, consumer: F) - where - F: FnOnce(ConsumerInput) { - let iter = self.buffer.iter_mut(); - consumer(iter); - //remove empty Vec - self.buffer.retain(|_, v| !v.is_empty()); - } - - pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { - let entry = self.buffer.entry(key).or_insert(VecDeque::new()); - entry.extend(values.iter().copied()); + pub fn extend(&mut self, client: Option, values: &[f32]) { + let buffer = match client { + Some(x) => &mut self.get_client(x).0, + None => &mut self.buffer_effects, + }; + buffer.extend(values.iter().copied()); } } @@ -101,13 +103,13 @@ pub trait AudioOutputDevice { fn play(&self) -> Result<(), AudioError>; fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); - fn get_num_channels(&self) -> usize; - fn get_client_streams(&self) -> Arc>; + fn num_channels(&self) -> usize; + fn client_streams(&self) -> Arc>; } pub struct DefaultAudioOutputDevice { config: StreamConfig, - _stream: cpal::Stream, + stream: cpal::Stream, client_streams: Arc>, volume_sender: watch::Sender, } @@ -176,7 +178,7 @@ impl DefaultAudioOutputDevice { Ok(Self { config: output_config, - _stream: output_stream, + stream: output_stream, volume_sender: output_volume_sender, client_streams, }) @@ -185,22 +187,22 @@ impl DefaultAudioOutputDevice { impl AudioOutputDevice for DefaultAudioOutputDevice { fn play(&self) -> Result<(), AudioError> { - self._stream.play().map_err(|e| AudioError::OutputPlayError(e)) + self.stream.play().map_err(|e| AudioError::OutputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self._stream.pause().map_err(|e| AudioError::OutputPauseError(e)) + self.stream.pause().map_err(|e| AudioError::OutputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } - fn get_num_channels(&self) -> usize { + fn num_channels(&self) -> usize { self.config.channels as usize } - fn get_client_streams(&self) -> Arc> { + fn client_streams(&self) -> Arc> { Arc::clone(&self.client_streams) } } @@ -219,21 +221,20 @@ pub fn curry_callback let mut user_bufs = user_bufs.lock().unwrap(); let user_volumes = user_volumes.lock().unwrap(); - let mut saturating_add = |input: ConsumerInput| { - for (k, v) in input { - let (user_volume, muted) = match k { - Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), - None => (1.0, false), - }; - for sample in data.iter_mut() { - if !muted { - *sample = sample.saturating_add(Sample::from( - &(v.pop_front().unwrap_or(0.0) * volume * user_volume), - )); - } + for (k, v) in user_bufs.buffer_clients.iter_mut() { + let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume), + )); } } - }; - user_bufs.consume(&mut saturating_add); + } + for sample in data.iter_mut() { + *sample = sample.saturating_add(Sample::from( + &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume), + )); + } } } -- cgit v1.2.1