diff options
| author | Eskil Q <eskilq@kth.se> | 2021-01-02 11:17:49 +0100 |
|---|---|---|
| committer | Eskil Q <eskilq@kth.se> | 2021-01-02 11:17:49 +0100 |
| commit | 76a3f1ea5489048e6d32982119429daa05dde3e0 (patch) | |
| tree | 856e08e0ff7f5335e96cfd6752bc55fcb512ea8f /mumd/src/audio.rs | |
| parent | 08e64c1b9d622026bcbe1f80d2d5d64dd80af8f9 (diff) | |
| download | mum-76a3f1ea5489048e6d32982119429daa05dde3e0.tar.gz | |
make audio sending use streams
Diffstat (limited to 'mumd/src/audio.rs')
| -rw-r--r-- | mumd/src/audio.rs | 71 |
1 files changed, 30 insertions, 41 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0a2465e..a8af82d 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -13,9 +13,9 @@ use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample, ToSample}; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; @@ -74,7 +74,7 @@ pub struct Audio { _output_stream: cpal::Stream, _input_stream: cpal::Stream, - input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>, + input_channel_receiver: Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>>, input_volume_sender: watch::Sender<f32>, output_volume_sender: watch::Sender<f32>, @@ -88,7 +88,7 @@ pub struct Audio { } impl Audio { - pub fn new(input_volume: f32, output_volume: f32) -> Self { + pub async fn new(input_volume: f32, output_volume: f32) -> Self { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); @@ -169,20 +169,7 @@ impl Audio { } .unwrap(); - let input_encoder = opus::Encoder::new( - input_config.sample_rate.0, - match input_config.channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!( - "Only 1 or 2 channels supported, got {})", - input_config.channels - ), - }, - opus::Application::Voip, - ) - .unwrap(); - let (input_sender, input_receiver) = mpsc::channel(100); + let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); @@ -190,39 +177,37 @@ impl Audio { SampleFormat::F32 => input_device.build_input_stream( &input_config, input::callback::<f32>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, input::callback::<i16>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, input::callback::<u16>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), } .unwrap(); + let opus_stream = OpusEncoder::new( + 4, + input_config.sample_rate.0, + input_config.channels as usize, + StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate + //TODO group frames correctly + output_stream.play().unwrap(); let sounds = EVENT_SOUNDS @@ -246,7 +231,7 @@ impl Audio { _ => unimplemented!() // TODO handle gracefully (this might not even happen) }; let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter); - let interp = Linear::new(signal.next(), signal.next()); + let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal)); let samples = signal .from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64) .until_exhausted() @@ -262,7 +247,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(input_receiver), + input_channel_receiver: Some(Box::new(opus_stream)), client_streams, sounds, output_volume_sender, @@ -312,7 +297,7 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> { + pub fn take_receiver(&mut self) -> Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>> { self.input_channel_receiver.take() } @@ -530,6 +515,9 @@ trait StreamingSignalExt: StreamingSignal { } } +impl<S> StreamingSignalExt for S + where S: StreamingSignal {} + struct Next<'a, S: ?Sized> { stream: &'a mut S } @@ -561,16 +549,16 @@ impl<S> Stream for IntoInterleavedSamples<S> fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let s = self.get_mut(); loop { - if s.current_frame.is_none() { - match S::poll_next(Pin::new(&mut s.signal), cx) { - Poll::Ready(val) => { - s.current_frame = Some(val.channels()); - } - Poll::Pending => return Poll::Pending, + if s.current_frame.is_some() { + if let Some(channel) = s.current_frame.as_mut().unwrap().next() { + return Poll::Ready(Some(channel)); } } - if let Some(channel) = s.current_frame.as_mut().unwrap().next() { - return Poll::Ready(Some(channel)); + match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(val) => { + s.current_frame = Some(val.channels()); + } + Poll::Pending => return Poll::Pending, } } } @@ -681,6 +669,7 @@ impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F> let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); + s.next_buf.clear(); let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); Poll::Ready(ret) } |
