diff options
| author | Rubens Brandao <git@rubens.io> | 2021-04-05 18:39:55 +0200 |
|---|---|---|
| committer | Rubens Brandao <git@rubens.io> | 2021-04-05 18:39:55 +0200 |
| commit | e431ecb6c5c8406bde6a54f40ee2f648cc0cec05 (patch) | |
| tree | 93a227ab5f3ceac27286a1c2c36b79c12c72f95f | |
| parent | 1734a72d3caff4f8831f4b366fdb818fddecf32f (diff) | |
| download | mum-e431ecb6c5c8406bde6a54f40ee2f648cc0cec05.tar.gz | |
Separate the input and output audio
| -rw-r--r-- | mumd/src/audio.rs | 181 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 4 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 4 | ||||
| -rw-r--r-- | mumd/src/state.rs | 66 |
4 files changed, 141 insertions, 114 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index fdbeaee..0aac68c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -67,46 +67,18 @@ impl TryFrom<&str> for NotificationEvents { } } -pub struct Audio { - output_config: StreamConfig, - _output_stream: cpal::Stream, +pub struct AudioInput { _input_stream: cpal::Stream, input_channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, input_volume_sender: watch::Sender<f32>, - - output_volume_sender: watch::Sender<f32>, - - user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, - - client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>, - - sounds: HashMap<NotificationEvents, Vec<f32>>, - play_sounds: Arc<Mutex<VecDeque<f32>>>, } -impl Audio { - pub fn new(input_volume: f32, output_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { +impl AudioInput { + pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); - let output_device = host - .default_output_device() - .ok_or(AudioError::NoDevice(AudioStream::Output))?; - let output_supported_config = output_device - .supported_output_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? - .with_sample_rate(sample_rate); - let output_supported_sample_format = output_supported_config.sample_format(); - let output_config: StreamConfig = output_supported_config.into(); let input_device = host .default_input_device() @@ -128,45 +100,6 @@ impl Audio { let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); - let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); - let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume); - let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); - - let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); - let output_stream = match output_supported_sample_format { - SampleFormat::F32 => output_device.build_output_stream( - &output_config, - output::curry_callback::<f32>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::I16 => output_device.build_output_stream( - &output_config, - output::curry_callback::<i16>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::U16 => output_device.build_output_stream( - &output_config, - output::curry_callback::<u16>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; - let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); @@ -218,14 +151,106 @@ impl Audio { position_info: None, }); - output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; + input_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; - let mut res = Self { - output_config, - _output_stream: output_stream, + let res = Self { _input_stream: input_stream, input_volume_sender, input_channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), + }; + Ok(res) + } + + pub fn input_receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { + Arc::clone(&self.input_channel_receiver) + } + + pub fn set_input_volume(&self, input_volume: f32) { + self.input_volume_sender.send(input_volume).unwrap(); + } +} + +pub struct AudioOutput { + output_config: StreamConfig, + _output_stream: cpal::Stream, + + output_volume_sender: watch::Sender<f32>, + + user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, + + client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>, + + sounds: HashMap<NotificationEvents, Vec<f32>>, + play_sounds: Arc<Mutex<VecDeque<f32>>>, +} + +impl AudioOutput { + pub fn new(output_volume: f32) -> Result<Self, AudioError> { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + let output_device = host + .default_output_device() + .ok_or(AudioError::NoDevice(AudioStream::Output))?; + let output_supported_config = output_device + .supported_output_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? + .with_sample_rate(sample_rate); + let output_supported_sample_format = output_supported_config.sample_format(); + let output_config: StreamConfig = output_supported_config.into(); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); + let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume); + let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); + + let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); + let output_stream = match output_supported_sample_format { + SampleFormat::F32 => output_device.build_output_stream( + &output_config, + output::curry_callback::<f32>( + Arc::clone(&play_sounds), + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), + err_fn, + ), + SampleFormat::I16 => output_device.build_output_stream( + &output_config, + output::curry_callback::<i16>( + Arc::clone(&play_sounds), + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), + err_fn, + ), + SampleFormat::U16 => output_device.build_output_stream( + &output_config, + output::curry_callback::<u16>( + Arc::clone(&play_sounds), + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + + let mut res = Self { + output_config, + _output_stream: output_stream, client_streams, sounds: HashMap::new(), output_volume_sender, @@ -335,18 +360,10 @@ impl Audio { } } - pub fn input_receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { - Arc::clone(&self.input_channel_receiver) - } - pub fn clear_clients(&mut self) { self.client_streams.lock().unwrap().clear(); } - pub fn set_input_volume(&self, input_volume: f32) { - self.input_volume_sender.send(input_volume).unwrap(); - } - pub fn set_output_volume(&self, output_volume: f32) { self.output_volume_sender.send(output_volume).unwrap(); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6402a89..d2f0b41 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -108,7 +108,7 @@ pub async fn handle( let password = state_lock.password().map(|x| x.to_string()); authenticate(&mut sink, username, password).await?; let phase_watcher = state_lock.phase_receiver(); - let input_receiver = state_lock.audio().input_receiver(); + let input_receiver = state_lock.audio_input().input_receiver(); drop(state_lock); let event_queue = TcpEventQueue::new(); @@ -358,7 +358,7 @@ async fn listen( state .lock() .await - .audio() + .audio_output() .decode_packet_payload( VoiceStreamType::TCP, session_id, diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 5996e43..a8e190d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,7 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>, ) -> Result<(), UdpError> { - let receiver = state.lock().await.audio().input_receiver(); + let receiver = state.lock().await.audio_input().input_receiver(); loop { let connection_info = 'data: loop { @@ -151,7 +151,7 @@ async fn listen( state .lock() //TODO change so that we only have to lock audio and not the whole state .await - .audio() + .audio_output() .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index b52b330..84bb372 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -2,7 +2,7 @@ pub mod channel; pub mod server; pub mod user; -use crate::audio::{Audio, NotificationEvents}; +use crate::audio::{AudioInput, AudioOutput, NotificationEvents}; use crate::error::StateError; use crate::network::{ConnectionInfo, VoiceStreamType}; use crate::network::tcp::{TcpEvent, TcpEventData}; @@ -57,7 +57,8 @@ pub enum StatePhase { pub struct State { config: Config, server: Option<Server>, - audio: Audio, + audio_input: AudioInput, + audio_output: AudioOutput, phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>), } @@ -66,15 +67,18 @@ impl State { pub fn new() -> Result<Self, StateError> { let config = mumlib::config::read_default_cfg()?; let phase_watcher = watch::channel(StatePhase::Disconnected); - let audio = Audio::new( + let audio_input = AudioInput::new( config.audio.input_volume.unwrap_or(1.0), - config.audio.output_volume.unwrap_or(1.0), phase_watcher.1.clone(), ).map_err(|e| StateError::AudioError(e))?; + let audio_output = AudioOutput::new( + config.audio.output_volume.unwrap_or(1.0), + ).map_err(|e| StateError::AudioError(e))?; let mut state = Self { config, server: None, - audio, + audio_input, + audio_output, phase_watcher, }; state.reload_config(); @@ -176,13 +180,13 @@ impl State { let mut new_deaf = None; if let Some((mute, deafen)) = action { if server.deafened() != deafen { - self.audio.play_effect(if deafen { + self.audio_output.play_effect(if deafen { NotificationEvents::Deafen } else { NotificationEvents::Undeafen }); } else if server.muted() != mute { - self.audio.play_effect(if mute { + self.audio_output.play_effect(if mute { NotificationEvents::Mute } else { NotificationEvents::Unmute @@ -207,7 +211,7 @@ impl State { now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) } Command::InputVolumeSet(volume) => { - self.audio.set_input_volume(volume); + self.audio_input.set_input_volume(volume); now!(Ok(None)) } Command::MuteOther(string, toggle) => { @@ -240,7 +244,7 @@ impl State { if let Some(action) = action { user.set_suppressed(action); - self.audio.set_mute(id, action); + self.audio_output.set_mute(id, action); } return now!(Ok(None)); @@ -269,13 +273,13 @@ impl State { let mut new_mute = None; if let Some((mute, deafen)) = action { if server.deafened() != deafen { - self.audio.play_effect(if deafen { + self.audio_output.play_effect(if deafen { NotificationEvents::Deafen } else { NotificationEvents::Undeafen }); } else if server.muted() != mute { - self.audio.play_effect(if mute { + self.audio_output.play_effect(if mute { NotificationEvents::Mute } else { NotificationEvents::Unmute @@ -301,7 +305,7 @@ impl State { now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) } Command::OutputVolumeSet(volume) => { - self.audio.set_output_volume(volume); + self.audio_output.set_output_volume(volume); now!(Ok(None)) } Command::Ping => { @@ -367,13 +371,13 @@ impl State { } self.server = None; - self.audio.clear_clients(); + self.audio_output.clear_clients(); self.phase_watcher .0 .send(StatePhase::Disconnected) .unwrap(); - self.audio.play_effect(NotificationEvents::ServerDisconnect); + self.audio_output.play_effect(NotificationEvents::ServerDisconnect); now!(Ok(None)) } Command::ServerStatus { host, port } => ExecutionContext::Ping( @@ -420,7 +424,7 @@ impl State { Some(v) => v, }; - self.audio.set_user_volume(user_id, volume); + self.audio_output.set_user_volume(user_id, volume); now!(Ok(None)) } } @@ -453,7 +457,7 @@ impl State { *self.server_mut().unwrap().session_id_mut() = Some(session); } else { // this is someone else - self.audio_mut().add_client(session); + self.audio_output_mut().add_client(session); // send notification only if we've passed the connecting phase if matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { @@ -470,7 +474,7 @@ impl State { )); } - self.audio.play_effect(NotificationEvents::UserConnected); + self.audio_output.play_effect(NotificationEvents::UserConnected); } } } @@ -524,7 +528,7 @@ impl State { } else { warn!("{} moved to invalid channel {}", user.name(), to_channel); } - self.audio.play_effect(if from_channel == this_channel { + self.audio_output.play_effect(if from_channel == this_channel { NotificationEvents::UserJoinedChannel } else { NotificationEvents::UserLeftChannel @@ -559,13 +563,13 @@ impl State { let this_channel = self.get_users_channel(self.server().unwrap().session_id().unwrap()); let other_channel = self.get_users_channel(msg.get_session()); if this_channel == other_channel { - self.audio.play_effect(NotificationEvents::UserDisconnected); + self.audio_output.play_effect(NotificationEvents::UserDisconnected); if let Some(user) = self.server().unwrap().users().get(&msg.get_session()) { notifications::send(format!("{} disconnected", &user.name())); } } - self.audio().remove_client(msg.get_session()); + self.audio_output().remove_client(msg.get_session()); self.server_mut() .unwrap() .users_mut() @@ -581,13 +585,13 @@ impl State { Err(e) => error!("Couldn't read config: {}", e), } if let Some(input_volume) = self.config.audio.input_volume { - self.audio.set_input_volume(input_volume); + self.audio_input.set_input_volume(input_volume); } if let Some(output_volume) = self.config.audio.output_volume { - self.audio.set_output_volume(output_volume); + self.audio_output.set_output_volume(output_volume); } if let Some(sound_effects) = &self.config.audio.sound_effects { - self.audio.load_sound_effects(sound_effects); + self.audio_output.load_sound_effects(sound_effects); } } @@ -600,14 +604,20 @@ impl State { pub fn initialized(&self) { self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); - self.audio.play_effect(NotificationEvents::ServerConnect); + self.audio_output.play_effect(NotificationEvents::ServerConnect); } - pub fn audio(&self) -> &Audio { - &self.audio + pub fn audio_input(&self) -> &AudioInput { + &self.audio_input + } + pub fn audio_output(&self) -> &AudioOutput { + &self.audio_output + } + pub fn audio_input_mut(&mut self) -> &mut AudioInput { + &mut self.audio_input } - pub fn audio_mut(&mut self) -> &mut Audio { - &mut self.audio + pub fn audio_output_mut(&mut self) -> &mut AudioOutput { + &mut self.audio_output } pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> { self.phase_watcher.1.clone() |
