From 22481e0fd03102d2fa01a5025049c10a5539e356 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:52:55 +0100 Subject: fix from_interleaved_samples_iter --- mumd/src/audio.rs | 50 ++++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index cd6141a..7d4929c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -21,9 +21,7 @@ use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; -use std::convert::identity; use std::future::Future; -use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -509,20 +507,20 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next: Option>, + next: Vec, + underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream +async fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { - let mut data = Vec::with_capacity(F::CHANNELS); - for _ in 0..F::CHANNELS { - data.push(stream.next().await); + FromInterleavedSamplesStream { + stream, + next: Vec::new(), + underlying_exhausted: false } - let data = data.into_iter().flat_map(identity).collect::>(); - FromInterleavedSamplesStream { stream, next: if data.len() == F::CHANNELS { Some(data) } else { None } } } impl StreamingSignal for FromInterleavedSamplesStream @@ -534,26 +532,24 @@ impl StreamingSignal for FromInterleavedSamplesStream fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); - if s.next.is_some() { - if s.next.as_ref().unwrap().len() == F::CHANNELS { - let mut data_buf = mem::replace(&mut s.next, Some(Vec::new())).unwrap().into_iter(); - Poll::Ready(F::from_samples(&mut data_buf).unwrap()) - } else { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(v)) => { - s.next.as_mut().unwrap().push(v); - Poll::Pending - } - Poll::Ready(None) => { - s.next = None; - Poll::Ready(F::EQUILIBRIUM) - } - Poll::Pending => Poll::Pending, + if s.underlying_exhausted { + return Poll::Ready(F::EQUILIBRIUM); + } + while s.next.len() < F::CHANNELS { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.next.push(v); } + Poll::Ready(None) => { + s.underlying_exhausted = true; + return Poll::Ready(F::EQUILIBRIUM); + } + Poll::Pending => return Poll::Pending, } - } else { - Poll::Ready(F::EQUILIBRIUM) } + + let mut data = s.next.iter().cloned(); + Poll::Ready(F::from_samples(&mut data).unwrap()) } } @@ -561,7 +557,6 @@ struct OpusEncoder { encoder: opus::Encoder, frame_size: u32, sample_rate: u32, - channels: usize, stream: S, input_buffer: Vec, exhausted: bool, @@ -588,7 +583,6 @@ impl OpusEncoder encoder, frame_size, sample_rate, - channels, stream, input_buffer: Vec::new(), exhausted: false, -- cgit v1.2.1