aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio.rs
diff options
context:
space:
mode:
authorEskil Q <eskilq@kth.se>2021-01-02 11:17:49 +0100
committerEskil Q <eskilq@kth.se>2021-01-02 11:17:49 +0100
commit76a3f1ea5489048e6d32982119429daa05dde3e0 (patch)
tree856e08e0ff7f5335e96cfd6752bc55fcb512ea8f /mumd/src/audio.rs
parent08e64c1b9d622026bcbe1f80d2d5d64dd80af8f9 (diff)
downloadmum-76a3f1ea5489048e6d32982119429daa05dde3e0.tar.gz
make audio sending use streams
Diffstat (limited to 'mumd/src/audio.rs')
-rw-r--r--mumd/src/audio.rs71
1 files changed, 30 insertions, 41 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 0a2465e..a8af82d 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -13,9 +13,9 @@ use opus::Channels;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::watch;
use dasp_frame::Frame;
-use dasp_sample::{Sample, SignedSample, ToSample};
+use dasp_sample::{SignedSample, ToSample, Sample};
use dasp_ring_buffer::Fixed;
use futures::Stream;
use futures::task::{Context, Poll};
@@ -74,7 +74,7 @@ pub struct Audio {
_output_stream: cpal::Stream,
_input_stream: cpal::Stream,
- input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>,
+ input_channel_receiver: Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>>,
input_volume_sender: watch::Sender<f32>,
output_volume_sender: watch::Sender<f32>,
@@ -88,7 +88,7 @@ pub struct Audio {
}
impl Audio {
- pub fn new(input_volume: f32, output_volume: f32) -> Self {
+ pub async fn new(input_volume: f32, output_volume: f32) -> Self {
let sample_rate = SampleRate(SAMPLE_RATE);
let host = cpal::default_host();
@@ -169,20 +169,7 @@ impl Audio {
}
.unwrap();
- let input_encoder = opus::Encoder::new(
- input_config.sample_rate.0,
- match input_config.channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!(
- "Only 1 or 2 channels supported, got {})",
- input_config.channels
- ),
- },
- opus::Application::Voip,
- )
- .unwrap();
- let (input_sender, input_receiver) = mpsc::channel(100);
+ let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000);
let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
@@ -190,39 +177,37 @@ impl Audio {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
input::callback::<f32>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
input::callback::<i16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
input::callback::<u16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
}
.unwrap();
+ let opus_stream = OpusEncoder::new(
+ 4,
+ input_config.sample_rate.0,
+ input_config.channels as usize,
+ StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate
+ //TODO group frames correctly
+
output_stream.play().unwrap();
let sounds = EVENT_SOUNDS
@@ -246,7 +231,7 @@ impl Audio {
_ => unimplemented!() // TODO handle gracefully (this might not even happen)
};
let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter);
- let interp = Linear::new(signal.next(), signal.next());
+ let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal));
let samples = signal
.from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64)
.until_exhausted()
@@ -262,7 +247,7 @@ impl Audio {
_output_stream: output_stream,
_input_stream: input_stream,
input_volume_sender,
- input_channel_receiver: Some(input_receiver),
+ input_channel_receiver: Some(Box::new(opus_stream)),
client_streams,
sounds,
output_volume_sender,
@@ -312,7 +297,7 @@ impl Audio {
}
}
- pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> {
+ pub fn take_receiver(&mut self) -> Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>> {
self.input_channel_receiver.take()
}
@@ -530,6 +515,9 @@ trait StreamingSignalExt: StreamingSignal {
}
}
+impl<S> StreamingSignalExt for S
+ where S: StreamingSignal {}
+
struct Next<'a, S: ?Sized> {
stream: &'a mut S
}
@@ -561,16 +549,16 @@ impl<S> Stream for IntoInterleavedSamples<S>
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let s = self.get_mut();
loop {
- if s.current_frame.is_none() {
- match S::poll_next(Pin::new(&mut s.signal), cx) {
- Poll::Ready(val) => {
- s.current_frame = Some(val.channels());
- }
- Poll::Pending => return Poll::Pending,
+ if s.current_frame.is_some() {
+ if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
+ return Poll::Ready(Some(channel));
}
}
- if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
- return Poll::Ready(Some(channel));
+ match S::poll_next(Pin::new(&mut s.signal), cx) {
+ Poll::Ready(val) => {
+ s.current_frame = Some(val.channels());
+ }
+ Poll::Pending => return Poll::Pending,
}
}
}
@@ -681,6 +669,7 @@ impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F>
let mut data = s.next_buf.iter().cloned();
let n = F::from_samples(&mut data).unwrap();
+ s.next_buf.clear();
let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap();
Poll::Ready(ret)
}