diff options
Diffstat (limited to 'mumd/src/audio.rs')
| -rw-r--r-- | mumd/src/audio.rs | 288 |
1 files changed, 71 insertions, 217 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index fdbeaee..df7af70 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -2,14 +2,14 @@ pub mod input; pub mod output; mod noise_gate; -use crate::audio::output::SaturatingAdd; +use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; +use crate::audio::output::{DefaultAudioOutputDevice, AudioOutputDevice, ClientStream}; use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; -use crate::error::{AudioError, AudioStream}; +use crate::error::AudioError; use crate::network::VoiceStreamType; use crate::state::StatePhase; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{SampleFormat, SampleRate, StreamConfig}; +use cpal::SampleRate; use dasp_interpolate::linear::Linear; use dasp_signal::{self as signal, Signal}; use futures_util::stream::Stream; @@ -19,7 +19,7 @@ use mumble_protocol::Serverbound; use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use mumlib::config::SoundEffect; use std::borrow::Cow; -use std::collections::{hash_map::Entry, HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::fmt::Debug; use std::fs::File; @@ -67,170 +67,82 @@ impl TryFrom<&str> for NotificationEvents { } } -pub struct Audio { - output_config: StreamConfig, - _output_stream: cpal::Stream, - _input_stream: cpal::Stream, +pub struct AudioInput { + device: DefaultAudioInputDevice, - input_channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, - input_volume_sender: watch::Sender<f32>, - - output_volume_sender: watch::Sender<f32>, - - user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, - - client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>, - - sounds: HashMap<NotificationEvents, Vec<f32>>, - play_sounds: Arc<Mutex<VecDeque<f32>>>, + channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, } -impl Audio { - pub fn new(input_volume: f32, output_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { +impl AudioInput { + pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { + let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher)?; let sample_rate = SampleRate(SAMPLE_RATE); - let host = cpal::default_host(); - let output_device = host - .default_output_device() - .ok_or(AudioError::NoDevice(AudioStream::Output))?; - let output_supported_config = output_device - .supported_output_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? - .with_sample_rate(sample_rate); - let output_supported_sample_format = output_supported_config.sample_format(); - let output_config: StreamConfig = output_supported_config.into(); + let opus_stream = OpusEncoder::new( + 4, + sample_rate.0, + default.num_channels(), + StreamingSignalExt::into_interleaved_samples( + StreamingNoiseGate::new( + from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly + 10_000 + ) + ) + ).enumerate() + .map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + } + ); - let input_device = host - .default_input_device() - .ok_or(AudioError::NoDevice(AudioStream::Input))?; - let input_supported_config = input_device - .supported_input_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))? - .with_sample_rate(sample_rate); - let input_supported_sample_format = input_supported_config.sample_format(); - let input_config: StreamConfig = input_supported_config.into(); + default.play()?; - let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + let res = Self { + device: default, + channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), + }; + Ok(res) + } - let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); - let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume); - let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); + pub fn receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { + Arc::clone(&self.channel_receiver) + } - let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); - let output_stream = match output_supported_sample_format { - SampleFormat::F32 => output_device.build_output_stream( - &output_config, - output::curry_callback::<f32>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::I16 => output_device.build_output_stream( - &output_config, - output::curry_callback::<i16>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::U16 => output_device.build_output_stream( - &output_config, - output::curry_callback::<u16>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + pub fn set_volume(&self, input_volume: f32) { + self.device.set_volume(input_volume); + } +} - let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); +pub struct AudioOutput { + device: DefaultAudioOutputDevice, + user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, - let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); + client_streams: Arc<Mutex<ClientStream>>, - let input_stream = match input_supported_sample_format { - SampleFormat::F32 => input_device.build_input_stream( - &input_config, - input::callback::<f32>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - SampleFormat::I16 => input_device.build_input_stream( - &input_config, - input::callback::<i16>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - SampleFormat::U16 => input_device.build_input_stream( - &input_config, - input::callback::<u16>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; + sounds: HashMap<NotificationEvents, Vec<f32>>, +} - let opus_stream = OpusEncoder::new( - 4, - input_config.sample_rate.0, - input_config.channels as usize, - StreamingSignalExt::into_interleaved_samples( - StreamingNoiseGate::new( - from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly - 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: i as u64, - payload: VoicePacketPayload::Opus(e.into(), false), - position_info: None, - }); +impl AudioOutput { + pub fn new(output_volume: f32) -> Result<Self, AudioError> { + let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); + + let default = DefaultAudioOutputDevice::new( + output_volume, + Arc::clone(&user_volumes), + )?; + default.play()?; - output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; + let client_streams = default.client_streams(); let mut res = Self { - output_config, - _output_stream: output_stream, - _input_stream: input_stream, - input_volume_sender, - input_channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), - client_streams, + device: default, sounds: HashMap::new(), - output_volume_sender, + client_streams, user_volumes, - play_sounds, }; res.load_sound_effects(&[]); Ok(res) @@ -278,7 +190,7 @@ impl Audio { .until_exhausted() // if the source audio is stereo and is being played as mono, discard the right audio .flat_map( - |e| if self.output_config.channels == 1 { + |e| if self.device.num_channels() == 1 { vec![e[0]] } else { e.to_vec() @@ -291,64 +203,14 @@ impl Audio { } pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) { - match self.client_streams.lock().unwrap().entry((stream_type, session_id)) { - Entry::Occupied(mut entry) => { - entry - .get_mut() - .decode_packet(payload, self.output_config.channels as usize); - } - Entry::Vacant(_) => { - warn!("Can't find session id {}", session_id); - } - } - } - - pub fn add_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(_) => { - warn!("Session id {} already exists", session_id); - } - Entry::Vacant(entry) => { - entry.insert(output::ClientStream::new( - self.output_config.sample_rate.0, - self.output_config.channels, - )); - } - } - } - } - - pub fn remove_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(entry) => { - entry.remove(); - } - Entry::Vacant(_) => { - warn!( - "Tried to remove session id {} that doesn't exist", - session_id - ); - } - } - } - } - - pub fn input_receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { - Arc::clone(&self.input_channel_receiver) + self.client_streams.lock().unwrap().decode_packet( + (stream_type, session_id), + payload, + ); } - pub fn clear_clients(&mut self) { - self.client_streams.lock().unwrap().clear(); - } - - pub fn set_input_volume(&self, input_volume: f32) { - self.input_volume_sender.send(input_volume).unwrap(); - } - - pub fn set_output_volume(&self, output_volume: f32) { - self.output_volume_sender.send(output_volume).unwrap(); + pub fn set_volume(&self, output_volume: f32) { + self.device.set_volume(output_volume); } pub fn set_user_volume(&self, id: u32, volume: f32) { @@ -375,15 +237,7 @@ impl Audio { pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - - let mut play_sounds = self.play_sounds.lock().unwrap(); - - for (val, e) in play_sounds.iter_mut().zip(samples.iter()) { - *val = val.saturating_add(*e); - } - - let l = play_sounds.len(); - play_sounds.extend(samples.iter().skip(l)); + self.client_streams.lock().unwrap().extend(None, samples); } } |
