pub mod input; pub mod output; use crate::audio::output::SaturatingAdd; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig}; use dasp_frame::Frame; use dasp_interpolate::linear::Linear; use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_signal::{self as signal, Signal}; use futures::Stream; use futures::stream::StreamExt; use futures::task::{Context, Poll}; use log::*; use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::future::{Future}; use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::sync::watch; use mumble_protocol::Serverbound; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ (include_bytes!("resources/connect.wav"), NotificationEvents::ServerConnect), ( include_bytes!("resources/disconnect.wav"), NotificationEvents::ServerDisconnect, ), ( include_bytes!("resources/channel_join.wav"), NotificationEvents::UserConnected, ), ( include_bytes!("resources/channel_leave.wav"), NotificationEvents::UserDisconnected, ), ( include_bytes!("resources/channel_join.wav"), NotificationEvents::UserJoinedChannel, ), ( include_bytes!("resources/channel_leave.wav"), NotificationEvents::UserLeftChannel, ), (include_bytes!("resources/mute.wav"), NotificationEvents::Mute), (include_bytes!("resources/unmute.wav"), NotificationEvents::Unmute), (include_bytes!("resources/deafen.wav"), NotificationEvents::Deafen), (include_bytes!("resources/undeafen.wav"), NotificationEvents::Undeafen), ]; const SAMPLE_RATE: u32 = 48000; #[derive(Debug, Eq, PartialEq, Clone, Copy, Hash)] pub enum NotificationEvents { ServerConnect, ServerDisconnect, UserConnected, UserDisconnected, UserJoinedChannel, UserLeftChannel, Mute, Unmute, Deafen, Undeafen, } pub struct Audio { output_config: StreamConfig, _output_stream: cpal::Stream, _input_stream: cpal::Stream, input_channel_receiver: Arc> + Unpin>>>, input_volume_sender: watch::Sender, output_volume_sender: watch::Sender, user_volumes: Arc>>, client_streams: Arc>>, sounds: HashMap>, play_sounds: Arc>>, } impl Audio { pub fn new(input_volume: f32, output_volume: f32) -> Self { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); let output_device = host .default_output_device() .expect("default output device not found"); let output_supported_config = output_device .supported_output_configs() .expect("error querying output configs") .find_map(|c| { if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { Some(c) } else { None } }) .unwrap() .with_sample_rate(sample_rate); let output_supported_sample_format = output_supported_config.sample_format(); let output_config: StreamConfig = output_supported_config.into(); let input_device = host .default_input_device() .expect("default input device not found"); let input_supported_config = input_device .supported_input_configs() .expect("error querying output configs") .find_map(|c| { if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { Some(c) } else { None } }) .unwrap() .with_sample_rate(sample_rate); let input_supported_sample_format = input_supported_config.sample_format(); let input_config: StreamConfig = input_supported_config.into(); let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); let user_volumes = Arc::new(Mutex::new(HashMap::new())); let (output_volume_sender, output_volume_receiver) = watch::channel::(output_volume); let play_sounds = Arc::new(Mutex::new(VecDeque::new())); let client_streams = Arc::new(Mutex::new(HashMap::new())); let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, output::curry_callback::( Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), ), err_fn, ), SampleFormat::I16 => output_device.build_output_stream( &output_config, output::curry_callback::( Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), ), err_fn, ), SampleFormat::U16 => output_device.build_output_stream( &output_config, output::curry_callback::( Arc::clone(&play_sounds), Arc::clone(&client_streams), output_volume_receiver, Arc::clone(&user_volumes), ), err_fn, ), } .unwrap(); let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::(input_volume); let input_stream = match input_supported_sample_format { SampleFormat::F32 => input_device.build_input_stream( &input_config, input::callback::( sample_sender, input_volume_receiver, ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, input::callback::( sample_sender, input_volume_receiver, ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, input::callback::( sample_sender, input_volume_receiver, ), err_fn, ), } .unwrap(); let opus_stream = OpusEncoder::new( 4, input_config.sample_rate.0, input_config.channels as usize, StreamingSignalExt::into_interleaved_samples( StreamingNoiseGate::new( from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly 0.09, 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { _dst: std::marker::PhantomData, target: 0, // normal speech session_id: (), // unused for server-bound packets seq_num: i as u64, payload: VoicePacketPayload::Opus(e.into(), false), position_info: None, }); output_stream.play().unwrap(); let sounds = EVENT_SOUNDS .iter() .map(|(bytes, event)| { let reader = hound::WavReader::new(*bytes).unwrap(); let spec = reader.spec(); let samples = match spec.sample_format { hound::SampleFormat::Float => reader .into_samples::() .map(|e| e.unwrap()) .collect::>(), hound::SampleFormat::Int => reader .into_samples::() .map(|e| cpal::Sample::to_f32(&e.unwrap())) .collect::>(), }; let iter: Box> = match spec.channels { 1 => Box::new(samples.into_iter().flat_map(|e| vec![e, e])), 2 => Box::new(samples.into_iter()), _ => unimplemented!() // TODO handle gracefully (this might not even happen) }; let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter); let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal)); let samples = signal .from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64) .until_exhausted() // if the source audio is stereo and is being played as mono, discard the right audio .flat_map(|e| if output_config.channels == 1 { vec![e[0]] } else { e.to_vec() }) .collect::>(); (*event, samples) }) .collect(); Self { output_config, _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))), client_streams, sounds, output_volume_sender, user_volumes, play_sounds, } } pub fn decode_packet(&self, session_id: u32, payload: VoicePacketPayload) { match self.client_streams.lock().unwrap().entry(session_id) { Entry::Occupied(mut entry) => { entry .get_mut() .decode_packet(payload, self.output_config.channels as usize); } Entry::Vacant(_) => { warn!("Can't find session id {}", session_id); } } } pub fn add_client(&self, session_id: u32) { match self.client_streams.lock().unwrap().entry(session_id) { Entry::Occupied(_) => { warn!("Session id {} already exists", session_id); } Entry::Vacant(entry) => { entry.insert(output::ClientStream::new( self.output_config.sample_rate.0, self.output_config.channels, )); } } } pub fn remove_client(&self, session_id: u32) { match self.client_streams.lock().unwrap().entry(session_id) { Entry::Occupied(entry) => { entry.remove(); } Entry::Vacant(_) => { warn!( "Tried to remove session id {} that doesn't exist", session_id ); } } } pub fn take_receiver(&mut self) -> Arc> + Unpin>>> { Arc::clone(&self.input_channel_receiver) } pub fn clear_clients(&mut self) { self.client_streams.lock().unwrap().clear(); } pub fn set_input_volume(&self, input_volume: f32) { self.input_volume_sender.send(input_volume).unwrap(); } pub fn set_output_volume(&self, output_volume: f32) { self.output_volume_sender.send(output_volume).unwrap(); } pub fn set_user_volume(&self, id: u32, volume: f32) { match self.user_volumes.lock().unwrap().entry(id) { Entry::Occupied(mut entry) => { entry.get_mut().0 = volume; } Entry::Vacant(entry) => { entry.insert((volume, false)); } } } pub fn set_mute(&self, id: u32, mute: bool) { match self.user_volumes.lock().unwrap().entry(id) { Entry::Occupied(mut entry) => { entry.get_mut().1 = mute; } Entry::Vacant(entry) => { entry.insert((1.0, mute)); } } } pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); let mut play_sounds = self.play_sounds.lock().unwrap(); for (val, e) in play_sounds.iter_mut().zip(samples.iter()) { *val = val.saturating_add(*e); } let l = play_sounds.len(); play_sounds.extend(samples.iter().skip(l)); } } struct StreamingNoiseGate { open: usize, signal: S, activate_threshold: <::Sample as Sample>::Float, deactivation_delay: usize, } impl StreamingNoiseGate { pub fn new( signal: S, activate_threshold: <::Sample as Sample>::Float, deactivation_delay: usize, ) -> StreamingNoiseGate { Self { open: 0, signal, activate_threshold, deactivation_delay } } } impl StreamingSignal for StreamingNoiseGate where S: StreamingSignal + Unpin, <<::Frame as Frame>::Sample as Sample>::Float: Unpin, ::Frame: Unpin { type Frame = S::Frame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); let frame = match S::poll_next(Pin::new(&mut s.signal), cx) { Poll::Ready(v) => v, Poll::Pending => return Poll::Pending, }; match s.open { 0 => { if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { s.open = s.deactivation_delay; } } _ => { if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { s.open = s.deactivation_delay; } else { s.open -= 1; } } } if s.open != 0 { Poll::Ready(frame) } else { Poll::Ready(::EQUILIBRIUM) } } fn is_exhausted(&self) -> bool { self.signal.is_exhausted() } } fn abs(sample: S) -> S { let zero = S::EQUILIBRIUM; if sample >= zero { sample } else { -sample } } trait StreamingSignal { type Frame: Frame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; fn is_exhausted(&self) -> bool { false } } impl StreamingSignal for S where S: Signal + Unpin { type Frame = S::Frame; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { Poll::Ready(self.get_mut().next()) } } trait StreamingSignalExt: StreamingSignal { fn next(&mut self) -> Next<'_, Self> { Next { stream: self } } fn into_interleaved_samples(self) -> IntoInterleavedSamples where Self: Sized { IntoInterleavedSamples { signal: self, current_frame: None } } } impl StreamingSignalExt for S where S: StreamingSignal {} struct Next<'a, S: ?Sized> { stream: &'a mut S } impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { type Output = S::Frame; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match S::poll_next(Pin::new(self.stream), cx) { Poll::Ready(val) => { Poll::Ready(val) } Poll::Pending => Poll::Pending } } } struct IntoInterleavedSamples { signal: S, current_frame: Option<::Channels>, } impl Stream for IntoInterleavedSamples where S: StreamingSignal + Unpin, <::Frame as Frame>::Channels: Unpin { type Item = ::Sample; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); loop { if s.current_frame.is_some() { if let Some(channel) = s.current_frame.as_mut().unwrap().next() { return Poll::Ready(Some(channel)); } } match S::poll_next(Pin::new(&mut s.signal), cx) { Poll::Ready(val) => { s.current_frame = Some(val.channels()); } Poll::Pending => return Poll::Pending, } } } } struct FromStream { stream: S, underlying_exhausted: bool, } impl StreamingSignal for FromStream where S: Stream + Unpin, S::Item: Frame + Unpin { type Frame = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); if s.underlying_exhausted { return Poll::Ready(::EQUILIBRIUM); } match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(val)) => { Poll::Ready(val) } Poll::Ready(None) => { s.underlying_exhausted = true; return Poll::Ready(::EQUILIBRIUM); } Poll::Pending => Poll::Pending, } } fn is_exhausted(&self) -> bool { self.underlying_exhausted } } struct FromInterleavedSamplesStream where F: Frame { stream: S, next_buf: Vec, underlying_exhausted: bool, } fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { FromInterleavedSamplesStream { stream, next_buf: Vec::new(), underlying_exhausted: false, } } impl StreamingSignal for FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample + Unpin, F: Frame + Unpin { type Frame = F; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); if s.underlying_exhausted { return Poll::Ready(F::EQUILIBRIUM); } while s.next_buf.len() < F::CHANNELS { match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(v)) => { s.next_buf.push(v); } Poll::Ready(None) => { s.underlying_exhausted = true; return Poll::Ready(F::EQUILIBRIUM); } Poll::Pending => return Poll::Pending, } } let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); s.next_buf.clear(); Poll::Ready(n) } fn is_exhausted(&self) -> bool { self.underlying_exhausted } } struct OpusEncoder { encoder: opus::Encoder, frame_size: u32, sample_rate: u32, stream: S, input_buffer: Vec, exhausted: bool, } impl OpusEncoder where S: Stream, I: ToSample { fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { let encoder = opus::Encoder::new( sample_rate, match channels { 1 => Channels::Mono, 2 => Channels::Stereo, _ => unimplemented!( "Only 1 or 2 channels supported, got {})", channels ), }, opus::Application::Voip, ).unwrap(); Self { encoder, frame_size, sample_rate, stream, input_buffer: Vec::new(), exhausted: false, } } } impl Stream for OpusEncoder where S: Stream + Unpin, I: Sample + ToSample { type Item = Vec; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); if s.exhausted { return Poll::Ready(None); } let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; loop { while s.input_buffer.len() < opus_frame_size { match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(v)) => { s.input_buffer.push(v.to_sample::()); } Poll::Ready(None) => { s.exhausted = true; return Poll::Ready(None); } Poll::Pending => return Poll::Pending, } } if s.input_buffer.iter().any(|&e| e != 0.0) { break; } s.input_buffer.clear(); } let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); s.input_buffer.clear(); Poll::Ready(Some(encoded)) } }