aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mumd/src/audio.rs39
-rw-r--r--mumd/src/audio/input.rs77
-rw-r--r--mumd/src/audio/noise_gate.rs350
-rw-r--r--mumd/src/audio/transformers.rs49
4 files changed, 128 insertions, 387 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 63adcc6..2e20583 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,17 +1,13 @@
pub mod input;
-mod noise_gate;
pub mod output;
+pub mod transformers;
use crate::audio::input::{AudioInputDevice, DefaultAudioInputDevice};
-use crate::audio::noise_gate::{
- from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt,
-};
use crate::audio::output::{AudioOutputDevice, ClientStream, DefaultAudioOutputDevice};
use crate::error::AudioError;
use crate::network::VoiceStreamType;
use crate::state::StatePhase;
-use cpal::SampleRate;
use dasp_interpolate::linear::Linear;
use dasp_signal::{self as signal, Signal};
use futures_util::stream::Stream;
@@ -81,27 +77,20 @@ impl AudioInput {
input_volume: f32,
phase_watcher: watch::Receiver<StatePhase>,
) -> Result<Self, AudioError> {
- let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher)?;
- let sample_rate = SampleRate(SAMPLE_RATE);
+ let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher, 4)?;
- let opus_stream = OpusEncoder::new(
- 4,
- sample_rate.0,
- default.num_channels(),
- StreamingSignalExt::into_interleaved_samples(StreamingNoiseGate::new(
- from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly
- 10_000,
- )),
- )
- .enumerate()
- .map(|(i, e)| VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: i as u64,
- payload: VoicePacketPayload::Opus(e.into(), false),
- position_info: None,
- });
+ let opus_stream = default
+ .sample_receiver()
+ .unwrap()
+ .enumerate()
+ .map(|(i, e)| VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: i as u64,
+ payload: VoicePacketPayload::Opus(e.into(), false),
+ position_info: None,
+ });
default.play()?;
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index e45ff27..a1227e3 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -4,24 +4,42 @@ use log::*;
use tokio::sync::watch;
use crate::audio::SAMPLE_RATE;
+use crate::audio::transformers::{NoiseGate, Transformer};
use crate::error::{AudioError, AudioStream};
use crate::state::StatePhase;
pub fn callback<T: Sample>(
- mut input_sender: futures_channel::mpsc::Sender<f32>,
+ mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>,
+ mut transformers: Vec<Box<dyn Transformer + Send + 'static>>,
+ mut opus_encoder: opus::Encoder,
+ buffer_size: usize,
input_volume_receiver: watch::Receiver<f32>,
phase_watcher: watch::Receiver<StatePhase>,
) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
+ let mut buffer = Vec::with_capacity(buffer_size);
+
move |data: &[T], _info: &InputCallbackInfo| {
if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) {
return;
}
let input_volume = *input_volume_receiver.borrow();
- for sample in data.iter().map(|e| e.to_f32()).map(|e| e * input_volume) {
- if let Err(_e) = input_sender.try_send(sample) {
- warn!("Error sending audio: {}", _e);
+ let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume);
+
+ while buffer.len() + data.len() > buffer_size {
+ buffer.extend(data.by_ref().take(buffer_size - buffer.len()));
+ let encoded = transformers
+ .iter_mut()
+ .try_fold(&mut buffer[..], |acc, e| e.transform(acc))
+ .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap());
+
+ if let Some(encoded) = encoded {
+ if let Err(e) = input_sender.try_send(encoded) {
+ warn!("Error sending audio: {}", e);
+ }
}
+ buffer.clear();
}
+ buffer.extend(data);
}
}
@@ -29,13 +47,13 @@ pub trait AudioInputDevice {
fn play(&self) -> Result<(), AudioError>;
fn pause(&self) -> Result<(), AudioError>;
fn set_volume(&self, volume: f32);
- fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32>;
+ fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>;
fn num_channels(&self) -> usize;
}
pub struct DefaultAudioInputDevice {
stream: cpal::Stream,
- sample_receiver: Option<futures_channel::mpsc::Receiver<f32>>,
+ sample_receiver: Option<futures_channel::mpsc::Receiver<Vec<u8>>>,
volume_sender: watch::Sender<f32>,
channels: u16,
}
@@ -44,6 +62,7 @@ impl DefaultAudioInputDevice {
pub fn new(
input_volume: f32,
phase_watcher: watch::Receiver<StatePhase>,
+ frame_size: u32, // blocks of 2.5 ms
) -> Result<Self, AudioError> {
let sample_rate = SampleRate(SAMPLE_RATE);
@@ -73,20 +92,55 @@ impl DefaultAudioInputDevice {
let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
+ let opus_encoder = opus::Encoder::new(
+ sample_rate.0,
+ match input_config.channels {
+ 1 => opus::Channels::Mono,
+ 2 => opus::Channels::Stereo,
+ _ => unimplemented!("Only 1 or 2 channels supported, got {}", input_config.channels),
+ },
+ opus::Application::Voip,
+ )
+ .unwrap();
+ let buffer_size = (sample_rate.0 * frame_size / 400) as usize;
+
+ let transformers = vec![Box::new(NoiseGate::new(50)) as Box<dyn Transformer + Send + 'static>];
+
let input_stream = match input_supported_sample_format {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
- callback::<f32>(sample_sender, input_volume_receiver, phase_watcher),
+ callback::<f32>(
+ sample_sender,
+ transformers,
+ opus_encoder,
+ buffer_size,
+ input_volume_receiver,
+ phase_watcher
+ ),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
- callback::<i16>(sample_sender, input_volume_receiver, phase_watcher),
+ callback::<i16>(
+ sample_sender,
+ transformers,
+ opus_encoder,
+ buffer_size,
+ input_volume_receiver,
+ phase_watcher
+ ),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
- callback::<u16>(sample_sender, input_volume_receiver, phase_watcher),
+ callback::<u16>(
+ sample_sender,
+ transformers,
+ opus_encoder,
+ buffer_size,
+ input_volume_receiver,
+ phase_watcher
+ ),
err_fn,
),
}
@@ -116,9 +170,8 @@ impl AudioInputDevice for DefaultAudioInputDevice {
fn set_volume(&self, volume: f32) {
self.volume_sender.send(volume).unwrap();
}
- fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32> {
- let ret = self.sample_receiver.take();
- ret.unwrap()
+ fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> {
+ self.sample_receiver.take()
}
fn num_channels(&self) -> usize {
self.channels as usize
diff --git a/mumd/src/audio/noise_gate.rs b/mumd/src/audio/noise_gate.rs
deleted file mode 100644
index bd1a262..0000000
--- a/mumd/src/audio/noise_gate.rs
+++ /dev/null
@@ -1,350 +0,0 @@
-use dasp_frame::Frame;
-use dasp_sample::{Sample, SignedSample, ToSample};
-use dasp_signal::Signal;
-use futures_util::stream::Stream;
-use opus::Channels;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-type FloatSample<S> = <<S as Frame>::Sample as Sample>::Float;
-
-pub struct StreamingNoiseGate<S: StreamingSignal> {
- open: usize,
- signal: S,
- deactivation_delay: usize,
- alltime_high: FloatSample<S::Frame>,
-}
-
-impl<S: StreamingSignal> StreamingNoiseGate<S> {
- pub fn new(signal: S, deactivation_delay: usize) -> StreamingNoiseGate<S> {
- Self {
- open: 0,
- signal,
- deactivation_delay,
- alltime_high: FloatSample::<S::Frame>::EQUILIBRIUM,
- }
- }
-}
-
-impl<S> StreamingSignal for StreamingNoiseGate<S>
-where
- S: StreamingSignal + Unpin,
- FloatSample<S::Frame>: Unpin,
- <S as StreamingSignal>::Frame: Unpin,
-{
- type Frame = S::Frame;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
- const MUTE_PERCENTAGE: f32 = 0.1;
-
- let s = self.get_mut();
-
- let frame = match S::poll_next(Pin::new(&mut s.signal), cx) {
- Poll::Ready(v) => v,
- Poll::Pending => return Poll::Pending,
- };
-
- if let Some(highest) = frame
- .to_float_frame()
- .channels()
- .find(|e| abs(e.clone()) > s.alltime_high)
- {
- s.alltime_high = highest;
- }
-
- match s.open {
- 0 => {
- if frame
- .to_float_frame()
- .channels()
- .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample()))
- {
- s.open = s.deactivation_delay;
- }
- }
- _ => {
- if frame
- .to_float_frame()
- .channels()
- .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample()))
- {
- s.open = s.deactivation_delay;
- } else {
- s.open -= 1;
- }
- }
- }
-
- if s.open != 0 {
- Poll::Ready(frame)
- } else {
- Poll::Ready(<S::Frame as Frame>::EQUILIBRIUM)
- }
- }
-
- fn is_exhausted(&self) -> bool {
- self.signal.is_exhausted()
- }
-}
-
-fn abs<S: SignedSample>(sample: S) -> S {
- let zero = S::EQUILIBRIUM;
- if sample >= zero {
- sample
- } else {
- -sample
- }
-}
-
-pub trait StreamingSignal {
- type Frame: Frame;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame>;
-
- fn is_exhausted(&self) -> bool {
- false
- }
-}
-
-impl<S> StreamingSignal for S
-where
- S: Signal + Unpin,
-{
- type Frame = S::Frame;
-
- fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> {
- Poll::Ready(self.get_mut().next())
- }
-}
-
-pub trait StreamingSignalExt: StreamingSignal {
- fn next(&mut self) -> Next<'_, Self> {
- Next { stream: self }
- }
-
- fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self>
- where
- Self: Sized,
- {
- IntoInterleavedSamples {
- signal: self,
- current_frame: None,
- }
- }
-}
-
-impl<S> StreamingSignalExt for S where S: StreamingSignal {}
-
-pub struct Next<'a, S: ?Sized> {
- stream: &'a mut S,
-}
-
-impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> {
- type Output = S::Frame;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match S::poll_next(Pin::new(self.stream), cx) {
- Poll::Ready(val) => Poll::Ready(val),
- Poll::Pending => Poll::Pending,
- }
- }
-}
-
-pub struct IntoInterleavedSamples<S: StreamingSignal> {
- signal: S,
- current_frame: Option<<S::Frame as Frame>::Channels>,
-}
-
-impl<S> Stream for IntoInterleavedSamples<S>
-where
- S: StreamingSignal + Unpin,
- <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin,
-{
- type Item = <S::Frame as Frame>::Sample;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let s = self.get_mut();
- loop {
- if s.current_frame.is_some() {
- if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
- return Poll::Ready(Some(channel));
- }
- }
- match S::poll_next(Pin::new(&mut s.signal), cx) {
- Poll::Ready(val) => {
- s.current_frame = Some(val.channels());
- }
- Poll::Pending => return Poll::Pending,
- }
- }
- }
-}
-
-struct FromStream<S> {
- stream: S,
- underlying_exhausted: bool,
-}
-
-impl<S> StreamingSignal for FromStream<S>
-where
- S: Stream + Unpin,
- S::Item: Frame + Unpin,
-{
- type Frame = S::Item;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
- let s = self.get_mut();
- if s.underlying_exhausted {
- return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
- }
- match S::poll_next(Pin::new(&mut s.stream), cx) {
- Poll::Ready(Some(val)) => Poll::Ready(val),
- Poll::Ready(None) => {
- s.underlying_exhausted = true;
- return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
- }
- Poll::Pending => Poll::Pending,
- }
- }
-
- fn is_exhausted(&self) -> bool {
- self.underlying_exhausted
- }
-}
-
-pub struct FromInterleavedSamplesStream<S, F>
-where
- F: Frame,
-{
- stream: S,
- next_buf: Vec<F::Sample>,
- underlying_exhausted: bool,
-}
-
-pub fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F>
-where
- S: Stream + Unpin,
- S::Item: Sample,
- F: Frame<Sample = S::Item>,
-{
- FromInterleavedSamplesStream {
- stream,
- next_buf: Vec::new(),
- underlying_exhausted: false,
- }
-}
-
-impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F>
-where
- S: Stream + Unpin,
- S::Item: Sample + Unpin,
- F: Frame<Sample = S::Item> + Unpin,
-{
- type Frame = F;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
- let s = self.get_mut();
- if s.underlying_exhausted {
- return Poll::Ready(F::EQUILIBRIUM);
- }
- while s.next_buf.len() < F::CHANNELS {
- match S::poll_next(Pin::new(&mut s.stream), cx) {
- Poll::Ready(Some(v)) => {
- s.next_buf.push(v);
- }
- Poll::Ready(None) => {
- s.underlying_exhausted = true;
- return Poll::Ready(F::EQUILIBRIUM);
- }
- Poll::Pending => return Poll::Pending,
- }
- }
-
- let mut data = s.next_buf.iter().cloned();
- let n = F::from_samples(&mut data).unwrap();
- s.next_buf.clear();
- Poll::Ready(n)
- }
-
- fn is_exhausted(&self) -> bool {
- self.underlying_exhausted
- }
-}
-
-pub struct OpusEncoder<S> {
- encoder: opus::Encoder,
- frame_size: u32,
- sample_rate: u32,
- stream: S,
- input_buffer: Vec<f32>,
- exhausted: bool,
-}
-
-impl<S, I> OpusEncoder<S>
-where
- S: Stream<Item = I>,
- I: ToSample<f32>,
-{
- pub fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self {
- let encoder = opus::Encoder::new(
- sample_rate,
- match channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!("Only 1 or 2 channels supported, got {})", channels),
- },
- opus::Application::Voip,
- )
- .unwrap();
- Self {
- encoder,
- frame_size,
- sample_rate,
- stream,
- input_buffer: Vec::new(),
- exhausted: false,
- }
- }
-}
-
-impl<S, I> Stream for OpusEncoder<S>
-where
- S: Stream<Item = I> + Unpin,
- I: Sample + ToSample<f32>,
-{
- type Item = Vec<u8>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let s = self.get_mut();
- if s.exhausted {
- return Poll::Ready(None);
- }
- let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize;
- loop {
- while s.input_buffer.len() < opus_frame_size {
- match S::poll_next(Pin::new(&mut s.stream), cx) {
- Poll::Ready(Some(v)) => {
- s.input_buffer.push(v.to_sample::<f32>());
- }
- Poll::Ready(None) => {
- s.exhausted = true;
- return Poll::Ready(None);
- }
- Poll::Pending => return Poll::Pending,
- }
- }
- if s.input_buffer.iter().any(|&e| e != 0.0) {
- break;
- }
- s.input_buffer.clear();
- }
-
- let encoded = s
- .encoder
- .encode_vec_float(&s.input_buffer, opus_frame_size)
- .unwrap();
- s.input_buffer.clear();
- Poll::Ready(Some(encoded))
- }
-}
diff --git a/mumd/src/audio/transformers.rs b/mumd/src/audio/transformers.rs
new file mode 100644
index 0000000..25e28b8
--- /dev/null
+++ b/mumd/src/audio/transformers.rs
@@ -0,0 +1,49 @@
+/// A trait that represents a transform of a audio buffer in some way.
+pub trait Transformer {
+ /// Do the transform. Returning `None` is interpreted as "the buffer is unwanted".
+ /// The implementor is free to modify the buffer however it wants to.
+ fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]>;
+}
+
+/// A struct representing a noise gate transform.
+pub struct NoiseGate {
+ alltime_high: f32,
+ open: usize,
+ deactivation_delay: usize,
+}
+
+impl NoiseGate {
+ /// Create a new noise gate. `deactivation_delay` is defined in terms of
+ /// how many quiet frames it receives before closing the noise gate.
+ pub fn new(deactivation_delay: usize) -> Self {
+ Self {
+ alltime_high: 0.0,
+ open: 0,
+ deactivation_delay,
+ }
+ }
+}
+
+impl Transformer for NoiseGate {
+ fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]> {
+ const MUTE_PERCENTAGE: f32 = 0.1;
+
+ let max = buf.iter().map(|e| e.abs()).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap();
+
+ if max > self.alltime_high {
+ self.alltime_high = max;
+ }
+
+ if max > self.alltime_high * MUTE_PERCENTAGE {
+ self.open = self.deactivation_delay;
+ } else if self.open > 0 {
+ self.open -= 1;
+ }
+
+ if self.open == 0 {
+ None
+ } else {
+ Some(buf)
+ }
+ }
+}