diff options
| author | Kapten Z∅∅m <55669224+default-username-852@users.noreply.github.com> | 2021-06-13 12:14:47 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-06-13 12:14:47 +0200 |
| commit | 42275c61510f38318332a20c1ee41dbc17663b13 (patch) | |
| tree | f1bece192603ae4562f79780ebef63d0852d27a8 /mumd/src/audio | |
| parent | cf3f8c185cede889faccd3d55655a494ccd6f707 (diff) | |
| parent | 9481f3edb37d44957273a8b856ac823d2b5b5f28 (diff) | |
| download | mum-42275c61510f38318332a20c1ee41dbc17663b13.tar.gz | |
Merge pull request #102 from mum-rs/output-rework
Input rework
Diffstat (limited to 'mumd/src/audio')
| -rw-r--r-- | mumd/src/audio/input.rs | 77 | ||||
| -rw-r--r-- | mumd/src/audio/noise_gate.rs | 350 | ||||
| -rw-r--r-- | mumd/src/audio/transformers.rs | 49 |
3 files changed, 114 insertions, 362 deletions
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index e45ff27..a1227e3 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -4,24 +4,42 @@ use log::*; use tokio::sync::watch; use crate::audio::SAMPLE_RATE; +use crate::audio::transformers::{NoiseGate, Transformer}; use crate::error::{AudioError, AudioStream}; use crate::state::StatePhase; pub fn callback<T: Sample>( - mut input_sender: futures_channel::mpsc::Sender<f32>, + mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>, + mut transformers: Vec<Box<dyn Transformer + Send + 'static>>, + mut opus_encoder: opus::Encoder, + buffer_size: usize, input_volume_receiver: watch::Receiver<f32>, phase_watcher: watch::Receiver<StatePhase>, ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { + let mut buffer = Vec::with_capacity(buffer_size); + move |data: &[T], _info: &InputCallbackInfo| { if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) { return; } let input_volume = *input_volume_receiver.borrow(); - for sample in data.iter().map(|e| e.to_f32()).map(|e| e * input_volume) { - if let Err(_e) = input_sender.try_send(sample) { - warn!("Error sending audio: {}", _e); + let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume); + + while buffer.len() + data.len() > buffer_size { + buffer.extend(data.by_ref().take(buffer_size - buffer.len())); + let encoded = transformers + .iter_mut() + .try_fold(&mut buffer[..], |acc, e| e.transform(acc)) + .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap()); + + if let Some(encoded) = encoded { + if let Err(e) = input_sender.try_send(encoded) { + warn!("Error sending audio: {}", e); + } } + buffer.clear(); } + buffer.extend(data); } } @@ -29,13 +47,13 @@ pub trait AudioInputDevice { fn play(&self) -> Result<(), AudioError>; fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); - fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32>; + fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>; fn num_channels(&self) -> usize; } pub struct DefaultAudioInputDevice { stream: cpal::Stream, - sample_receiver: Option<futures_channel::mpsc::Receiver<f32>>, + sample_receiver: Option<futures_channel::mpsc::Receiver<Vec<u8>>>, volume_sender: watch::Sender<f32>, channels: u16, } @@ -44,6 +62,7 @@ impl DefaultAudioInputDevice { pub fn new( input_volume: f32, phase_watcher: watch::Receiver<StatePhase>, + frame_size: u32, // blocks of 2.5 ms ) -> Result<Self, AudioError> { let sample_rate = SampleRate(SAMPLE_RATE); @@ -73,20 +92,55 @@ impl DefaultAudioInputDevice { let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); + let opus_encoder = opus::Encoder::new( + sample_rate.0, + match input_config.channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => unimplemented!("Only 1 or 2 channels supported, got {}", input_config.channels), + }, + opus::Application::Voip, + ) + .unwrap(); + let buffer_size = (sample_rate.0 * frame_size / 400) as usize; + + let transformers = vec![Box::new(NoiseGate::new(50)) as Box<dyn Transformer + Send + 'static>]; + let input_stream = match input_supported_sample_format { SampleFormat::F32 => input_device.build_input_stream( &input_config, - callback::<f32>(sample_sender, input_volume_receiver, phase_watcher), + callback::<f32>( + sample_sender, + transformers, + opus_encoder, + buffer_size, + input_volume_receiver, + phase_watcher + ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, - callback::<i16>(sample_sender, input_volume_receiver, phase_watcher), + callback::<i16>( + sample_sender, + transformers, + opus_encoder, + buffer_size, + input_volume_receiver, + phase_watcher + ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, - callback::<u16>(sample_sender, input_volume_receiver, phase_watcher), + callback::<u16>( + sample_sender, + transformers, + opus_encoder, + buffer_size, + input_volume_receiver, + phase_watcher + ), err_fn, ), } @@ -116,9 +170,8 @@ impl AudioInputDevice for DefaultAudioInputDevice { fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } - fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32> { - let ret = self.sample_receiver.take(); - ret.unwrap() + fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> { + self.sample_receiver.take() } fn num_channels(&self) -> usize { self.channels as usize diff --git a/mumd/src/audio/noise_gate.rs b/mumd/src/audio/noise_gate.rs deleted file mode 100644 index bd1a262..0000000 --- a/mumd/src/audio/noise_gate.rs +++ /dev/null @@ -1,350 +0,0 @@ -use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample, ToSample}; -use dasp_signal::Signal; -use futures_util::stream::Stream; -use opus::Channels; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -type FloatSample<S> = <<S as Frame>::Sample as Sample>::Float; - -pub struct StreamingNoiseGate<S: StreamingSignal> { - open: usize, - signal: S, - deactivation_delay: usize, - alltime_high: FloatSample<S::Frame>, -} - -impl<S: StreamingSignal> StreamingNoiseGate<S> { - pub fn new(signal: S, deactivation_delay: usize) -> StreamingNoiseGate<S> { - Self { - open: 0, - signal, - deactivation_delay, - alltime_high: FloatSample::<S::Frame>::EQUILIBRIUM, - } - } -} - -impl<S> StreamingSignal for StreamingNoiseGate<S> -where - S: StreamingSignal + Unpin, - FloatSample<S::Frame>: Unpin, - <S as StreamingSignal>::Frame: Unpin, -{ - type Frame = S::Frame; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { - const MUTE_PERCENTAGE: f32 = 0.1; - - let s = self.get_mut(); - - let frame = match S::poll_next(Pin::new(&mut s.signal), cx) { - Poll::Ready(v) => v, - Poll::Pending => return Poll::Pending, - }; - - if let Some(highest) = frame - .to_float_frame() - .channels() - .find(|e| abs(e.clone()) > s.alltime_high) - { - s.alltime_high = highest; - } - - match s.open { - 0 => { - if frame - .to_float_frame() - .channels() - .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) - { - s.open = s.deactivation_delay; - } - } - _ => { - if frame - .to_float_frame() - .channels() - .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) - { - s.open = s.deactivation_delay; - } else { - s.open -= 1; - } - } - } - - if s.open != 0 { - Poll::Ready(frame) - } else { - Poll::Ready(<S::Frame as Frame>::EQUILIBRIUM) - } - } - - fn is_exhausted(&self) -> bool { - self.signal.is_exhausted() - } -} - -fn abs<S: SignedSample>(sample: S) -> S { - let zero = S::EQUILIBRIUM; - if sample >= zero { - sample - } else { - -sample - } -} - -pub trait StreamingSignal { - type Frame: Frame; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame>; - - fn is_exhausted(&self) -> bool { - false - } -} - -impl<S> StreamingSignal for S -where - S: Signal + Unpin, -{ - type Frame = S::Frame; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> { - Poll::Ready(self.get_mut().next()) - } -} - -pub trait StreamingSignalExt: StreamingSignal { - fn next(&mut self) -> Next<'_, Self> { - Next { stream: self } - } - - fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self> - where - Self: Sized, - { - IntoInterleavedSamples { - signal: self, - current_frame: None, - } - } -} - -impl<S> StreamingSignalExt for S where S: StreamingSignal {} - -pub struct Next<'a, S: ?Sized> { - stream: &'a mut S, -} - -impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { - type Output = S::Frame; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match S::poll_next(Pin::new(self.stream), cx) { - Poll::Ready(val) => Poll::Ready(val), - Poll::Pending => Poll::Pending, - } - } -} - -pub struct IntoInterleavedSamples<S: StreamingSignal> { - signal: S, - current_frame: Option<<S::Frame as Frame>::Channels>, -} - -impl<S> Stream for IntoInterleavedSamples<S> -where - S: StreamingSignal + Unpin, - <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin, -{ - type Item = <S::Frame as Frame>::Sample; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let s = self.get_mut(); - loop { - if s.current_frame.is_some() { - if let Some(channel) = s.current_frame.as_mut().unwrap().next() { - return Poll::Ready(Some(channel)); - } - } - match S::poll_next(Pin::new(&mut s.signal), cx) { - Poll::Ready(val) => { - s.current_frame = Some(val.channels()); - } - Poll::Pending => return Poll::Pending, - } - } - } -} - -struct FromStream<S> { - stream: S, - underlying_exhausted: bool, -} - -impl<S> StreamingSignal for FromStream<S> -where - S: Stream + Unpin, - S::Item: Frame + Unpin, -{ - type Frame = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { - let s = self.get_mut(); - if s.underlying_exhausted { - return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM); - } - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(val)) => Poll::Ready(val), - Poll::Ready(None) => { - s.underlying_exhausted = true; - return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM); - } - Poll::Pending => Poll::Pending, - } - } - - fn is_exhausted(&self) -> bool { - self.underlying_exhausted - } -} - -pub struct FromInterleavedSamplesStream<S, F> -where - F: Frame, -{ - stream: S, - next_buf: Vec<F::Sample>, - underlying_exhausted: bool, -} - -pub fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F> -where - S: Stream + Unpin, - S::Item: Sample, - F: Frame<Sample = S::Item>, -{ - FromInterleavedSamplesStream { - stream, - next_buf: Vec::new(), - underlying_exhausted: false, - } -} - -impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F> -where - S: Stream + Unpin, - S::Item: Sample + Unpin, - F: Frame<Sample = S::Item> + Unpin, -{ - type Frame = F; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { - let s = self.get_mut(); - if s.underlying_exhausted { - return Poll::Ready(F::EQUILIBRIUM); - } - while s.next_buf.len() < F::CHANNELS { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(v)) => { - s.next_buf.push(v); - } - Poll::Ready(None) => { - s.underlying_exhausted = true; - return Poll::Ready(F::EQUILIBRIUM); - } - Poll::Pending => return Poll::Pending, - } - } - - let mut data = s.next_buf.iter().cloned(); - let n = F::from_samples(&mut data).unwrap(); - s.next_buf.clear(); - Poll::Ready(n) - } - - fn is_exhausted(&self) -> bool { - self.underlying_exhausted - } -} - -pub struct OpusEncoder<S> { - encoder: opus::Encoder, - frame_size: u32, - sample_rate: u32, - stream: S, - input_buffer: Vec<f32>, - exhausted: bool, -} - -impl<S, I> OpusEncoder<S> -where - S: Stream<Item = I>, - I: ToSample<f32>, -{ - pub fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { - let encoder = opus::Encoder::new( - sample_rate, - match channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!("Only 1 or 2 channels supported, got {})", channels), - }, - opus::Application::Voip, - ) - .unwrap(); - Self { - encoder, - frame_size, - sample_rate, - stream, - input_buffer: Vec::new(), - exhausted: false, - } - } -} - -impl<S, I> Stream for OpusEncoder<S> -where - S: Stream<Item = I> + Unpin, - I: Sample + ToSample<f32>, -{ - type Item = Vec<u8>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let s = self.get_mut(); - if s.exhausted { - return Poll::Ready(None); - } - let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; - loop { - while s.input_buffer.len() < opus_frame_size { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(v)) => { - s.input_buffer.push(v.to_sample::<f32>()); - } - Poll::Ready(None) => { - s.exhausted = true; - return Poll::Ready(None); - } - Poll::Pending => return Poll::Pending, - } - } - if s.input_buffer.iter().any(|&e| e != 0.0) { - break; - } - s.input_buffer.clear(); - } - - let encoded = s - .encoder - .encode_vec_float(&s.input_buffer, opus_frame_size) - .unwrap(); - s.input_buffer.clear(); - Poll::Ready(Some(encoded)) - } -} diff --git a/mumd/src/audio/transformers.rs b/mumd/src/audio/transformers.rs new file mode 100644 index 0000000..25e28b8 --- /dev/null +++ b/mumd/src/audio/transformers.rs @@ -0,0 +1,49 @@ +/// A trait that represents a transform of a audio buffer in some way. +pub trait Transformer { + /// Do the transform. Returning `None` is interpreted as "the buffer is unwanted". + /// The implementor is free to modify the buffer however it wants to. + fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]>; +} + +/// A struct representing a noise gate transform. +pub struct NoiseGate { + alltime_high: f32, + open: usize, + deactivation_delay: usize, +} + +impl NoiseGate { + /// Create a new noise gate. `deactivation_delay` is defined in terms of + /// how many quiet frames it receives before closing the noise gate. + pub fn new(deactivation_delay: usize) -> Self { + Self { + alltime_high: 0.0, + open: 0, + deactivation_delay, + } + } +} + +impl Transformer for NoiseGate { + fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]> { + const MUTE_PERCENTAGE: f32 = 0.1; + + let max = buf.iter().map(|e| e.abs()).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(); + + if max > self.alltime_high { + self.alltime_high = max; + } + + if max > self.alltime_high * MUTE_PERCENTAGE { + self.open = self.deactivation_delay; + } else if self.open > 0 { + self.open -= 1; + } + + if self.open == 0 { + None + } else { + Some(buf) + } + } +} |
