From e431ecb6c5c8406bde6a54f40ee2f648cc0cec05 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Mon, 5 Apr 2021 13:39:55 -0300 Subject: Separate the input and output audio --- mumd/src/audio.rs | 181 ++++++++++++++++++++++++++---------------------- mumd/src/network/tcp.rs | 4 +- mumd/src/network/udp.rs | 4 +- mumd/src/state.rs | 66 ++++++++++-------- 4 files changed, 141 insertions(+), 114 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index fdbeaee..0aac68c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -67,46 +67,18 @@ impl TryFrom<&str> for NotificationEvents { } } -pub struct Audio { - output_config: StreamConfig, - _output_stream: cpal::Stream, +pub struct AudioInput { _input_stream: cpal::Stream, input_channel_receiver: Arc> + Unpin>>>, input_volume_sender: watch::Sender, - - output_volume_sender: watch::Sender, - - user_volumes: Arc>>, - - client_streams: Arc>>, - - sounds: HashMap>, - play_sounds: Arc>>, } -impl Audio { - pub fn new(input_volume: f32, output_volume: f32, phase_watcher: watch::Receiver) -> Result { +impl AudioInput { + pub fn new(input_volume: f32, phase_watcher: watch::Receiver) -> Result { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); - let output_device = host - .default_output_device() - .ok_or(AudioError::NoDevice(AudioStream::Output))?; - let output_supported_config = output_device - .supported_output_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? - .with_sample_rate(sample_rate); - let output_supported_sample_format = output_supported_config.sample_format(); - let output_config: StreamConfig = output_supported_config.into(); let input_device = host .default_input_device() @@ -128,45 +100,6 @@ impl Audio { let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); - let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); - let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); - let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); - - let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); - let output_stream = match output_supported_sample_format { - SampleFormat::F32 => output_device.build_output_stream( - &output_config, - output::curry_callback::( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::I16 => output_device.build_output_stream( - &output_config, - output::curry_callback::( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::U16 => output_device.build_output_stream( - &output_config, - output::curry_callback::( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; - let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::(input_volume); @@ -218,14 +151,106 @@ impl Audio { position_info: None, }); - output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; + input_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; - let mut res = Self { - output_config, - _output_stream: output_stream, + let res = Self { _input_stream: input_stream, input_volume_sender, input_channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), + }; + Ok(res) + } + + pub fn input_receiver(&self) -> Arc> + Unpin>>> { + Arc::clone(&self.input_channel_receiver) + } + + pub fn set_input_volume(&self, input_volume: f32) { + self.input_volume_sender.send(input_volume).unwrap(); + } +} + +pub struct AudioOutput { + output_config: StreamConfig, + _output_stream: cpal::Stream, + + output_volume_sender: watch::Sender, + + user_volumes: Arc>>, + + client_streams: Arc>>, + + sounds: HashMap>, + play_sounds: Arc>>, +} + +impl AudioOutput { + pub fn new(output_volume: f32) -> Result { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + let output_device = host + .default_output_device() + .ok_or(AudioError::NoDevice(AudioStream::Output))?; + let output_supported_config = output_device + .supported_output_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? + .with_sample_rate(sample_rate); + let output_supported_sample_format = output_supported_config.sample_format(); + let output_config: StreamConfig = output_supported_config.into(); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); + let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); + let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); + + let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); + let output_stream = match output_supported_sample_format { + SampleFormat::F32 => output_device.build_output_stream( + &output_config, + output::curry_callback::( + Arc::clone(&play_sounds), + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), + err_fn, + ), + SampleFormat::I16 => output_device.build_output_stream( + &output_config, + output::curry_callback::( + Arc::clone(&play_sounds), + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), + err_fn, + ), + SampleFormat::U16 => output_device.build_output_stream( + &output_config, + output::curry_callback::( + Arc::clone(&play_sounds), + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + + let mut res = Self { + output_config, + _output_stream: output_stream, client_streams, sounds: HashMap::new(), output_volume_sender, @@ -335,18 +360,10 @@ impl Audio { } } - pub fn input_receiver(&self) -> Arc> + Unpin>>> { - Arc::clone(&self.input_channel_receiver) - } - pub fn clear_clients(&mut self) { self.client_streams.lock().unwrap().clear(); } - pub fn set_input_volume(&self, input_volume: f32) { - self.input_volume_sender.send(input_volume).unwrap(); - } - pub fn set_output_volume(&self, output_volume: f32) { self.output_volume_sender.send(output_volume).unwrap(); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6402a89..d2f0b41 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -108,7 +108,7 @@ pub async fn handle( let password = state_lock.password().map(|x| x.to_string()); authenticate(&mut sink, username, password).await?; let phase_watcher = state_lock.phase_receiver(); - let input_receiver = state_lock.audio().input_receiver(); + let input_receiver = state_lock.audio_input().input_receiver(); drop(state_lock); let event_queue = TcpEventQueue::new(); @@ -358,7 +358,7 @@ async fn listen( state .lock() .await - .audio() + .audio_output() .decode_packet_payload( VoiceStreamType::TCP, session_id, diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 5996e43..a8e190d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,7 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { - let receiver = state.lock().await.audio().input_receiver(); + let receiver = state.lock().await.audio_input().input_receiver(); loop { let connection_info = 'data: loop { @@ -151,7 +151,7 @@ async fn listen( state .lock() //TODO change so that we only have to lock audio and not the whole state .await - .audio() + .audio_output() .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index b52b330..84bb372 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -2,7 +2,7 @@ pub mod channel; pub mod server; pub mod user; -use crate::audio::{Audio, NotificationEvents}; +use crate::audio::{AudioInput, AudioOutput, NotificationEvents}; use crate::error::StateError; use crate::network::{ConnectionInfo, VoiceStreamType}; use crate::network::tcp::{TcpEvent, TcpEventData}; @@ -57,7 +57,8 @@ pub enum StatePhase { pub struct State { config: Config, server: Option, - audio: Audio, + audio_input: AudioInput, + audio_output: AudioOutput, phase_watcher: (watch::Sender, watch::Receiver), } @@ -66,15 +67,18 @@ impl State { pub fn new() -> Result { let config = mumlib::config::read_default_cfg()?; let phase_watcher = watch::channel(StatePhase::Disconnected); - let audio = Audio::new( + let audio_input = AudioInput::new( config.audio.input_volume.unwrap_or(1.0), - config.audio.output_volume.unwrap_or(1.0), phase_watcher.1.clone(), ).map_err(|e| StateError::AudioError(e))?; + let audio_output = AudioOutput::new( + config.audio.output_volume.unwrap_or(1.0), + ).map_err(|e| StateError::AudioError(e))?; let mut state = Self { config, server: None, - audio, + audio_input, + audio_output, phase_watcher, }; state.reload_config(); @@ -176,13 +180,13 @@ impl State { let mut new_deaf = None; if let Some((mute, deafen)) = action { if server.deafened() != deafen { - self.audio.play_effect(if deafen { + self.audio_output.play_effect(if deafen { NotificationEvents::Deafen } else { NotificationEvents::Undeafen }); } else if server.muted() != mute { - self.audio.play_effect(if mute { + self.audio_output.play_effect(if mute { NotificationEvents::Mute } else { NotificationEvents::Unmute @@ -207,7 +211,7 @@ impl State { now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) } Command::InputVolumeSet(volume) => { - self.audio.set_input_volume(volume); + self.audio_input.set_input_volume(volume); now!(Ok(None)) } Command::MuteOther(string, toggle) => { @@ -240,7 +244,7 @@ impl State { if let Some(action) = action { user.set_suppressed(action); - self.audio.set_mute(id, action); + self.audio_output.set_mute(id, action); } return now!(Ok(None)); @@ -269,13 +273,13 @@ impl State { let mut new_mute = None; if let Some((mute, deafen)) = action { if server.deafened() != deafen { - self.audio.play_effect(if deafen { + self.audio_output.play_effect(if deafen { NotificationEvents::Deafen } else { NotificationEvents::Undeafen }); } else if server.muted() != mute { - self.audio.play_effect(if mute { + self.audio_output.play_effect(if mute { NotificationEvents::Mute } else { NotificationEvents::Unmute @@ -301,7 +305,7 @@ impl State { now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) } Command::OutputVolumeSet(volume) => { - self.audio.set_output_volume(volume); + self.audio_output.set_output_volume(volume); now!(Ok(None)) } Command::Ping => { @@ -367,13 +371,13 @@ impl State { } self.server = None; - self.audio.clear_clients(); + self.audio_output.clear_clients(); self.phase_watcher .0 .send(StatePhase::Disconnected) .unwrap(); - self.audio.play_effect(NotificationEvents::ServerDisconnect); + self.audio_output.play_effect(NotificationEvents::ServerDisconnect); now!(Ok(None)) } Command::ServerStatus { host, port } => ExecutionContext::Ping( @@ -420,7 +424,7 @@ impl State { Some(v) => v, }; - self.audio.set_user_volume(user_id, volume); + self.audio_output.set_user_volume(user_id, volume); now!(Ok(None)) } } @@ -453,7 +457,7 @@ impl State { *self.server_mut().unwrap().session_id_mut() = Some(session); } else { // this is someone else - self.audio_mut().add_client(session); + self.audio_output_mut().add_client(session); // send notification only if we've passed the connecting phase if matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { @@ -470,7 +474,7 @@ impl State { )); } - self.audio.play_effect(NotificationEvents::UserConnected); + self.audio_output.play_effect(NotificationEvents::UserConnected); } } } @@ -524,7 +528,7 @@ impl State { } else { warn!("{} moved to invalid channel {}", user.name(), to_channel); } - self.audio.play_effect(if from_channel == this_channel { + self.audio_output.play_effect(if from_channel == this_channel { NotificationEvents::UserJoinedChannel } else { NotificationEvents::UserLeftChannel @@ -559,13 +563,13 @@ impl State { let this_channel = self.get_users_channel(self.server().unwrap().session_id().unwrap()); let other_channel = self.get_users_channel(msg.get_session()); if this_channel == other_channel { - self.audio.play_effect(NotificationEvents::UserDisconnected); + self.audio_output.play_effect(NotificationEvents::UserDisconnected); if let Some(user) = self.server().unwrap().users().get(&msg.get_session()) { notifications::send(format!("{} disconnected", &user.name())); } } - self.audio().remove_client(msg.get_session()); + self.audio_output().remove_client(msg.get_session()); self.server_mut() .unwrap() .users_mut() @@ -581,13 +585,13 @@ impl State { Err(e) => error!("Couldn't read config: {}", e), } if let Some(input_volume) = self.config.audio.input_volume { - self.audio.set_input_volume(input_volume); + self.audio_input.set_input_volume(input_volume); } if let Some(output_volume) = self.config.audio.output_volume { - self.audio.set_output_volume(output_volume); + self.audio_output.set_output_volume(output_volume); } if let Some(sound_effects) = &self.config.audio.sound_effects { - self.audio.load_sound_effects(sound_effects); + self.audio_output.load_sound_effects(sound_effects); } } @@ -600,14 +604,20 @@ impl State { pub fn initialized(&self) { self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); - self.audio.play_effect(NotificationEvents::ServerConnect); + self.audio_output.play_effect(NotificationEvents::ServerConnect); } - pub fn audio(&self) -> &Audio { - &self.audio + pub fn audio_input(&self) -> &AudioInput { + &self.audio_input + } + pub fn audio_output(&self) -> &AudioOutput { + &self.audio_output + } + pub fn audio_input_mut(&mut self) -> &mut AudioInput { + &mut self.audio_input } - pub fn audio_mut(&mut self) -> &mut Audio { - &mut self.audio + pub fn audio_output_mut(&mut self) -> &mut AudioOutput { + &mut self.audio_output } pub fn phase_receiver(&self) -> watch::Receiver { self.phase_watcher.1.clone() -- cgit v1.2.1 From 38270c4a2374c2ccc04597a28fb191af9d86b814 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Mon, 5 Apr 2021 13:50:58 -0300 Subject: Rename audio functions and basic indentation --- mumd/src/audio.rs | 78 ++++++++++++++++++++++++++----------------------- mumd/src/network/tcp.rs | 2 +- mumd/src/network/udp.rs | 2 +- mumd/src/state.rs | 8 ++--- 4 files changed, 48 insertions(+), 42 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0aac68c..facca9c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -68,10 +68,10 @@ impl TryFrom<&str> for NotificationEvents { } pub struct AudioInput { - _input_stream: cpal::Stream, + _stream: cpal::Stream, - input_channel_receiver: Arc> + Unpin>>>, - input_volume_sender: watch::Sender, + channel_receiver: Arc> + Unpin>>>, + volume_sender: watch::Sender, } impl AudioInput { @@ -136,45 +136,50 @@ impl AudioInput { .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; let opus_stream = OpusEncoder::new( - 4, - input_config.sample_rate.0, - input_config.channels as usize, - StreamingSignalExt::into_interleaved_samples( - StreamingNoiseGate::new( - from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly - 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: i as u64, - payload: VoicePacketPayload::Opus(e.into(), false), - position_info: None, - }); + 4, + input_config.sample_rate.0, + input_config.channels as usize, + StreamingSignalExt::into_interleaved_samples( + StreamingNoiseGate::new( + from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly + 10_000 + ) + ) + ).enumerate() + .map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + } + ); input_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; let res = Self { - _input_stream: input_stream, - input_volume_sender, - input_channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), + _stream: input_stream, + volume_sender: input_volume_sender, + channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), }; Ok(res) } - pub fn input_receiver(&self) -> Arc> + Unpin>>> { - Arc::clone(&self.input_channel_receiver) + pub fn receiver(&self) -> Arc> + Unpin>>> { + Arc::clone(&self.channel_receiver) } - pub fn set_input_volume(&self, input_volume: f32) { - self.input_volume_sender.send(input_volume).unwrap(); + pub fn set_volume(&self, input_volume: f32) { + self.volume_sender.send(input_volume).unwrap(); } } pub struct AudioOutput { - output_config: StreamConfig, - _output_stream: cpal::Stream, + config: StreamConfig, + _stream: cpal::Stream, - output_volume_sender: watch::Sender, + volume_sender: watch::Sender, user_volumes: Arc>>, @@ -247,13 +252,14 @@ impl AudioOutput { ), } .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; let mut res = Self { - output_config, - _output_stream: output_stream, + config: output_config, + _stream: output_stream, client_streams, sounds: HashMap::new(), - output_volume_sender, + volume_sender: output_volume_sender, user_volumes, play_sounds, }; @@ -303,7 +309,7 @@ impl AudioOutput { .until_exhausted() // if the source audio is stereo and is being played as mono, discard the right audio .flat_map( - |e| if self.output_config.channels == 1 { + |e| if self.config.channels == 1 { vec![e[0]] } else { e.to_vec() @@ -320,7 +326,7 @@ impl AudioOutput { Entry::Occupied(mut entry) => { entry .get_mut() - .decode_packet(payload, self.output_config.channels as usize); + .decode_packet(payload, self.config.channels as usize); } Entry::Vacant(_) => { warn!("Can't find session id {}", session_id); @@ -336,8 +342,8 @@ impl AudioOutput { } Entry::Vacant(entry) => { entry.insert(output::ClientStream::new( - self.output_config.sample_rate.0, - self.output_config.channels, + self.config.sample_rate.0, + self.config.channels, )); } } @@ -364,8 +370,8 @@ impl AudioOutput { self.client_streams.lock().unwrap().clear(); } - pub fn set_output_volume(&self, output_volume: f32) { - self.output_volume_sender.send(output_volume).unwrap(); + pub fn set_volume(&self, output_volume: f32) { + self.volume_sender.send(output_volume).unwrap(); } pub fn set_user_volume(&self, id: u32, volume: f32) { diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index d2f0b41..1414318 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -108,7 +108,7 @@ pub async fn handle( let password = state_lock.password().map(|x| x.to_string()); authenticate(&mut sink, username, password).await?; let phase_watcher = state_lock.phase_receiver(); - let input_receiver = state_lock.audio_input().input_receiver(); + let input_receiver = state_lock.audio_input().receiver(); drop(state_lock); let event_queue = TcpEventQueue::new(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index a8e190d..8614358 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,7 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { - let receiver = state.lock().await.audio_input().input_receiver(); + let receiver = state.lock().await.audio_input().receiver(); loop { let connection_info = 'data: loop { diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 84bb372..313a985 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -211,7 +211,7 @@ impl State { now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) } Command::InputVolumeSet(volume) => { - self.audio_input.set_input_volume(volume); + self.audio_input.set_volume(volume); now!(Ok(None)) } Command::MuteOther(string, toggle) => { @@ -305,7 +305,7 @@ impl State { now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) } Command::OutputVolumeSet(volume) => { - self.audio_output.set_output_volume(volume); + self.audio_output.set_volume(volume); now!(Ok(None)) } Command::Ping => { @@ -585,10 +585,10 @@ impl State { Err(e) => error!("Couldn't read config: {}", e), } if let Some(input_volume) = self.config.audio.input_volume { - self.audio_input.set_input_volume(input_volume); + self.audio_input.set_volume(input_volume); } if let Some(output_volume) = self.config.audio.output_volume { - self.audio_output.set_output_volume(output_volume); + self.audio_output.set_volume(output_volume); } if let Some(sound_effects) = &self.config.audio.sound_effects { self.audio_output.load_sound_effects(sound_effects); -- cgit v1.2.1 From 12554b2f6cd89ad3cd3721bbc790d7772a21c3ae Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Wed, 7 Apr 2021 09:29:23 -0300 Subject: Create a trait and default implementation for device audio input --- mumd/src/audio.rs | 75 ++++----------------------------- mumd/src/audio/input.rs | 110 +++++++++++++++++++++++++++++++++++++++++++++++- mumd/src/error.rs | 4 ++ 3 files changed, 122 insertions(+), 67 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index facca9c..5a839bc 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -2,6 +2,7 @@ pub mod input; pub mod output; mod noise_gate; +use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; use crate::audio::output::SaturatingAdd; use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; use crate::error::{AudioError, AudioStream}; @@ -68,80 +69,23 @@ impl TryFrom<&str> for NotificationEvents { } pub struct AudioInput { - _stream: cpal::Stream, + _device: DefaultAudioInputDevice, channel_receiver: Arc> + Unpin>>>, - volume_sender: watch::Sender, } impl AudioInput { pub fn new(input_volume: f32, phase_watcher: watch::Receiver) -> Result { + let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher)?; let sample_rate = SampleRate(SAMPLE_RATE); - let host = cpal::default_host(); - - let input_device = host - .default_input_device() - .ok_or(AudioError::NoDevice(AudioStream::Input))?; - let input_supported_config = input_device - .supported_input_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))? - .with_sample_rate(sample_rate); - let input_supported_sample_format = input_supported_config.sample_format(); - let input_config: StreamConfig = input_supported_config.into(); - - let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); - - let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); - - let (input_volume_sender, input_volume_receiver) = watch::channel::(input_volume); - - let input_stream = match input_supported_sample_format { - SampleFormat::F32 => input_device.build_input_stream( - &input_config, - input::callback::( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - SampleFormat::I16 => input_device.build_input_stream( - &input_config, - input::callback::( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - SampleFormat::U16 => input_device.build_input_stream( - &input_config, - input::callback::( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; - let opus_stream = OpusEncoder::new( 4, - input_config.sample_rate.0, - input_config.channels as usize, + sample_rate.0, + default.get_num_channels(), StreamingSignalExt::into_interleaved_samples( StreamingNoiseGate::new( - from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly + from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly 10_000 ) ) @@ -156,11 +100,10 @@ impl AudioInput { } ); - input_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; + default.play()?; let res = Self { - _stream: input_stream, - volume_sender: input_volume_sender, + _device: default, channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), }; Ok(res) @@ -171,7 +114,7 @@ impl AudioInput { } pub fn set_volume(&self, input_volume: f32) { - self.volume_sender.send(input_volume).unwrap(); + self._device.set_volume(input_volume); } } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 176747d..f4e9c4c 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,8 +1,11 @@ -use cpal::{InputCallbackInfo, Sample}; +use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use tokio::sync::watch; use log::*; use crate::state::StatePhase; +use crate::audio::SAMPLE_RATE; +use crate::error::{AudioError, AudioStream}; pub fn callback( mut input_sender: futures_channel::mpsc::Sender, @@ -24,3 +27,108 @@ pub fn callback( } } } + +pub trait AudioInputDevice { + fn play(&self) -> Result<(), AudioError>; + fn pause(&self) -> Result<(), AudioError>; + fn set_volume(&self, volume: f32); + fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver; + fn get_num_channels(&self) -> usize; +} + +pub struct DefaultAudioInputDevice { + _stream: cpal::Stream, + sample_receiver: Option>, + volume_sender: watch::Sender, + channels: u16, +} + +impl DefaultAudioInputDevice { + pub fn new(input_volume: f32, phase_watcher: watch::Receiver) -> Result { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + + let input_device = host + .default_input_device() + .ok_or(AudioError::NoDevice(AudioStream::Input))?; + let input_supported_config = input_device + .supported_input_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))? + .with_sample_rate(sample_rate); + let input_supported_sample_format = input_supported_config.sample_format(); + let input_config: StreamConfig = input_supported_config.into(); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); + + let (volume_sender, input_volume_receiver) = watch::channel::(input_volume); + + let input_stream = match input_supported_sample_format { + SampleFormat::F32 => input_device.build_input_stream( + &input_config, + callback::( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + SampleFormat::I16 => input_device.build_input_stream( + &input_config, + callback::( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + SampleFormat::U16 => input_device.build_input_stream( + &input_config, + callback::( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; + + let res = Self { + _stream: input_stream, + sample_receiver: Some(sample_receiver), + volume_sender, + channels: input_config.channels, + }; + Ok(res) + } +} + +impl AudioInputDevice for DefaultAudioInputDevice { + fn play(&self) -> Result<(), AudioError> { + self._stream.play().map_err(|e| AudioError::InputPlayError(e)) + } + fn pause(&self) -> Result<(), AudioError> { + self._stream.pause().map_err(|e| AudioError::InputPauseError(e)) + } + fn set_volume(&self, volume: f32) { + self.volume_sender.send(volume).unwrap(); + } + fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver { + let ret = self.sample_receiver.take(); + ret.unwrap() + } + fn get_num_channels(&self) -> usize { + self.channels as usize + } +} diff --git a/mumd/src/error.rs b/mumd/src/error.rs index f7818a1..2887dd0 100644 --- a/mumd/src/error.rs +++ b/mumd/src/error.rs @@ -86,6 +86,8 @@ pub enum AudioError { NoSupportedConfig(AudioStream), InvalidStream(AudioStream, cpal::BuildStreamError), OutputPlayError(cpal::PlayStreamError), + InputPlayError(cpal::PlayStreamError), + InputPauseError(cpal::PauseStreamError), } impl fmt::Display for AudioError { @@ -96,6 +98,8 @@ impl fmt::Display for AudioError { AudioError::NoSupportedConfig(s) => write!(f, "No supported {} config found", s), AudioError::InvalidStream(s, e) => write!(f, "Invalid {} stream: {}", s, e), AudioError::OutputPlayError(e) => write!(f, "Playback error: {}", e), + AudioError::InputPlayError(e) => write!(f, "Recording error: {}", e), + AudioError::InputPauseError(e) => write!(f, "Recording error: {}", e), } } } -- cgit v1.2.1 From 7fed8f81222de570d864487605e42b5cbb023218 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Thu, 8 Apr 2021 16:14:48 -0300 Subject: Move audio decode logic to ClientStream This way is possible to deduplicate the opus::Decoder used by audio output. The audio effects and client network streams are unified in only one place, allowing the Audio Output device to consume the Samples with only one call. --- mumd/src/audio.rs | 71 ++++++------------------------- mumd/src/audio/output.rs | 108 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 96 insertions(+), 83 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 5a839bc..814050b 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -3,7 +3,7 @@ pub mod output; mod noise_gate; use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; -use crate::audio::output::SaturatingAdd; +use crate::audio::output::ClientStream; use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; use crate::error::{AudioError, AudioStream}; use crate::network::VoiceStreamType; @@ -20,7 +20,7 @@ use mumble_protocol::Serverbound; use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use mumlib::config::SoundEffect; use std::borrow::Cow; -use std::collections::{hash_map::Entry, HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::fmt::Debug; use std::fs::File; @@ -126,10 +126,9 @@ pub struct AudioOutput { user_volumes: Arc>>, - client_streams: Arc>>, + client_streams: Arc>, sounds: HashMap>, - play_sounds: Arc>>, } impl AudioOutput { @@ -159,14 +158,12 @@ impl AudioOutput { let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); - let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); - let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); + let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, output::curry_callback::( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -176,7 +173,6 @@ impl AudioOutput { SampleFormat::I16 => output_device.build_output_stream( &output_config, output::curry_callback::( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -186,7 +182,6 @@ impl AudioOutput { SampleFormat::U16 => output_device.build_output_stream( &output_config, output::curry_callback::( - Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), @@ -204,7 +199,6 @@ impl AudioOutput { sounds: HashMap::new(), volume_sender: output_volume_sender, user_volumes, - play_sounds, }; res.load_sound_effects(&[]); Ok(res) @@ -265,52 +259,23 @@ impl AudioOutput { } pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) { - match self.client_streams.lock().unwrap().entry((stream_type, session_id)) { - Entry::Occupied(mut entry) => { - entry - .get_mut() - .decode_packet(payload, self.config.channels as usize); - } - Entry::Vacant(_) => { - warn!("Can't find session id {}", session_id); - } - } + self.client_streams.lock().unwrap().decode_packet( + Some((stream_type, session_id)), + payload, + self.config.channels as usize, + ); } pub fn add_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(_) => { - warn!("Session id {} already exists", session_id); - } - Entry::Vacant(entry) => { - entry.insert(output::ClientStream::new( - self.config.sample_rate.0, - self.config.channels, - )); - } - } - } + self.client_streams.lock().unwrap().add_client(session_id); } pub fn remove_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(entry) => { - entry.remove(); - } - Entry::Vacant(_) => { - warn!( - "Tried to remove session id {} that doesn't exist", - session_id - ); - } - } - } + self.client_streams.lock().unwrap().remove_client(session_id); } - pub fn clear_clients(&mut self) { - self.client_streams.lock().unwrap().clear(); + pub fn clear_clients(&self) { + self.client_streams.lock().unwrap().clear_clients(); } pub fn set_volume(&self, output_volume: f32) { @@ -341,15 +306,7 @@ impl AudioOutput { pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - - let mut play_sounds = self.play_sounds.lock().unwrap(); - - for (val, e) in play_sounds.iter_mut().zip(samples.iter()) { - *val = val.saturating_add(*e); - } - - let l = play_sounds.len(); - play_sounds.extend(samples.iter().skip(l)); + self.client_streams.lock().unwrap().extend(None, samples); } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 421d395..797cf84 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,22 +1,29 @@ use crate::network::VoiceStreamType; +use log::*; use cpal::{OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque, hash_map::Entry}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; +type ClientStreamKey = Option<(VoiceStreamType, u32)>; +type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque>; + pub struct ClientStream { - buffer: VecDeque, //TODO ring buffer? + buffer: HashMap>, //TODO ring buffer? opus_decoder: opus::Decoder, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { + let mut buffer = HashMap::new(); + //None is the system audio effects + buffer.insert(None, VecDeque::new()); Self { - buffer: VecDeque::new(), + buffer, opus_decoder: opus::Decoder::new( sample_rate, match channels { @@ -29,7 +36,7 @@ impl ClientStream { } } - pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { let mut out: Vec = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode @@ -38,13 +45,67 @@ impl ClientStream { .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.buffer.extend(out); + self.extend(client, &out); } _ => { unimplemented!("Payload type not supported"); } } } + + pub fn consume<'a, F>(&mut self, consumer: F) + where + F: FnOnce(ConsumerInput) { + let iter = self.buffer.iter_mut(); + consumer(iter); + } + + pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { + match self.buffer.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().extend(values.iter().copied()); + } + Entry::Vacant(_) => { + match key { + None => warn!("Can't find session None"), + Some(key) => warn!("Can't find session id {}", key.1), + } + } + } + } + + pub fn add_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(_) => { + warn!("Session id {} already exists", session_id); + } + Entry::Vacant(entry) => { + entry.insert(VecDeque::new()); + } + } + } + } + + pub fn remove_client(&mut self, session_id: u32) { + for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { + match self.buffer.entry(Some((*stream_type, session_id))) { + Entry::Occupied(entry) => { + entry.remove(); + } + Entry::Vacant(_) => { + warn!( + "Tried to remove session id {} that doesn't exist", + session_id + ); + } + } + } + } + + pub fn clear_clients(&mut self) { + self.buffer.retain(|k , _| k.is_none()); + } } pub trait SaturatingAdd { @@ -74,8 +135,7 @@ impl SaturatingAdd for u16 { } pub fn curry_callback( - effect_sound: Arc>>, - user_bufs: Arc>>, + user_bufs: Arc>, output_volume_receiver: watch::Receiver, user_volumes: Arc>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { @@ -86,27 +146,23 @@ pub fn curry_callback let volume = *output_volume_receiver.borrow(); - let mut effects_sound = effect_sound.lock().unwrap(); let mut user_bufs = user_bufs.lock().unwrap(); - for ((_, id), client_stream) in &mut *user_bufs { - let (user_volume, muted) = user_volumes - .lock() - .unwrap() - .get(id) - .cloned() - .unwrap_or((1.0, false)); - for sample in data.iter_mut() { - let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume; - if !muted { - *sample = sample.saturating_add(Sample::from(&s)); + let user_volumes = user_volumes.lock().unwrap(); + let mut saturating_add = |input: ConsumerInput| { + for (k, v) in input { + let (user_volume, muted) = match k { + Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), + None => (1.0, false), + }; + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.pop_front().unwrap_or(0.0) * volume * user_volume), + )); + } } } - } - - for sample in data.iter_mut() { - *sample = sample.saturating_add(Sample::from( - &(effects_sound.pop_front().unwrap_or(0.0) * volume), - )); - } + }; + user_bufs.consume(&mut saturating_add); } } -- cgit v1.2.1 From 07d06b6946e23ecffbf5549376cf464013222274 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Thu, 8 Apr 2021 17:14:29 -0300 Subject: Create a trait and default device audio output Also removed add/remove/clear client from audio interface, it is done on demand now. --- mumd/src/audio.rs | 100 +++++----------------------- mumd/src/audio/output.rs | 165 +++++++++++++++++++++++++++++++++-------------- mumd/src/error.rs | 2 + mumd/src/state.rs | 4 -- 4 files changed, 136 insertions(+), 135 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 814050b..28a0707 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -3,14 +3,13 @@ pub mod output; mod noise_gate; use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; -use crate::audio::output::ClientStream; +use crate::audio::output::{DefaultAudioOutputDevice, AudioOutputDevice, ClientStream}; use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; -use crate::error::{AudioError, AudioStream}; +use crate::error::AudioError; use crate::network::VoiceStreamType; use crate::state::StatePhase; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{SampleFormat, SampleRate, StreamConfig}; +use cpal::SampleRate; use dasp_interpolate::linear::Linear; use dasp_signal::{self as signal, Signal}; use futures_util::stream::Stream; @@ -119,11 +118,7 @@ impl AudioInput { } pub struct AudioOutput { - config: StreamConfig, - _stream: cpal::Stream, - - volume_sender: watch::Sender, - + device: DefaultAudioOutputDevice, user_volumes: Arc>>, client_streams: Arc>, @@ -133,71 +128,20 @@ pub struct AudioOutput { impl AudioOutput { pub fn new(output_volume: f32) -> Result { - let sample_rate = SampleRate(SAMPLE_RATE); - - let host = cpal::default_host(); - let output_device = host - .default_output_device() - .ok_or(AudioError::NoDevice(AudioStream::Output))?; - let output_supported_config = output_device - .supported_output_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? - .with_sample_rate(sample_rate); - let output_supported_sample_format = output_supported_config.sample_format(); - let output_config: StreamConfig = output_supported_config.into(); - - let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); - let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); - let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); - let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); - let output_stream = match output_supported_sample_format { - SampleFormat::F32 => output_device.build_output_stream( - &output_config, - output::curry_callback::( - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::I16 => output_device.build_output_stream( - &output_config, - output::curry_callback::( - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::U16 => output_device.build_output_stream( - &output_config, - output::curry_callback::( - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; - output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; + let default = DefaultAudioOutputDevice::new( + output_volume, + Arc::clone(&user_volumes), + )?; + default.play()?; + + let client_streams = default.get_client_streams(); let mut res = Self { - config: output_config, - _stream: output_stream, - client_streams, + device: default, sounds: HashMap::new(), - volume_sender: output_volume_sender, + client_streams, user_volumes, }; res.load_sound_effects(&[]); @@ -246,7 +190,7 @@ impl AudioOutput { .until_exhausted() // if the source audio is stereo and is being played as mono, discard the right audio .flat_map( - |e| if self.config.channels == 1 { + |e| if self.device.get_num_channels() == 1 { vec![e[0]] } else { e.to_vec() @@ -262,24 +206,12 @@ impl AudioOutput { self.client_streams.lock().unwrap().decode_packet( Some((stream_type, session_id)), payload, - self.config.channels as usize, + self.device.get_num_channels(), ); } - pub fn add_client(&self, session_id: u32) { - self.client_streams.lock().unwrap().add_client(session_id); - } - - pub fn remove_client(&self, session_id: u32) { - self.client_streams.lock().unwrap().remove_client(session_id); - } - - pub fn clear_clients(&self) { - self.client_streams.lock().unwrap().clear_clients(); - } - pub fn set_volume(&self, output_volume: f32) { - self.volume_sender.send(output_volume).unwrap(); + self.device.set_volume(output_volume); } pub fn set_user_volume(&self, id: u32, volume: f32) { diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 797cf84..3664be8 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,10 +1,13 @@ use crate::network::VoiceStreamType; -use log::*; +use crate::audio::SAMPLE_RATE; +use crate::error::{AudioError, AudioStream}; -use cpal::{OutputCallbackInfo, Sample}; +use log::*; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; -use std::collections::{HashMap, VecDeque, hash_map::Entry}; +use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; @@ -58,53 +61,13 @@ impl ClientStream { F: FnOnce(ConsumerInput) { let iter = self.buffer.iter_mut(); consumer(iter); + //remove empty Vec + self.buffer.retain(|_, v| v.is_empty()); } pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { - match self.buffer.entry(key) { - Entry::Occupied(mut entry) => { - entry.get_mut().extend(values.iter().copied()); - } - Entry::Vacant(_) => { - match key { - None => warn!("Can't find session None"), - Some(key) => warn!("Can't find session id {}", key.1), - } - } - } - } - - pub fn add_client(&mut self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.buffer.entry(Some((*stream_type, session_id))) { - Entry::Occupied(_) => { - warn!("Session id {} already exists", session_id); - } - Entry::Vacant(entry) => { - entry.insert(VecDeque::new()); - } - } - } - } - - pub fn remove_client(&mut self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.buffer.entry(Some((*stream_type, session_id))) { - Entry::Occupied(entry) => { - entry.remove(); - } - Entry::Vacant(_) => { - warn!( - "Tried to remove session id {} that doesn't exist", - session_id - ); - } - } - } - } - - pub fn clear_clients(&mut self) { - self.buffer.retain(|k , _| k.is_none()); + let entry = self.buffer.entry(key).or_insert(VecDeque::new()); + entry.extend(values.iter().copied()); } } @@ -134,6 +97,114 @@ impl SaturatingAdd for u16 { } } +pub trait AudioOutputDevice { + fn play(&self) -> Result<(), AudioError>; + fn pause(&self) -> Result<(), AudioError>; + fn set_volume(&self, volume: f32); + fn get_num_channels(&self) -> usize; + fn get_client_streams(&self) -> Arc>; +} + +pub struct DefaultAudioOutputDevice { + config: StreamConfig, + _stream: cpal::Stream, + client_streams: Arc>, + volume_sender: watch::Sender, +} + +impl DefaultAudioOutputDevice { + pub fn new( + output_volume: f32, + user_volumes: Arc>>, + ) -> Result { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + let output_device = host + .default_output_device() + .ok_or(AudioError::NoDevice(AudioStream::Output))?; + let output_supported_config = output_device + .supported_output_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? + .with_sample_rate(sample_rate); + let output_supported_sample_format = output_supported_config.sample_format(); + let output_config: StreamConfig = output_supported_config.into(); + let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); + + let output_stream = match output_supported_sample_format { + SampleFormat::F32 => output_device.build_output_stream( + &output_config, + curry_callback::( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + SampleFormat::I16 => output_device.build_output_stream( + &output_config, + curry_callback::( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + SampleFormat::U16 => output_device.build_output_stream( + &output_config, + curry_callback::( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + + Ok(Self { + config: output_config, + _stream: output_stream, + volume_sender: output_volume_sender, + client_streams, + }) + } +} + +impl AudioOutputDevice for DefaultAudioOutputDevice { + fn play(&self) -> Result<(), AudioError> { + self._stream.play().map_err(|e| AudioError::OutputPlayError(e)) + } + + fn pause(&self) -> Result<(), AudioError> { + self._stream.pause().map_err(|e| AudioError::OutputPauseError(e)) + } + + fn set_volume(&self, volume: f32) { + self.volume_sender.send(volume).unwrap(); + } + + fn get_num_channels(&self) -> usize { + self.config.channels as usize + } + + fn get_client_streams(&self) -> Arc> { + Arc::clone(&self.client_streams) + } +} + pub fn curry_callback( user_bufs: Arc>, output_volume_receiver: watch::Receiver, diff --git a/mumd/src/error.rs b/mumd/src/error.rs index 2887dd0..eb63df8 100644 --- a/mumd/src/error.rs +++ b/mumd/src/error.rs @@ -86,6 +86,7 @@ pub enum AudioError { NoSupportedConfig(AudioStream), InvalidStream(AudioStream, cpal::BuildStreamError), OutputPlayError(cpal::PlayStreamError), + OutputPauseError(cpal::PauseStreamError), InputPlayError(cpal::PlayStreamError), InputPauseError(cpal::PauseStreamError), } @@ -98,6 +99,7 @@ impl fmt::Display for AudioError { AudioError::NoSupportedConfig(s) => write!(f, "No supported {} config found", s), AudioError::InvalidStream(s, e) => write!(f, "Invalid {} stream: {}", s, e), AudioError::OutputPlayError(e) => write!(f, "Playback error: {}", e), + AudioError::OutputPauseError(e) => write!(f, "Playback error: {}", e), AudioError::InputPlayError(e) => write!(f, "Recording error: {}", e), AudioError::InputPauseError(e) => write!(f, "Recording error: {}", e), } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 313a985..132da74 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -371,7 +371,6 @@ impl State { } self.server = None; - self.audio_output.clear_clients(); self.phase_watcher .0 @@ -457,8 +456,6 @@ impl State { *self.server_mut().unwrap().session_id_mut() = Some(session); } else { // this is someone else - self.audio_output_mut().add_client(session); - // send notification only if we've passed the connecting phase if matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { let channel_id = msg.get_channel_id(); @@ -569,7 +566,6 @@ impl State { } } - self.audio_output().remove_client(msg.get_session()); self.server_mut() .unwrap() .users_mut() -- cgit v1.2.1 From a39934e562fd2755fcb7b1ed271bcf3f31aaa0d5 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Mon, 5 Apr 2021 11:04:19 -0300 Subject: Replace State tokio::sync::Mutex by std::sync::RwLock --- mumd/src/client.rs | 6 +++--- mumd/src/command.rs | 8 ++++---- mumd/src/network/tcp.rs | 28 ++++++++++++++-------------- mumd/src/network/udp.rs | 28 +++++++++++++++++++--------- 4 files changed, 40 insertions(+), 30 deletions(-) diff --git a/mumd/src/client.rs b/mumd/src/client.rs index c1a0152..9c2c2a0 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -6,8 +6,8 @@ use crate::state::State; use futures_util::{select, FutureExt}; use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState}; use mumlib::command::{Command, CommandResponse}; -use std::sync::Arc; -use tokio::sync::{Mutex, mpsc, oneshot, watch}; +use std::sync::{Arc, RwLock}; +use tokio::sync::{mpsc, oneshot, watch}; pub async fn handle( state: State, @@ -27,7 +27,7 @@ pub async fn handle( let (response_sender, response_receiver) = mpsc::unbounded_channel(); - let state = Arc::new(Mutex::new(state)); + let state = Arc::new(RwLock::new(state)); select! { r = tcp::handle( diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 3e462b1..7eec388 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -8,11 +8,11 @@ use crate::state::{ExecutionContext, State}; use log::*; use mumble_protocol::{Serverbound, control::ControlPacket}; use mumlib::command::{Command, CommandResponse}; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot, watch, Mutex}; +use std::sync::{Arc, RwLock}; +use tokio::sync::{mpsc, oneshot, watch}; pub async fn handle( - state: Arc>, + state: Arc>, mut command_receiver: mpsc::UnboundedReceiver<( Command, oneshot::Sender>>, @@ -25,7 +25,7 @@ pub async fn handle( debug!("Begin listening for commands"); while let Some((command, response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); - let mut state = state.lock().await; + let mut state = state.write().unwrap(); let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); drop(state); match event { diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 1414318..c5eded2 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -10,7 +10,7 @@ use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPa use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; use mumble_protocol::{Clientbound, Serverbound}; -use std::collections::HashMap; +use std::{collections::HashMap, sync::RwLock}; use std::convert::{Into, TryInto}; use std::net::SocketAddr; use std::sync::Arc; @@ -79,7 +79,7 @@ impl TcpEventQueue { } pub async fn handle( - state: Arc>, + state: Arc>, mut connection_info_receiver: watch::Receiver>, crypt_state_sender: mpsc::Sender, packet_sender: mpsc::UnboundedSender>, @@ -103,7 +103,7 @@ pub async fn handle( .await?; // Handshake (omitting `Version` message for brevity) - let state_lock = state.lock().await; + let state_lock = state.read().unwrap(); let username = state_lock.username().unwrap().to_string(); let password = state_lock.password().map(|x| x.to_string()); authenticate(&mut sink, username, password).await?; @@ -241,7 +241,7 @@ async fn send_voice( } async fn listen( - state: Arc>, + state: Arc>, mut stream: TcpReceiver, crypt_state_sender: mpsc::Sender, event_queue: TcpEventQueue, @@ -260,7 +260,7 @@ async fn listen( // We end up here if the login was rejected. We probably want // to exit before that. warn!("TCP stream gone"); - state.lock().await.broadcast_phase(StatePhase::Disconnected); + state.read().unwrap().broadcast_phase(StatePhase::Disconnected); break; } }; @@ -299,7 +299,7 @@ async fn listen( .await; } event_queue.resolve(TcpEventData::Connected(Ok(&msg))).await; - let mut state = state.lock().await; + let mut state = state.write().unwrap(); let server = state.server_mut().unwrap(); server.parse_server_sync(*msg); match &server.welcome_text { @@ -323,24 +323,24 @@ async fn listen( } } ControlPacket::UserState(msg) => { - state.lock().await.parse_user_state(*msg); + state.write().unwrap().parse_user_state(*msg); } ControlPacket::UserRemove(msg) => { - state.lock().await.remove_client(*msg); + state.write().unwrap().remove_client(*msg); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); state - .lock() - .await + .write() + .unwrap() .server_mut() .unwrap() .parse_channel_state(*msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { state - .lock() - .await + .write() + .unwrap() .server_mut() .unwrap() .parse_channel_remove(*msg); @@ -356,8 +356,8 @@ async fn listen( .. } => { state - .lock() - .await + .read() + .unwrap() .audio_output() .decode_packet_payload( VoiceStreamType::TCP, diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 8614358..f24d4b4 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -9,7 +9,7 @@ use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::VoicePacket; use mumble_protocol::Serverbound; -use std::collections::HashMap; +use std::{collections::HashMap, sync::RwLock}; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; @@ -29,11 +29,15 @@ type UdpSender = SplitSink, (VoicePacket>; pub async fn handle( - state: Arc>, + state: Arc>, mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { +<<<<<<< HEAD let receiver = state.lock().await.audio_input().receiver(); +======= + let receiver = state.read().unwrap().audio().input_receiver(); +>>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) loop { let connection_info = 'data: loop { @@ -49,7 +53,7 @@ pub async fn handle( let sink = Arc::new(Mutex::new(sink)); let source = Arc::new(Mutex::new(source)); - let phase_watcher = state.lock().await.phase_receiver(); + let phase_watcher = state.read().unwrap().phase_receiver(); let last_ping_recv = AtomicU64::new(0); run_until( @@ -119,7 +123,7 @@ async fn new_crypt_state( } async fn listen( - state: Arc>, + state: Arc>, source: Arc>, last_ping_recv: &AtomicU64, ) { @@ -136,8 +140,8 @@ async fn listen( match packet { VoicePacket::Ping { timestamp } => { state - .lock() //TODO clean up unnecessary lock by only updating phase if it should change - .await + .read() + .unwrap() .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); last_ping_recv.store(timestamp, Ordering::Relaxed); } @@ -149,9 +153,15 @@ async fn listen( .. } => { state +<<<<<<< HEAD .lock() //TODO change so that we only have to lock audio and not the whole state .await .audio_output() +======= + .read() + .unwrap() + .audio() +>>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } @@ -159,7 +169,7 @@ async fn listen( } async fn send_pings( - state: Arc>, + state: Arc>, sink: Arc>, server_addr: SocketAddr, last_ping_recv: &AtomicU64, @@ -173,8 +183,8 @@ async fn send_pings( if last_send.is_some() && last_send.unwrap() != last_recv { debug!("Sending TCP voice"); state - .lock() - .await + .read() + .unwrap() .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); } match sink -- cgit v1.2.1 From 14b0ce912735243e566a95838e9ed0c93a2e7f3e Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Fri, 9 Apr 2021 15:02:51 -0300 Subject: Resolved merge upstream conflicts --- mumd/src/network/udp.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index f24d4b4..11a3f27 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,11 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { -<<<<<<< HEAD - let receiver = state.lock().await.audio_input().receiver(); -======= - let receiver = state.read().unwrap().audio().input_receiver(); ->>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) + let receiver = state.read().unwrap().audio_input().receiver(); loop { let connection_info = 'data: loop { @@ -153,15 +149,9 @@ async fn listen( .. } => { state -<<<<<<< HEAD - .lock() //TODO change so that we only have to lock audio and not the whole state - .await - .audio_output() -======= .read() .unwrap() - .audio() ->>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) + .audio_output() .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } -- cgit v1.2.1 From 7c5f60f210bfd05ea22d3a65f04e245989fdaade Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Tue, 6 Apr 2021 15:18:55 -0300 Subject: Resolved merge upstream conflicts --- mumd/src/network/tcp.rs | 20 ++++++++++++-------- mumd/src/network/udp.rs | 4 ++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index c5eded2..7606987 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -10,10 +10,10 @@ use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPa use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; use mumble_protocol::{Clientbound, Serverbound}; -use std::{collections::HashMap, sync::RwLock}; +use std::collections::HashMap; use std::convert::{Into, TryInto}; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use tokio::net::TcpStream; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{self, Duration}; @@ -103,13 +103,17 @@ pub async fn handle( .await?; // Handshake (omitting `Version` message for brevity) - let state_lock = state.read().unwrap(); - let username = state_lock.username().unwrap().to_string(); - let password = state_lock.password().map(|x| x.to_string()); + let (username, password) = { + let state_lock = state.read().unwrap(); + (state_lock.username().unwrap().to_string(), + state_lock.password().map(|x| x.to_string())) + }; authenticate(&mut sink, username, password).await?; - let phase_watcher = state_lock.phase_receiver(); - let input_receiver = state_lock.audio_input().receiver(); - drop(state_lock); + let (phase_watcher, input_receiver) = { + let state_lock = state.read().unwrap(); + (state_lock.phase_receiver(), + state_lock.audio_input().receiver()) + }; let event_queue = TcpEventQueue::new(); info!("Logging in..."); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 11a3f27..3ca77af 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -9,12 +9,12 @@ use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::VoicePacket; use mumble_protocol::Serverbound; -use std::{collections::HashMap, sync::RwLock}; +use std::collections::HashMap; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use tokio::{join, net::UdpSocket}; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{interval, Duration}; -- cgit v1.2.1 From ab649ca21286baf182c18d11450bf58d25af0c84 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Sat, 10 Apr 2021 07:39:46 -0300 Subject: Ops, forgot to fix upstream conflicts --- mumd/src/network/udp.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 086e072..3ca77af 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,11 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { -<<<<<<< HEAD let receiver = state.read().unwrap().audio_input().receiver(); -======= - let receiver = state.read().unwrap().audio().input_receiver(); ->>>>>>> main loop { let connection_info = 'data: loop { @@ -155,11 +151,7 @@ async fn listen( state .read() .unwrap() -<<<<<<< HEAD .audio_output() -======= - .audio() ->>>>>>> main .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } -- cgit v1.2.1 From 3caae1e9e17524cd2fdedc39c075ceda231cf0e1 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Sat, 10 Apr 2021 07:59:29 -0300 Subject: Fix audio output being cut Where did this `!` go? My keyboard have a problem? --- mumd/src/audio/output.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 3664be8..df9b2e2 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -62,7 +62,7 @@ impl ClientStream { let iter = self.buffer.iter_mut(); consumer(iter); //remove empty Vec - self.buffer.retain(|_, v| v.is_empty()); + self.buffer.retain(|_, v| !v.is_empty()); } pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { -- cgit v1.2.1 From 727710ae7e3ac8c35d66e0431682a2a90f2bd3a4 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Sat, 10 Apr 2021 14:43:21 -0300 Subject: Restore multiple decoders --- mumd/src/audio.rs | 15 +++---- mumd/src/audio/input.rs | 12 ++--- mumd/src/audio/output.rs | 115 ++++++++++++++++++++++++----------------------- 3 files changed, 71 insertions(+), 71 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 28a0707..df7af70 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -68,7 +68,7 @@ impl TryFrom<&str> for NotificationEvents { } pub struct AudioInput { - _device: DefaultAudioInputDevice, + device: DefaultAudioInputDevice, channel_receiver: Arc> + Unpin>>>, } @@ -81,7 +81,7 @@ impl AudioInput { let opus_stream = OpusEncoder::new( 4, sample_rate.0, - default.get_num_channels(), + default.num_channels(), StreamingSignalExt::into_interleaved_samples( StreamingNoiseGate::new( from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly @@ -102,7 +102,7 @@ impl AudioInput { default.play()?; let res = Self { - _device: default, + device: default, channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), }; Ok(res) @@ -113,7 +113,7 @@ impl AudioInput { } pub fn set_volume(&self, input_volume: f32) { - self._device.set_volume(input_volume); + self.device.set_volume(input_volume); } } @@ -136,7 +136,7 @@ impl AudioOutput { )?; default.play()?; - let client_streams = default.get_client_streams(); + let client_streams = default.client_streams(); let mut res = Self { device: default, @@ -190,7 +190,7 @@ impl AudioOutput { .until_exhausted() // if the source audio is stereo and is being played as mono, discard the right audio .flat_map( - |e| if self.device.get_num_channels() == 1 { + |e| if self.device.num_channels() == 1 { vec![e[0]] } else { e.to_vec() @@ -204,9 +204,8 @@ impl AudioOutput { pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) { self.client_streams.lock().unwrap().decode_packet( - Some((stream_type, session_id)), + (stream_type, session_id), payload, - self.device.get_num_channels(), ); } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index f4e9c4c..4a1ed3d 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -33,11 +33,11 @@ pub trait AudioInputDevice { fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver; - fn get_num_channels(&self) -> usize; + fn num_channels(&self) -> usize; } pub struct DefaultAudioInputDevice { - _stream: cpal::Stream, + stream: cpal::Stream, sample_receiver: Option>, volume_sender: watch::Sender, channels: u16, @@ -105,7 +105,7 @@ impl DefaultAudioInputDevice { .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; let res = Self { - _stream: input_stream, + stream: input_stream, sample_receiver: Some(sample_receiver), volume_sender, channels: input_config.channels, @@ -116,10 +116,10 @@ impl DefaultAudioInputDevice { impl AudioInputDevice for DefaultAudioInputDevice { fn play(&self) -> Result<(), AudioError> { - self._stream.play().map_err(|e| AudioError::InputPlayError(e)) + self.stream.play().map_err(|e| AudioError::InputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self._stream.pause().map_err(|e| AudioError::InputPauseError(e)) + self.stream.pause().map_err(|e| AudioError::InputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); @@ -128,7 +128,7 @@ impl AudioInputDevice for DefaultAudioInputDevice { let ret = self.sample_receiver.take(); ret.unwrap() } - fn get_num_channels(&self) -> usize { + fn num_channels(&self) -> usize { self.channels as usize } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index df9b2e2..658c1c8 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -6,49 +6,57 @@ use log::*; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; -use opus::Channels; use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; -type ClientStreamKey = Option<(VoiceStreamType, u32)>; -type ConsumerInput<'a> = std::collections::hash_map::IterMut<'a, ClientStreamKey, VecDeque>; +type ClientStreamKey = (VoiceStreamType, u32); pub struct ClientStream { - buffer: HashMap>, //TODO ring buffer? - opus_decoder: opus::Decoder, + buffer_clients: HashMap, opus::Decoder)>, //TODO ring buffer? + buffer_effects: VecDeque, + sample_rate: u32, + channels: opus::Channels, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { - let mut buffer = HashMap::new(); - //None is the system audio effects - buffer.insert(None, VecDeque::new()); + let channels = match channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), + }; Self { - buffer, - opus_decoder: opus::Decoder::new( - sample_rate, - match channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), - }, - ) - .unwrap(), + buffer_clients: HashMap::new(), + buffer_effects: VecDeque::new(), + sample_rate, + channels, } } - pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload, channels: usize) { + fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque, opus::Decoder) { + let sample_rate = self.sample_rate; + let channels = self.channels; + self.buffer_clients.entry(client).or_insert_with(|| { + let opus_decoder = opus::Decoder::new( + sample_rate, + channels + ).unwrap(); + (VecDeque::new(), opus_decoder) + }) + } + + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode - let parsed = self - .opus_decoder + let mut out: Vec = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode + let (buffer, decoder) = self.get_client(client); + let parsed = decoder .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.extend(client, &out); + buffer.extend(&out); } _ => { unimplemented!("Payload type not supported"); @@ -56,18 +64,12 @@ impl ClientStream { } } - pub fn consume<'a, F>(&mut self, consumer: F) - where - F: FnOnce(ConsumerInput) { - let iter = self.buffer.iter_mut(); - consumer(iter); - //remove empty Vec - self.buffer.retain(|_, v| !v.is_empty()); - } - - pub fn extend(&mut self, key: ClientStreamKey, values: &[f32]) { - let entry = self.buffer.entry(key).or_insert(VecDeque::new()); - entry.extend(values.iter().copied()); + pub fn extend(&mut self, client: Option, values: &[f32]) { + let buffer = match client { + Some(x) => &mut self.get_client(x).0, + None => &mut self.buffer_effects, + }; + buffer.extend(values.iter().copied()); } } @@ -101,13 +103,13 @@ pub trait AudioOutputDevice { fn play(&self) -> Result<(), AudioError>; fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); - fn get_num_channels(&self) -> usize; - fn get_client_streams(&self) -> Arc>; + fn num_channels(&self) -> usize; + fn client_streams(&self) -> Arc>; } pub struct DefaultAudioOutputDevice { config: StreamConfig, - _stream: cpal::Stream, + stream: cpal::Stream, client_streams: Arc>, volume_sender: watch::Sender, } @@ -176,7 +178,7 @@ impl DefaultAudioOutputDevice { Ok(Self { config: output_config, - _stream: output_stream, + stream: output_stream, volume_sender: output_volume_sender, client_streams, }) @@ -185,22 +187,22 @@ impl DefaultAudioOutputDevice { impl AudioOutputDevice for DefaultAudioOutputDevice { fn play(&self) -> Result<(), AudioError> { - self._stream.play().map_err(|e| AudioError::OutputPlayError(e)) + self.stream.play().map_err(|e| AudioError::OutputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self._stream.pause().map_err(|e| AudioError::OutputPauseError(e)) + self.stream.pause().map_err(|e| AudioError::OutputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } - fn get_num_channels(&self) -> usize { + fn num_channels(&self) -> usize { self.config.channels as usize } - fn get_client_streams(&self) -> Arc> { + fn client_streams(&self) -> Arc> { Arc::clone(&self.client_streams) } } @@ -219,21 +221,20 @@ pub fn curry_callback let mut user_bufs = user_bufs.lock().unwrap(); let user_volumes = user_volumes.lock().unwrap(); - let mut saturating_add = |input: ConsumerInput| { - for (k, v) in input { - let (user_volume, muted) = match k { - Some((_, id)) => user_volumes.get(id).cloned().unwrap_or((1.0, false)), - None => (1.0, false), - }; - for sample in data.iter_mut() { - if !muted { - *sample = sample.saturating_add(Sample::from( - &(v.pop_front().unwrap_or(0.0) * volume * user_volume), - )); - } + for (k, v) in user_bufs.buffer_clients.iter_mut() { + let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); + for sample in data.iter_mut() { + if !muted { + *sample = sample.saturating_add(Sample::from( + &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume), + )); } } - }; - user_bufs.consume(&mut saturating_add); + } + for sample in data.iter_mut() { + *sample = sample.saturating_add(Sample::from( + &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume), + )); + } } } -- cgit v1.2.1