From bc65445af44a335a0586a393c792614330258249 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sat, 12 Jun 2021 02:30:01 +0200 Subject: simplify audio output infrastructure --- mumd/src/audio/input.rs | 79 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 67 insertions(+), 12 deletions(-) (limited to 'mumd/src/audio/input.rs') diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index e45ff27..162dd2c 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -8,20 +8,49 @@ use crate::error::{AudioError, AudioStream}; use crate::state::StatePhase; pub fn callback( - mut input_sender: futures_channel::mpsc::Sender, + mut input_sender: futures_channel::mpsc::Sender>, + transformers: Vec Option<&mut [f32]> + Send + 'static>>, + frame_size: u32, + sample_rate: u32, + channels: u16, input_volume_receiver: watch::Receiver, phase_watcher: watch::Receiver, ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { + let buffer_size = (sample_rate * frame_size / 400) as usize; + let mut opus_encoder = opus::Encoder::new( + sample_rate, + match channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), + }, + opus::Application::Voip, + ) + .unwrap(); + let mut buffer = Vec::with_capacity(buffer_size); + move |data: &[T], _info: &InputCallbackInfo| { if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) { return; } let input_volume = *input_volume_receiver.borrow(); - for sample in data.iter().map(|e| e.to_f32()).map(|e| e * input_volume) { - if let Err(_e) = input_sender.try_send(sample) { - warn!("Error sending audio: {}", _e); + let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume); + + while buffer.len() + data.len() > buffer_size { + buffer.extend(data.by_ref().take(buffer_size - buffer.len())); + let encoded = transformers + .iter() + .try_fold(&mut buffer[..], |acc, e| e(acc)) + .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap()); + + if let Some(encoded) = encoded { + if let Err(e) = input_sender.try_send(encoded) { + warn!("Error sending audio: {}", e); + } } + buffer.clear(); } + buffer.extend(data); } } @@ -29,13 +58,13 @@ pub trait AudioInputDevice { fn play(&self) -> Result<(), AudioError>; fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); - fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver; + fn sample_receiver(&mut self) -> Option>>; fn num_channels(&self) -> usize; } pub struct DefaultAudioInputDevice { stream: cpal::Stream, - sample_receiver: Option>, + sample_receiver: Option>>, volume_sender: watch::Sender, channels: u16, } @@ -44,6 +73,7 @@ impl DefaultAudioInputDevice { pub fn new( input_volume: f32, phase_watcher: watch::Receiver, + frame_size: u32, ) -> Result { let sample_rate = SampleRate(SAMPLE_RATE); @@ -73,20 +103,46 @@ impl DefaultAudioInputDevice { let (volume_sender, input_volume_receiver) = watch::channel::(input_volume); + let transformers = vec![]; + let input_stream = match input_supported_sample_format { SampleFormat::F32 => input_device.build_input_stream( &input_config, - callback::(sample_sender, input_volume_receiver, phase_watcher), + callback::( + sample_sender, + transformers, + frame_size, + SAMPLE_RATE, + input_config.channels, + input_volume_receiver, + phase_watcher + ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, - callback::(sample_sender, input_volume_receiver, phase_watcher), + callback::( + sample_sender, + transformers, + frame_size, + SAMPLE_RATE, + input_config.channels, + input_volume_receiver, + phase_watcher + ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, - callback::(sample_sender, input_volume_receiver, phase_watcher), + callback::( + sample_sender, + transformers, + frame_size, + SAMPLE_RATE, + input_config.channels, + input_volume_receiver, + phase_watcher + ), err_fn, ), } @@ -116,9 +172,8 @@ impl AudioInputDevice for DefaultAudioInputDevice { fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } - fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver { - let ret = self.sample_receiver.take(); - ret.unwrap() + fn sample_receiver(&mut self) -> Option>> { + self.sample_receiver.take() } fn num_channels(&self) -> usize { self.channels as usize -- cgit v1.2.1