//! Receives audio packets from the network and plays them.

use crate::audio::SAMPLE_RATE;
use crate::error::{AudioError, AudioStream};
use crate::network::VoiceStreamType;

use bytes::Bytes;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
use dasp_ring_buffer::Bounded;
use log::*;
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::iter;
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;

type ClientStreamKey = (VoiceStreamType, u32);

/// State for decoding audio received from another user.
#[derive(Debug)]
pub struct ClientAudioData {
    buf: Bounded<Vec<f32>>,
    output_channels: opus::Channels,
    // We need both since a client can hypothetically send both mono
    // and stereo packets, and we can't switch a decoder on the fly
    // to reuse it.
    mono_decoder: opus::Decoder,
    stereo_decoder: opus::Decoder,
}

impl ClientAudioData {
    pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self {
        Self {
            mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(),
            stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(),
            output_channels,
            // Buffer 1 s of audio.
            buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]),
        }
    }

    pub fn store_packet(&mut self, bytes: Bytes) {
        let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap();
        let (decoder, channels) = match packet_channels {
            opus::Channels::Mono => (&mut self.mono_decoder, 1),
            opus::Channels::Stereo => (&mut self.stereo_decoder, 2),
        };
        // 720 is the largest packet size we can receive that we want to decode.
        let mut out: Vec<f32> = vec![0.0; 720 * channels * 4];
        let parsed = decoder
            .decode_float(&bytes, &mut out, false)
            .expect("Error decoding");
        out.truncate(parsed);
        match (packet_channels, self.output_channels) {
            // Same channel count: push the decoded samples as-is.
            (opus::Channels::Mono, opus::Channels::Mono)
            | (opus::Channels::Stereo, opus::Channels::Stereo) => {
                for sample in out {
                    self.buf.push(sample);
                }
            }
            // Mono packet, stereo output: duplicate every sample.
            (opus::Channels::Mono, opus::Channels::Stereo) => {
                for sample in out {
                    self.buf.push(sample);
                    self.buf.push(sample);
                }
            }
            // Stereo packet, mono output: keep every other sample.
            (opus::Channels::Stereo, opus::Channels::Mono) => {
                for sample in out.into_iter().step_by(2) {
                    self.buf.push(sample);
                }
            }
        }
    }
}

/// Collected state for client Opus decoders and sound effects.
#[derive(Debug)]
pub struct ClientStream {
    buffer_clients: HashMap<ClientStreamKey, ClientAudioData>,
    buffer_effects: VecDeque<f32>,
    sample_rate: u32,
    output_channels: opus::Channels,
}

impl ClientStream {
    pub fn new(sample_rate: u32, channels: u16) -> Self {
        let channels = match channels {
            1 => opus::Channels::Mono,
            2 => opus::Channels::Stereo,
            _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
        };
        Self {
            buffer_clients: HashMap::new(),
            buffer_effects: VecDeque::new(),
            sample_rate,
            output_channels: channels,
        }
    }

    fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData {
        self.buffer_clients
            .entry(client)
            .or_insert(ClientAudioData::new(self.sample_rate, self.output_channels))
    }

    /// Decodes a voice packet.
    pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
        match payload {
            VoicePacketPayload::Opus(bytes, _eot) => {
                self.get_client(client).store_packet(bytes);
            }
            _ => {
                unimplemented!("Payload type not supported");
            }
        }
    }

    /// Extends the sound effect buffer queue with some received values.
    pub fn add_sound_effect(&mut self, values: &[f32]) {
        self.buffer_effects.extend(values.iter().copied());
    }
}

/// Adds two values in some saturating way.
///
/// Since we support [f32], [i16] and [u16] we need some way of adding two values
/// without going above/below the edge values. This trait ensures that we can
/// use all three primitive types as a generic parameter.
pub trait SaturatingAdd {
    /// Adds two values in some saturating way. See the trait documentation.
    fn saturating_add(self, rhs: Self) -> Self;
}

impl SaturatingAdd for f32 {
    fn saturating_add(self, rhs: Self) -> Self {
        // Clamp the sum to the valid floating point sample range [-1.0, 1.0].
        match self + rhs {
            a if a < -1.0 => -1.0,
            a if a > 1.0 => 1.0,
            a => a,
        }
    }
}

impl SaturatingAdd for i16 {
    fn saturating_add(self, rhs: Self) -> Self {
        i16::saturating_add(self, rhs)
    }
}

impl SaturatingAdd for u16 {
    fn saturating_add(self, rhs: Self) -> Self {
        u16::saturating_add(self, rhs)
    }
}

pub trait AudioOutputDevice {
    fn play(&self) -> Result<(), AudioError>;
    fn pause(&self) -> Result<(), AudioError>;
    fn set_volume(&self, volume: f32);
    fn num_channels(&self) -> usize;
    fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
}

/// The default audio output device, as determined by [cpal].
pub struct DefaultAudioOutputDevice {
    config: StreamConfig,
    stream: cpal::Stream,
    /// The client stream per user ID. A separate stream is kept for UDP and TCP.
    ///
    /// Shared with [super::AudioOutput].
    client_streams: Arc<Mutex<ClientStream>>,
    /// Output volume configuration.
    volume_sender: watch::Sender<f32>,
}

impl DefaultAudioOutputDevice {
    /// Initializes the default audio output.
    pub fn new(
        output_volume: f32,
        user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
    ) -> Result<Self, AudioError> {
        let sample_rate = SampleRate(SAMPLE_RATE);

        let host = cpal::default_host();
        let output_device = host
            .default_output_device()
            .ok_or(AudioError::NoDevice(AudioStream::Output))?;
        // Pick a stereo output config that supports our sample rate.
        let output_supported_config = output_device
            .supported_output_configs()
            .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))?
            .find_map(|c| {
                if c.min_sample_rate() <= sample_rate
                    && c.max_sample_rate() >= sample_rate
                    && c.channels() == 2
                {
                    Some(c)
                } else {
                    None
                }
            })
            .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))?
            .with_sample_rate(sample_rate);
        let output_supported_sample_format = output_supported_config.sample_format();
        let output_config: StreamConfig = output_supported_config.into();

        let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(
            sample_rate.0,
            output_config.channels,
        )));

        let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);

        let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);

        let output_stream = match output_supported_sample_format {
            SampleFormat::F32 => output_device.build_output_stream(
                &output_config,
                callback::<f32>(
                    Arc::clone(&client_streams),
                    output_volume_receiver,
                    user_volumes,
                ),
                err_fn,
            ),
            SampleFormat::I16 => output_device.build_output_stream(
                &output_config,
                callback::<i16>(
                    Arc::clone(&client_streams),
                    output_volume_receiver,
                    user_volumes,
                ),
                err_fn,
            ),
            SampleFormat::U16 => output_device.build_output_stream(
                &output_config,
                callback::<u16>(
                    Arc::clone(&client_streams),
                    output_volume_receiver,
                    user_volumes,
                ),
                err_fn,
            ),
        }
        .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?;

        Ok(Self {
            config: output_config,
            stream: output_stream,
            volume_sender: output_volume_sender,
            client_streams,
        })
    }
}

impl AudioOutputDevice for DefaultAudioOutputDevice {
    fn play(&self) -> Result<(), AudioError> {
        self.stream.play().map_err(AudioError::OutputPlayError)
    }

    fn pause(&self) -> Result<(), AudioError> {
        self.stream.pause().map_err(AudioError::OutputPauseError)
    }

    fn set_volume(&self, volume: f32) {
        self.volume_sender.send(volume).unwrap();
    }

    fn num_channels(&self) -> usize {
        self.config.channels as usize
    }

    fn client_streams(&self) -> Arc<Mutex<ClientStream>> {
        Arc::clone(&self.client_streams)
    }
}

/// Returns a function that fills a buffer with audio from client streams
/// modified according to some audio configuration.
pub fn callback<T: Sample + AddAssign + SaturatingAdd + Send + 'static>(
    user_bufs: Arc<Mutex<ClientStream>>,
    output_volume_receiver: watch::Receiver<f32>,
    user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
    move |data: &mut [T], _info: &OutputCallbackInfo| {
        // Start from silence.
        for sample in data.iter_mut() {
            *sample = Sample::from(&0.0);
        }
        let volume = *output_volume_receiver.borrow();
        let mut user_bufs = user_bufs.lock().unwrap();
        let user_volumes = user_volumes.lock().unwrap();
        // Mix in every unmuted client, scaled by the output volume and the per-user volume.
        for (k, v) in user_bufs.buffer_clients.iter_mut() {
            let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
            if !muted {
                for (sample, val) in data
                    .iter_mut()
                    .zip(v.buf.drain().chain(iter::repeat(0.0)))
                {
                    *sample =
                        sample.saturating_add(Sample::from(&(val * volume * user_volume)));
                }
            }
        }
        // Mix in any queued sound effects.
        for sample in data.iter_mut() {
            *sample = sample.saturating_add(Sample::from(
                &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume),
            ));
        }
    }
}

impl Debug for DefaultAudioOutputDevice {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DefaultAudioOutputDevice")
            .field("client_streams", &self.client_streams)
            .field("config", &self.config)
            .field("volume_sender", &self.volume_sender)
            .field("stream", &"cpal::Stream")
            .finish()
    }
}
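
// A minimal usage sketch, not part of the original module: it shows how the device
// might be opened, started, and adjusted through the [AudioOutputDevice] trait.
// The test name, the empty per-user volume map, and the concrete volume values are
// placeholder assumptions; the test is ignored by default since it needs a real
// audio output device to succeed.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    #[test]
    #[ignore]
    fn open_default_output_device() {
        // Per-user (volume, muted) table, normally shared with the rest of the client.
        let user_volumes = Arc::new(Mutex::new(HashMap::new()));
        let device = match DefaultAudioOutputDevice::new(1.0, user_volumes) {
            Ok(device) => device,
            // No usable output device on this machine; nothing to test.
            Err(_) => return,
        };
        assert!(device.play().is_ok());
        device.set_volume(0.5);
        // The device is opened with a stereo config, so we expect two channels.
        assert_eq!(device.num_channels(), 2);
        assert!(device.pause().is_ok());
    }
}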