use dasp_frame::Frame; use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_signal::Signal; use futures_util::stream::Stream; use opus::Channels; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; type FloatSample = <::Sample as Sample>::Float; pub struct StreamingNoiseGate { open: usize, signal: S, deactivation_delay: usize, alltime_high: FloatSample, } impl StreamingNoiseGate { pub fn new( signal: S, deactivation_delay: usize, ) -> StreamingNoiseGate { Self { open: 0, signal, deactivation_delay, alltime_high: FloatSample::::EQUILIBRIUM, } } } impl StreamingSignal for StreamingNoiseGate where S: StreamingSignal + Unpin, FloatSample: Unpin, ::Frame: Unpin { type Frame = S::Frame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { const MUTE_PERCENTAGE: f32 = 0.1; let s = self.get_mut(); let frame = match S::poll_next(Pin::new(&mut s.signal), cx) { Poll::Ready(v) => v, Poll::Pending => return Poll::Pending, }; if let Some(highest) = frame.to_float_frame().channels().find(|e| abs(e.clone()) > s.alltime_high) { s.alltime_high = highest; } match s.open { 0 => { if frame.to_float_frame().channels().any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) { s.open = s.deactivation_delay; } } _ => { if frame.to_float_frame().channels().any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) { s.open = s.deactivation_delay; } else { s.open -= 1; } } } if s.open != 0 { Poll::Ready(frame) } else { Poll::Ready(::EQUILIBRIUM) } } fn is_exhausted(&self) -> bool { self.signal.is_exhausted() } } fn abs(sample: S) -> S { let zero = S::EQUILIBRIUM; if sample >= zero { sample } else { -sample } } pub trait StreamingSignal { type Frame: Frame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; fn is_exhausted(&self) -> bool { false } } impl StreamingSignal for S where S: Signal + Unpin { type Frame = S::Frame; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { Poll::Ready(self.get_mut().next()) } } pub trait StreamingSignalExt: StreamingSignal { fn next(&mut self) -> Next<'_, Self> { Next { stream: self } } fn into_interleaved_samples(self) -> IntoInterleavedSamples where Self: Sized { IntoInterleavedSamples { signal: self, current_frame: None } } } impl StreamingSignalExt for S where S: StreamingSignal {} pub struct Next<'a, S: ?Sized> { stream: &'a mut S } impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { type Output = S::Frame; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match S::poll_next(Pin::new(self.stream), cx) { Poll::Ready(val) => { Poll::Ready(val) } Poll::Pending => Poll::Pending } } } pub struct IntoInterleavedSamples { signal: S, current_frame: Option<::Channels>, } impl Stream for IntoInterleavedSamples where S: StreamingSignal + Unpin, <::Frame as Frame>::Channels: Unpin { type Item = ::Sample; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); loop { if s.current_frame.is_some() { if let Some(channel) = s.current_frame.as_mut().unwrap().next() { return Poll::Ready(Some(channel)); } } match S::poll_next(Pin::new(&mut s.signal), cx) { Poll::Ready(val) => { s.current_frame = Some(val.channels()); } Poll::Pending => return Poll::Pending, } } } } struct FromStream { stream: S, underlying_exhausted: bool, } impl StreamingSignal for FromStream where S: Stream + Unpin, S::Item: Frame + Unpin { type Frame = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); if s.underlying_exhausted { return Poll::Ready(::EQUILIBRIUM); } match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(val)) => { Poll::Ready(val) } Poll::Ready(None) => { s.underlying_exhausted = true; return Poll::Ready(::EQUILIBRIUM); } Poll::Pending => Poll::Pending, } } fn is_exhausted(&self) -> bool { self.underlying_exhausted } } pub struct FromInterleavedSamplesStream where F: Frame { stream: S, next_buf: Vec, underlying_exhausted: bool, } pub fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { FromInterleavedSamplesStream { stream, next_buf: Vec::new(), underlying_exhausted: false, } } impl StreamingSignal for FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample + Unpin, F: Frame + Unpin { type Frame = F; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); if s.underlying_exhausted { return Poll::Ready(F::EQUILIBRIUM); } while s.next_buf.len() < F::CHANNELS { match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(v)) => { s.next_buf.push(v); } Poll::Ready(None) => { s.underlying_exhausted = true; return Poll::Ready(F::EQUILIBRIUM); } Poll::Pending => return Poll::Pending, } } let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); s.next_buf.clear(); Poll::Ready(n) } fn is_exhausted(&self) -> bool { self.underlying_exhausted } } pub struct OpusEncoder { encoder: opus::Encoder, frame_size: u32, sample_rate: u32, stream: S, input_buffer: Vec, exhausted: bool, } impl OpusEncoder where S: Stream, I: ToSample { pub fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { let encoder = opus::Encoder::new( sample_rate, match channels { 1 => Channels::Mono, 2 => Channels::Stereo, _ => unimplemented!( "Only 1 or 2 channels supported, got {})", channels ), }, opus::Application::Voip, ).unwrap(); Self { encoder, frame_size, sample_rate, stream, input_buffer: Vec::new(), exhausted: false, } } } impl Stream for OpusEncoder where S: Stream + Unpin, I: Sample + ToSample { type Item = Vec; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); if s.exhausted { return Poll::Ready(None); } let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; loop { while s.input_buffer.len() < opus_frame_size { match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(v)) => { s.input_buffer.push(v.to_sample::()); } Poll::Ready(None) => { s.exhausted = true; return Poll::Ready(None); } Poll::Pending => return Poll::Pending, } } if s.input_buffer.iter().any(|&e| e != 0.0) { break; } s.input_buffer.clear(); } let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); s.input_buffer.clear(); Poll::Ready(Some(encoded)) } }