aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio.rs
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src/audio.rs')
-rw-r--r--mumd/src/audio.rs396
1 files changed, 360 insertions, 36 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 0820147..83818d5 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -4,16 +4,25 @@ pub mod output;
use crate::audio::output::SaturatingAdd;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
-use cpal::{SampleFormat, SampleRate, Stream, StreamConfig};
+use cpal::{SampleFormat, SampleRate, StreamConfig};
+use dasp_frame::Frame;
use dasp_interpolate::linear::Linear;
+use dasp_sample::{SignedSample, ToSample, Sample};
use dasp_signal::{self as signal, Signal};
+use futures::Stream;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
use log::*;
-use mumble_protocol::voice::VoicePacketPayload;
+use mumble_protocol::voice::{VoicePacketPayload, VoicePacket};
use opus::Channels;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
+use std::fmt::Debug;
+use std::future::{Future};
+use std::pin::Pin;
use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::watch;
+use mumble_protocol::Serverbound;
//TODO? move to mumlib
pub const EVENT_SOUNDS: &[(&[u8], NotificationEvents)] = &[
@@ -62,10 +71,10 @@ pub enum NotificationEvents {
pub struct Audio {
output_config: StreamConfig,
- _output_stream: Stream,
- _input_stream: Stream,
+ _output_stream: cpal::Stream,
+ _input_stream: cpal::Stream,
- input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>,
+ input_channel_receiver: Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
input_volume_sender: watch::Sender<f32>,
output_volume_sender: watch::Sender<f32>,
@@ -160,20 +169,7 @@ impl Audio {
}
.unwrap();
- let input_encoder = opus::Encoder::new(
- input_config.sample_rate.0,
- match input_config.channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!(
- "Only 1 or 2 channels supported, got {})",
- input_config.channels
- ),
- },
- opus::Application::Voip,
- )
- .unwrap();
- let (input_sender, input_receiver) = mpsc::channel(100);
+ let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000);
let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
@@ -181,39 +177,47 @@ impl Audio {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
input::callback::<f32>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
input::callback::<i16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
input::callback::<u16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
}
.unwrap();
+ let opus_stream = OpusEncoder::new(
+ 4,
+ input_config.sample_rate.0,
+ input_config.channels as usize,
+ StreamingSignalExt::into_interleaved_samples(
+ StreamingNoiseGate::new(
+ from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly
+ 0.09,
+ 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: i as u64,
+ payload: VoicePacketPayload::Opus(e.into(), false),
+ position_info: None,
+ });
+
output_stream.play().unwrap();
let sounds = EVENT_SOUNDS
@@ -237,7 +241,7 @@ impl Audio {
_ => unimplemented!() // TODO handle gracefully (this might not even happen)
};
let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter);
- let interp = Linear::new(signal.next(), signal.next());
+ let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal));
let samples = signal
.from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64)
.until_exhausted()
@@ -253,7 +257,7 @@ impl Audio {
_output_stream: output_stream,
_input_stream: input_stream,
input_volume_sender,
- input_channel_receiver: Some(input_receiver),
+ input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))),
client_streams,
sounds,
output_volume_sender,
@@ -303,8 +307,8 @@ impl Audio {
}
}
- pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> {
- self.input_channel_receiver.take()
+ pub fn take_receiver(&mut self) -> Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
+ Arc::clone(&self.input_channel_receiver)
}
pub fn clear_clients(&mut self) {
@@ -354,3 +358,323 @@ impl Audio {
play_sounds.extend(samples.iter().skip(l));
}
}
+
+struct StreamingNoiseGate<S: StreamingSignal> {
+ open: usize,
+ signal: S,
+ activate_threshold: <<S::Frame as Frame>::Sample as Sample>::Float,
+ deactivation_delay: usize,
+}
+
+impl<S: StreamingSignal> StreamingNoiseGate<S> {
+ pub fn new(
+ signal: S,
+ activate_threshold: <<S::Frame as Frame>::Sample as Sample>::Float,
+ deactivation_delay: usize,
+ ) -> StreamingNoiseGate<S> {
+ Self {
+ open: 0,
+ signal,
+ activate_threshold,
+ deactivation_delay
+ }
+ }
+}
+
+impl<S> StreamingSignal for StreamingNoiseGate<S>
+ where
+ S: StreamingSignal + Unpin,
+ <<<S as StreamingSignal>::Frame as Frame>::Sample as Sample>::Float: Unpin,
+ <S as StreamingSignal>::Frame: Unpin {
+ type Frame = S::Frame;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
+ let s = self.get_mut();
+
+ let frame = match S::poll_next(Pin::new(&mut s.signal), cx) {
+ Poll::Ready(v) => v,
+ Poll::Pending => return Poll::Pending,
+ };
+
+ match s.open {
+ 0 => {
+ if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) {
+ s.open = s.deactivation_delay;
+ }
+ }
+ _ => {
+ if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) {
+ s.open = s.deactivation_delay;
+ } else {
+ s.open -= 1;
+ }
+ }
+ }
+
+ if s.open != 0 {
+ Poll::Ready(frame)
+ } else {
+ Poll::Ready(<S::Frame as Frame>::EQUILIBRIUM)
+ }
+ }
+
+ fn is_exhausted(&self) -> bool {
+ self.signal.is_exhausted()
+ }
+}
+
+fn abs<S: SignedSample>(sample: S) -> S {
+ let zero = S::EQUILIBRIUM;
+ if sample >= zero {
+ sample
+ } else {
+ -sample
+ }
+}
+
+trait StreamingSignal {
+ type Frame: Frame;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame>;
+
+ fn is_exhausted(&self) -> bool {
+ false
+ }
+}
+
+impl<S> StreamingSignal for S
+ where
+ S: Signal + Unpin {
+ type Frame = S::Frame;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> {
+ Poll::Ready(self.get_mut().next())
+ }
+}
+
+trait StreamingSignalExt: StreamingSignal {
+ fn next(&mut self) -> Next<'_, Self> {
+ Next {
+ stream: self
+ }
+ }
+
+ fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self>
+ where
+ Self: Sized {
+ IntoInterleavedSamples { signal: self, current_frame: None }
+ }
+}
+
+impl<S> StreamingSignalExt for S
+ where S: StreamingSignal {}
+
+struct Next<'a, S: ?Sized> {
+ stream: &'a mut S
+}
+
+impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> {
+ type Output = S::Frame;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match S::poll_next(Pin::new(self.stream), cx) {
+ Poll::Ready(val) => {
+ Poll::Ready(val)
+ }
+ Poll::Pending => Poll::Pending
+ }
+ }
+}
+
+struct IntoInterleavedSamples<S: StreamingSignal> {
+ signal: S,
+ current_frame: Option<<S::Frame as Frame>::Channels>,
+}
+
+impl<S> Stream for IntoInterleavedSamples<S>
+ where
+ S: StreamingSignal + Unpin,
+ <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin {
+ type Item = <S::Frame as Frame>::Sample;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let s = self.get_mut();
+ loop {
+ if s.current_frame.is_some() {
+ if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
+ return Poll::Ready(Some(channel));
+ }
+ }
+ match S::poll_next(Pin::new(&mut s.signal), cx) {
+ Poll::Ready(val) => {
+ s.current_frame = Some(val.channels());
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+}
+
+struct FromStream<S> {
+ stream: S,
+ underlying_exhausted: bool,
+}
+
+impl<S> StreamingSignal for FromStream<S>
+ where
+ S: Stream + Unpin,
+ S::Item: Frame + Unpin {
+ type Frame = S::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
+ let s = self.get_mut();
+ if s.underlying_exhausted {
+ return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
+ }
+ match S::poll_next(Pin::new(&mut s.stream), cx) {
+ Poll::Ready(Some(val)) => {
+ Poll::Ready(val)
+ }
+ Poll::Ready(None) => {
+ s.underlying_exhausted = true;
+ return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
+ }
+ Poll::Pending => Poll::Pending,
+ }
+ }
+
+ fn is_exhausted(&self) -> bool {
+ self.underlying_exhausted
+ }
+}
+
+
+struct FromInterleavedSamplesStream<S, F>
+ where
+ F: Frame {
+ stream: S,
+ next_buf: Vec<F::Sample>,
+ underlying_exhausted: bool,
+}
+
+fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F>
+ where
+ S: Stream + Unpin,
+ S::Item: Sample,
+ F: Frame<Sample = S::Item> {
+ FromInterleavedSamplesStream {
+ stream,
+ next_buf: Vec::new(),
+ underlying_exhausted: false,
+ }
+}
+
+impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F>
+ where
+ S: Stream + Unpin,
+ S::Item: Sample + Unpin,
+ F: Frame<Sample = S::Item> + Unpin {
+ type Frame = F;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
+ let s = self.get_mut();
+ if s.underlying_exhausted {
+ return Poll::Ready(F::EQUILIBRIUM);
+ }
+ while s.next_buf.len() < F::CHANNELS {
+ match S::poll_next(Pin::new(&mut s.stream), cx) {
+ Poll::Ready(Some(v)) => {
+ s.next_buf.push(v);
+ }
+ Poll::Ready(None) => {
+ s.underlying_exhausted = true;
+ return Poll::Ready(F::EQUILIBRIUM);
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+
+ let mut data = s.next_buf.iter().cloned();
+ let n = F::from_samples(&mut data).unwrap();
+ s.next_buf.clear();
+ Poll::Ready(n)
+ }
+
+ fn is_exhausted(&self) -> bool {
+ self.underlying_exhausted
+ }
+}
+
+struct OpusEncoder<S> {
+ encoder: opus::Encoder,
+ frame_size: u32,
+ sample_rate: u32,
+ stream: S,
+ input_buffer: Vec<f32>,
+ exhausted: bool,
+}
+
+impl<S, I> OpusEncoder<S>
+ where
+ S: Stream<Item = I>,
+ I: ToSample<f32> {
+ fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self {
+ let encoder = opus::Encoder::new(
+ sample_rate,
+ match channels {
+ 1 => Channels::Mono,
+ 2 => Channels::Stereo,
+ _ => unimplemented!(
+ "Only 1 or 2 channels supported, got {})",
+ channels
+ ),
+ },
+ opus::Application::Voip,
+ ).unwrap();
+ Self {
+ encoder,
+ frame_size,
+ sample_rate,
+ stream,
+ input_buffer: Vec::new(),
+ exhausted: false,
+ }
+ }
+}
+
+impl<S, I> Stream for OpusEncoder<S>
+ where
+ S: Stream<Item = I> + Unpin,
+ I: Sample + ToSample<f32> {
+ type Item = Vec<u8>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let s = self.get_mut();
+ if s.exhausted {
+ return Poll::Ready(None);
+ }
+ let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize;
+ loop {
+ while s.input_buffer.len() < opus_frame_size {
+ match S::poll_next(Pin::new(&mut s.stream), cx) {
+ Poll::Ready(Some(v)) => {
+ s.input_buffer.push(v.to_sample::<f32>());
+ }
+ Poll::Ready(None) => {
+ s.exhausted = true;
+ return Poll::Ready(None);
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ if s.input_buffer.iter().any(|&e| e != 0.0) {
+ break;
+ }
+ s.input_buffer.clear();
+ }
+
+ let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap();
+ s.input_buffer.clear();
+ Poll::Ready(Some(encoded))
+ }
+}