aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2021-06-12 02:30:01 +0200
committerEskil Queseth <eskilq@kth.se>2021-06-12 02:30:01 +0200
commitbc65445af44a335a0586a393c792614330258249 (patch)
treefd0ba08e7a1748076f04ce3d5678b960c5aaa4a7 /mumd/src/audio
parentdcd70175a98c83a3334d7980e5196bc866e04efb (diff)
downloadmum-bc65445af44a335a0586a393c792614330258249.tar.gz
simplify audio output infrastructure
Diffstat (limited to 'mumd/src/audio')
-rw-r--r--mumd/src/audio/input.rs79
-rw-r--r--mumd/src/audio/noise_gate.rs350
2 files changed, 67 insertions, 362 deletions
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index e45ff27..162dd2c 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -8,20 +8,49 @@ use crate::error::{AudioError, AudioStream};
use crate::state::StatePhase;
pub fn callback<T: Sample>(
- mut input_sender: futures_channel::mpsc::Sender<f32>,
+ mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>,
+ transformers: Vec<Box<dyn Fn(&mut [f32]) -> Option<&mut [f32]> + Send + 'static>>,
+ frame_size: u32,
+ sample_rate: u32,
+ channels: u16,
input_volume_receiver: watch::Receiver<f32>,
phase_watcher: watch::Receiver<StatePhase>,
) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
+ let buffer_size = (sample_rate * frame_size / 400) as usize;
+ let mut opus_encoder = opus::Encoder::new(
+ sample_rate,
+ match channels {
+ 1 => opus::Channels::Mono,
+ 2 => opus::Channels::Stereo,
+ _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
+ },
+ opus::Application::Voip,
+ )
+ .unwrap();
+ let mut buffer = Vec::with_capacity(buffer_size);
+
move |data: &[T], _info: &InputCallbackInfo| {
if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) {
return;
}
let input_volume = *input_volume_receiver.borrow();
- for sample in data.iter().map(|e| e.to_f32()).map(|e| e * input_volume) {
- if let Err(_e) = input_sender.try_send(sample) {
- warn!("Error sending audio: {}", _e);
+ let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume);
+
+ while buffer.len() + data.len() > buffer_size {
+ buffer.extend(data.by_ref().take(buffer_size - buffer.len()));
+ let encoded = transformers
+ .iter()
+ .try_fold(&mut buffer[..], |acc, e| e(acc))
+ .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap());
+
+ if let Some(encoded) = encoded {
+ if let Err(e) = input_sender.try_send(encoded) {
+ warn!("Error sending audio: {}", e);
+ }
}
+ buffer.clear();
}
+ buffer.extend(data);
}
}
@@ -29,13 +58,13 @@ pub trait AudioInputDevice {
fn play(&self) -> Result<(), AudioError>;
fn pause(&self) -> Result<(), AudioError>;
fn set_volume(&self, volume: f32);
- fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32>;
+ fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>;
fn num_channels(&self) -> usize;
}
pub struct DefaultAudioInputDevice {
stream: cpal::Stream,
- sample_receiver: Option<futures_channel::mpsc::Receiver<f32>>,
+ sample_receiver: Option<futures_channel::mpsc::Receiver<Vec<u8>>>,
volume_sender: watch::Sender<f32>,
channels: u16,
}
@@ -44,6 +73,7 @@ impl DefaultAudioInputDevice {
pub fn new(
input_volume: f32,
phase_watcher: watch::Receiver<StatePhase>,
+ frame_size: u32,
) -> Result<Self, AudioError> {
let sample_rate = SampleRate(SAMPLE_RATE);
@@ -73,20 +103,46 @@ impl DefaultAudioInputDevice {
let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
+ let transformers = vec![];
+
let input_stream = match input_supported_sample_format {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
- callback::<f32>(sample_sender, input_volume_receiver, phase_watcher),
+ callback::<f32>(
+ sample_sender,
+ transformers,
+ frame_size,
+ SAMPLE_RATE,
+ input_config.channels,
+ input_volume_receiver,
+ phase_watcher
+ ),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
- callback::<i16>(sample_sender, input_volume_receiver, phase_watcher),
+ callback::<i16>(
+ sample_sender,
+ transformers,
+ frame_size,
+ SAMPLE_RATE,
+ input_config.channels,
+ input_volume_receiver,
+ phase_watcher
+ ),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
- callback::<u16>(sample_sender, input_volume_receiver, phase_watcher),
+ callback::<u16>(
+ sample_sender,
+ transformers,
+ frame_size,
+ SAMPLE_RATE,
+ input_config.channels,
+ input_volume_receiver,
+ phase_watcher
+ ),
err_fn,
),
}
@@ -116,9 +172,8 @@ impl AudioInputDevice for DefaultAudioInputDevice {
fn set_volume(&self, volume: f32) {
self.volume_sender.send(volume).unwrap();
}
- fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32> {
- let ret = self.sample_receiver.take();
- ret.unwrap()
+ fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> {
+ self.sample_receiver.take()
}
fn num_channels(&self) -> usize {
self.channels as usize
diff --git a/mumd/src/audio/noise_gate.rs b/mumd/src/audio/noise_gate.rs
index bd1a262..e69de29 100644
--- a/mumd/src/audio/noise_gate.rs
+++ b/mumd/src/audio/noise_gate.rs
@@ -1,350 +0,0 @@
-use dasp_frame::Frame;
-use dasp_sample::{Sample, SignedSample, ToSample};
-use dasp_signal::Signal;
-use futures_util::stream::Stream;
-use opus::Channels;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-type FloatSample<S> = <<S as Frame>::Sample as Sample>::Float;
-
-pub struct StreamingNoiseGate<S: StreamingSignal> {
- open: usize,
- signal: S,
- deactivation_delay: usize,
- alltime_high: FloatSample<S::Frame>,
-}
-
-impl<S: StreamingSignal> StreamingNoiseGate<S> {
- pub fn new(signal: S, deactivation_delay: usize) -> StreamingNoiseGate<S> {
- Self {
- open: 0,
- signal,
- deactivation_delay,
- alltime_high: FloatSample::<S::Frame>::EQUILIBRIUM,
- }
- }
-}
-
-impl<S> StreamingSignal for StreamingNoiseGate<S>
-where
- S: StreamingSignal + Unpin,
- FloatSample<S::Frame>: Unpin,
- <S as StreamingSignal>::Frame: Unpin,
-{
- type Frame = S::Frame;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
- const MUTE_PERCENTAGE: f32 = 0.1;
-
- let s = self.get_mut();
-
- let frame = match S::poll_next(Pin::new(&mut s.signal), cx) {
- Poll::Ready(v) => v,
- Poll::Pending => return Poll::Pending,
- };
-
- if let Some(highest) = frame
- .to_float_frame()
- .channels()
- .find(|e| abs(e.clone()) > s.alltime_high)
- {
- s.alltime_high = highest;
- }
-
- match s.open {
- 0 => {
- if frame
- .to_float_frame()
- .channels()
- .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample()))
- {
- s.open = s.deactivation_delay;
- }
- }
- _ => {
- if frame
- .to_float_frame()
- .channels()
- .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample()))
- {
- s.open = s.deactivation_delay;
- } else {
- s.open -= 1;
- }
- }
- }
-
- if s.open != 0 {
- Poll::Ready(frame)
- } else {
- Poll::Ready(<S::Frame as Frame>::EQUILIBRIUM)
- }
- }
-
- fn is_exhausted(&self) -> bool {
- self.signal.is_exhausted()
- }
-}
-
-fn abs<S: SignedSample>(sample: S) -> S {
- let zero = S::EQUILIBRIUM;
- if sample >= zero {
- sample
- } else {
- -sample
- }
-}
-
-pub trait StreamingSignal {
- type Frame: Frame;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame>;
-
- fn is_exhausted(&self) -> bool {
- false
- }
-}
-
-impl<S> StreamingSignal for S
-where
- S: Signal + Unpin,
-{
- type Frame = S::Frame;
-
- fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> {
- Poll::Ready(self.get_mut().next())
- }
-}
-
-pub trait StreamingSignalExt: StreamingSignal {
- fn next(&mut self) -> Next<'_, Self> {
- Next { stream: self }
- }
-
- fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self>
- where
- Self: Sized,
- {
- IntoInterleavedSamples {
- signal: self,
- current_frame: None,
- }
- }
-}
-
-impl<S> StreamingSignalExt for S where S: StreamingSignal {}
-
-pub struct Next<'a, S: ?Sized> {
- stream: &'a mut S,
-}
-
-impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> {
- type Output = S::Frame;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match S::poll_next(Pin::new(self.stream), cx) {
- Poll::Ready(val) => Poll::Ready(val),
- Poll::Pending => Poll::Pending,
- }
- }
-}
-
-pub struct IntoInterleavedSamples<S: StreamingSignal> {
- signal: S,
- current_frame: Option<<S::Frame as Frame>::Channels>,
-}
-
-impl<S> Stream for IntoInterleavedSamples<S>
-where
- S: StreamingSignal + Unpin,
- <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin,
-{
- type Item = <S::Frame as Frame>::Sample;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let s = self.get_mut();
- loop {
- if s.current_frame.is_some() {
- if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
- return Poll::Ready(Some(channel));
- }
- }
- match S::poll_next(Pin::new(&mut s.signal), cx) {
- Poll::Ready(val) => {
- s.current_frame = Some(val.channels());
- }
- Poll::Pending => return Poll::Pending,
- }
- }
- }
-}
-
-struct FromStream<S> {
- stream: S,
- underlying_exhausted: bool,
-}
-
-impl<S> StreamingSignal for FromStream<S>
-where
- S: Stream + Unpin,
- S::Item: Frame + Unpin,
-{
- type Frame = S::Item;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
- let s = self.get_mut();
- if s.underlying_exhausted {
- return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
- }
- match S::poll_next(Pin::new(&mut s.stream), cx) {
- Poll::Ready(Some(val)) => Poll::Ready(val),
- Poll::Ready(None) => {
- s.underlying_exhausted = true;
- return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
- }
- Poll::Pending => Poll::Pending,
- }
- }
-
- fn is_exhausted(&self) -> bool {
- self.underlying_exhausted
- }
-}
-
-pub struct FromInterleavedSamplesStream<S, F>
-where
- F: Frame,
-{
- stream: S,
- next_buf: Vec<F::Sample>,
- underlying_exhausted: bool,
-}
-
-pub fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F>
-where
- S: Stream + Unpin,
- S::Item: Sample,
- F: Frame<Sample = S::Item>,
-{
- FromInterleavedSamplesStream {
- stream,
- next_buf: Vec::new(),
- underlying_exhausted: false,
- }
-}
-
-impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F>
-where
- S: Stream + Unpin,
- S::Item: Sample + Unpin,
- F: Frame<Sample = S::Item> + Unpin,
-{
- type Frame = F;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
- let s = self.get_mut();
- if s.underlying_exhausted {
- return Poll::Ready(F::EQUILIBRIUM);
- }
- while s.next_buf.len() < F::CHANNELS {
- match S::poll_next(Pin::new(&mut s.stream), cx) {
- Poll::Ready(Some(v)) => {
- s.next_buf.push(v);
- }
- Poll::Ready(None) => {
- s.underlying_exhausted = true;
- return Poll::Ready(F::EQUILIBRIUM);
- }
- Poll::Pending => return Poll::Pending,
- }
- }
-
- let mut data = s.next_buf.iter().cloned();
- let n = F::from_samples(&mut data).unwrap();
- s.next_buf.clear();
- Poll::Ready(n)
- }
-
- fn is_exhausted(&self) -> bool {
- self.underlying_exhausted
- }
-}
-
-pub struct OpusEncoder<S> {
- encoder: opus::Encoder,
- frame_size: u32,
- sample_rate: u32,
- stream: S,
- input_buffer: Vec<f32>,
- exhausted: bool,
-}
-
-impl<S, I> OpusEncoder<S>
-where
- S: Stream<Item = I>,
- I: ToSample<f32>,
-{
- pub fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self {
- let encoder = opus::Encoder::new(
- sample_rate,
- match channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!("Only 1 or 2 channels supported, got {})", channels),
- },
- opus::Application::Voip,
- )
- .unwrap();
- Self {
- encoder,
- frame_size,
- sample_rate,
- stream,
- input_buffer: Vec::new(),
- exhausted: false,
- }
- }
-}
-
-impl<S, I> Stream for OpusEncoder<S>
-where
- S: Stream<Item = I> + Unpin,
- I: Sample + ToSample<f32>,
-{
- type Item = Vec<u8>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let s = self.get_mut();
- if s.exhausted {
- return Poll::Ready(None);
- }
- let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize;
- loop {
- while s.input_buffer.len() < opus_frame_size {
- match S::poll_next(Pin::new(&mut s.stream), cx) {
- Poll::Ready(Some(v)) => {
- s.input_buffer.push(v.to_sample::<f32>());
- }
- Poll::Ready(None) => {
- s.exhausted = true;
- return Poll::Ready(None);
- }
- Poll::Pending => return Poll::Pending,
- }
- }
- if s.input_buffer.iter().any(|&e| e != 0.0) {
- break;
- }
- s.input_buffer.clear();
- }
-
- let encoded = s
- .encoder
- .encode_vec_float(&s.input_buffer, opus_frame_size)
- .unwrap();
- s.input_buffer.clear();
- Poll::Ready(Some(encoded))
- }
-}