From 8f212e65f1bc9187ff0854ed0b83e0b25be5847a Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:05:26 +0100 Subject: implement is_exhausted in a reasonable way --- mumd/src/audio.rs | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) (limited to 'mumd') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 9c9648b..0aae1cf 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -22,6 +22,7 @@ use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; use std::future::Future; +use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -504,6 +505,10 @@ impl StreamingSignal for FromStream None => Poll::Ready(::EQUILIBRIUM) } } + + fn is_exhausted(&self) -> bool { + self.next.is_none() + } } @@ -511,19 +516,37 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next: Vec, + next_frame: Option, + next_buf: Vec, underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream +async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { + let mut i = 0; + let mut buf = Vec::new(); + let next = loop { + if i == F::CHANNELS { + let mut iter = buf.into_iter(); + break F::from_samples(&mut iter); + } + match stream.next().await { + Some(v) => { + buf.push(v); + } + None => break None, + } + + i += 1; + }; FromInterleavedSamplesStream { stream, - next: Vec::new(), - underlying_exhausted: false + next_buf: Vec::new(), + underlying_exhausted: false, + next_frame: next, } } @@ -531,7 +554,7 @@ impl StreamingSignal for FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample + Unpin, - F: Frame { + F: Frame + Unpin { type Frame = F; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -539,21 +562,24 @@ impl StreamingSignal for FromInterleavedSamplesStream if s.underlying_exhausted { return Poll::Ready(F::EQUILIBRIUM); } - while s.next.len() < F::CHANNELS { + while s.next_buf.len() < F::CHANNELS { match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(v)) => { - s.next.push(v); + s.next_buf.push(v); } Poll::Ready(None) => { s.underlying_exhausted = true; + s.next_frame = None; return Poll::Ready(F::EQUILIBRIUM); } Poll::Pending => return Poll::Pending, } } - let mut data = s.next.iter().cloned(); - Poll::Ready(F::from_samples(&mut data).unwrap()) + let mut data = s.next_buf.iter().cloned(); + let n = F::from_samples(&mut data).unwrap(); + let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); + Poll::Ready(ret) } } -- cgit v1.2.1