From dbcc5373fab41d876f4495463d76c1ab28c9d670 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sun, 27 Dec 2020 15:43:58 +0100 Subject: create noise-gate struct --- mumd/src/audio.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 8f9e2ab..bed9ceb 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -14,6 +14,9 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; +use dasp_frame::Frame; +use dasp_sample::{Sample, SignedSample}; +use dasp_ring_buffer::Fixed; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -354,3 +357,65 @@ impl Audio { play_sounds.extend(samples.iter().skip(l)); } } + +struct NoiseGate { + open: bool, + signal: S, + buffer: dasp_ring_buffer::Fixed>, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float, +} + +impl NoiseGate { + pub fn new(signal: S, activate_threshold: <::Sample as Sample>::Float, deactivate_threshold: <::Sample as Sample>::Float) -> NoiseGate { + Self { + open: false, + signal, + buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), + activate_threshold, + deactivate_threshold, + } + } +} + +impl Signal for NoiseGate { + type Frame = S::Frame; + + fn next(&mut self) -> Self::Frame { + let frame = self.signal.next(); + self.buffer.push(frame); + + if self.open && self.buffer + .iter() + .all(|f| f.to_float_frame() + .channels() + .all(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= self.deactivate_threshold)) { + self.open = false; + } else if !self.open && self.buffer + .iter() + .any(|f| f.to_float_frame() + .channels() + .any(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= self.activate_threshold)) { + self.open = true; + } + + if self.open { + frame + } else { + S::Frame::EQUILIBRIUM + } + } + + fn is_exhausted(&self) -> bool { + self.signal.is_exhausted() + } +} + +fn abs(sample: S) -> S { + let zero = S::EQUILIBRIUM; + if sample >= zero { + sample + } else { + -sample + } +} \ No newline at end of file -- cgit v1.2.1 From 9777219aa26dc64c0a3dc3d9d2023b8a1c4295fb Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 1 Jan 2021 14:30:34 +0100 Subject: add initial streaming signals --- mumd/src/audio.rs | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 3 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index bed9ceb..ee5516a 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -4,7 +4,7 @@ pub mod output; use crate::audio::output::SaturatingAdd; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{SampleFormat, SampleRate, Stream, StreamConfig}; +use cpal::{SampleFormat, SampleRate, StreamConfig}; use dasp_interpolate::linear::Linear; use dasp_signal::{self as signal, Signal}; use log::*; @@ -17,6 +17,13 @@ use tokio::sync::{mpsc, watch}; use dasp_frame::Frame; use dasp_sample::{Sample, SignedSample}; use dasp_ring_buffer::Fixed; +use futures::Stream; +use futures::task::{Context, Poll}; +use std::pin::Pin; +use tokio::stream::StreamExt; +use std::convert::identity; +use std::future::Future; +use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -65,8 +72,8 @@ pub enum NotificationEvents { pub struct Audio { output_config: StreamConfig, - _output_stream: Stream, - _input_stream: Stream, + _output_stream: cpal::Stream, + _input_stream: cpal::Stream, input_channel_receiver: Option>, input_volume_sender: watch::Sender, @@ -418,4 +425,124 @@ fn abs(sample: S) -> S { } else { -sample } +} + +trait StreamingSignal { + type Frame: Frame; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; +} + +trait StreamingSignalExt: StreamingSignal { + fn next(&mut self) -> Next<'_, Self> { + Next { + stream: self + } + } +} + +struct Next<'a, S: ?Sized> { + stream: &'a mut S +} + +impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { + type Output = S::Frame; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match S::poll_next(Pin::new(self.stream), cx) { + Poll::Ready(val) => { + Poll::Ready(val) + } + Poll::Pending => Poll::Pending + } + } +} + +struct FromStream { + stream: S, + next: Option, +} + +async fn from_stream(mut stream: S) -> FromStream + where + S: Stream + Unpin, + S::Item: Frame { + let next = stream.next().await; + FromStream { stream, next } +} + +impl StreamingSignal for FromStream + where + S: Stream + Unpin, + S::Item: Frame + Unpin { + type Frame = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + match s.next.take() { + Some(v) => { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(val) => { + s.next = val; + Poll::Ready(v) + } + Poll::Pending => Poll::Pending + } + } + None => Poll::Ready(::EQUILIBRIUM) + } + } +} + + +struct FromInterleavedSamplesStream + where + F: Frame { + stream: S, + next: Option>, +} + +async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream + where + S: Stream + Unpin, + S::Item: Sample, + F: Frame { + let mut data = Vec::with_capacity(F::CHANNELS); + for _ in 0..F::CHANNELS { + data.push(stream.next().await); + } + let data = data.into_iter().flat_map(identity).collect::>(); + FromInterleavedSamplesStream { stream, next: if data.len() == F::CHANNELS { Some(data) } else { None } } +} + +impl StreamingSignal for FromInterleavedSamplesStream + where + S: Stream + Unpin, + S::Item: Sample + Unpin, + F: Frame { + type Frame = F; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + if s.next.is_some() { + if s.next.as_ref().unwrap().len() == F::CHANNELS { + let mut data_buf = mem::replace(&mut s.next, Some(Vec::new())).unwrap().into_iter(); + Poll::Ready(F::from_samples(&mut data_buf).unwrap()) + } else { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.next.as_mut().unwrap().push(v); + Poll::Pending + } + Poll::Ready(None) => { + s.next = None; + Poll::Ready(F::EQUILIBRIUM) + } + Poll::Pending => Poll::Pending, + } + } + } else { + Poll::Ready(F::EQUILIBRIUM) + } + } } \ No newline at end of file -- cgit v1.2.1 From ee13c36868f08203d548c3221300651c5108b8a9 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 1 Jan 2021 14:33:19 +0100 Subject: update to use tokio 1.0 --- mumd/src/audio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0998f06..dc0b77d 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -20,7 +20,7 @@ use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; -use tokio::stream::StreamExt; +use tokio_stream::StreamExt; use std::convert::identity; use std::future::Future; use std::mem; -- cgit v1.2.1 From 5cdac93a475b4402680ac8d274677f4ba29b1e25 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:42:52 +0100 Subject: add OpusEncoder stream --- mumd/src/audio.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index dc0b77d..324b615 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample}; +use dasp_sample::{Sample, SignedSample, ToSample}; use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; @@ -545,4 +545,74 @@ impl StreamingSignal for FromInterleavedSamplesStream Poll::Ready(F::EQUILIBRIUM) } } +} + +struct OpusEncoder { + encoder: opus::Encoder, + frame_size: u32, + sample_rate: u32, + channels: usize, + stream: S, + input_buffer: Vec, + exhausted: bool, +} + +impl OpusEncoder + where + S: Stream, + I: ToSample { + fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { + let encoder = opus::Encoder::new( + sample_rate, + match channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!( + "Only 1 or 2 channels supported, got {})", + channels + ), + }, + opus::Application::Voip, + ).unwrap(); + Self { + encoder, + frame_size, + sample_rate, + channels, + stream, + input_buffer: Vec::new(), + exhausted: false, + } + } +} + +impl Stream for OpusEncoder + where + S: Stream + Unpin, + I: Sample + ToSample { + type Item = Vec; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + if s.exhausted { + return Poll::Ready(None); + } + let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; + while s.input_buffer.len() < opus_frame_size { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.input_buffer.push(v.to_sample::()); + } + Poll::Ready(None) => { + s.exhausted = true; + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, + } + } + + let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); + s.input_buffer.clear(); + Poll::Ready(Some(encoded)) + } } \ No newline at end of file -- cgit v1.2.1 From ffaa6c3e017e98054f1840d187c10fcdb593b48f Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:43:15 +0100 Subject: add blanket impl for StreamingSignal --- mumd/src/audio.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 324b615..cd6141a 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -433,6 +433,16 @@ trait StreamingSignal { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; } +impl StreamingSignal for S + where + S: Signal + Unpin { + type Frame = S::Frame; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(self.get_mut().next()) + } +} + trait StreamingSignalExt: StreamingSignal { fn next(&mut self) -> Next<'_, Self> { Next { -- cgit v1.2.1 From 22481e0fd03102d2fa01a5025049c10a5539e356 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:52:55 +0100 Subject: fix from_interleaved_samples_iter --- mumd/src/audio.rs | 50 ++++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 28 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index cd6141a..7d4929c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -21,9 +21,7 @@ use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; -use std::convert::identity; use std::future::Future; -use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -509,20 +507,20 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next: Option>, + next: Vec, + underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream +async fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { - let mut data = Vec::with_capacity(F::CHANNELS); - for _ in 0..F::CHANNELS { - data.push(stream.next().await); + FromInterleavedSamplesStream { + stream, + next: Vec::new(), + underlying_exhausted: false } - let data = data.into_iter().flat_map(identity).collect::>(); - FromInterleavedSamplesStream { stream, next: if data.len() == F::CHANNELS { Some(data) } else { None } } } impl StreamingSignal for FromInterleavedSamplesStream @@ -534,26 +532,24 @@ impl StreamingSignal for FromInterleavedSamplesStream fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); - if s.next.is_some() { - if s.next.as_ref().unwrap().len() == F::CHANNELS { - let mut data_buf = mem::replace(&mut s.next, Some(Vec::new())).unwrap().into_iter(); - Poll::Ready(F::from_samples(&mut data_buf).unwrap()) - } else { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(v)) => { - s.next.as_mut().unwrap().push(v); - Poll::Pending - } - Poll::Ready(None) => { - s.next = None; - Poll::Ready(F::EQUILIBRIUM) - } - Poll::Pending => Poll::Pending, + if s.underlying_exhausted { + return Poll::Ready(F::EQUILIBRIUM); + } + while s.next.len() < F::CHANNELS { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.next.push(v); } + Poll::Ready(None) => { + s.underlying_exhausted = true; + return Poll::Ready(F::EQUILIBRIUM); + } + Poll::Pending => return Poll::Pending, } - } else { - Poll::Ready(F::EQUILIBRIUM) } + + let mut data = s.next.iter().cloned(); + Poll::Ready(F::from_samples(&mut data).unwrap()) } } @@ -561,7 +557,6 @@ struct OpusEncoder { encoder: opus::Encoder, frame_size: u32, sample_rate: u32, - channels: usize, stream: S, input_buffer: Vec, exhausted: bool, @@ -588,7 +583,6 @@ impl OpusEncoder encoder, frame_size, sample_rate, - channels, stream, input_buffer: Vec::new(), exhausted: false, -- cgit v1.2.1 From bcf60257a86fec924dda8c71ab3a5fe0be5b74cd Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:55:18 +0100 Subject: add is_exhausted method to StreamingSignal trait --- mumd/src/audio.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 7d4929c..9c9648b 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -429,6 +429,10 @@ trait StreamingSignal { type Frame: Frame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; + + fn is_exhausted(&self) -> bool { + false + } } impl StreamingSignal for S -- cgit v1.2.1 From 8f212e65f1bc9187ff0854ed0b83e0b25be5847a Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:05:26 +0100 Subject: implement is_exhausted in a reasonable way --- mumd/src/audio.rs | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 9c9648b..0aae1cf 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -22,6 +22,7 @@ use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; use std::future::Future; +use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -504,6 +505,10 @@ impl StreamingSignal for FromStream None => Poll::Ready(::EQUILIBRIUM) } } + + fn is_exhausted(&self) -> bool { + self.next.is_none() + } } @@ -511,19 +516,37 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next: Vec, + next_frame: Option, + next_buf: Vec, underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream +async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { + let mut i = 0; + let mut buf = Vec::new(); + let next = loop { + if i == F::CHANNELS { + let mut iter = buf.into_iter(); + break F::from_samples(&mut iter); + } + match stream.next().await { + Some(v) => { + buf.push(v); + } + None => break None, + } + + i += 1; + }; FromInterleavedSamplesStream { stream, - next: Vec::new(), - underlying_exhausted: false + next_buf: Vec::new(), + underlying_exhausted: false, + next_frame: next, } } @@ -531,7 +554,7 @@ impl StreamingSignal for FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample + Unpin, - F: Frame { + F: Frame + Unpin { type Frame = F; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -539,21 +562,24 @@ impl StreamingSignal for FromInterleavedSamplesStream if s.underlying_exhausted { return Poll::Ready(F::EQUILIBRIUM); } - while s.next.len() < F::CHANNELS { + while s.next_buf.len() < F::CHANNELS { match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(v)) => { - s.next.push(v); + s.next_buf.push(v); } Poll::Ready(None) => { s.underlying_exhausted = true; + s.next_frame = None; return Poll::Ready(F::EQUILIBRIUM); } Poll::Pending => return Poll::Pending, } } - let mut data = s.next.iter().cloned(); - Poll::Ready(F::from_samples(&mut data).unwrap()) + let mut data = s.next_buf.iter().cloned(); + let n = F::from_samples(&mut data).unwrap(); + let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); + Poll::Ready(ret) } } -- cgit v1.2.1 From f52329ef65b96d1e5d1fd25dabd51f0fdd23ff92 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:09:27 +0100 Subject: fix FromStream --- mumd/src/audio.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0aae1cf..afe644c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -492,17 +492,15 @@ impl StreamingSignal for FromStream fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); - match s.next.take() { - Some(v) => { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(val) => { - s.next = val; - Poll::Ready(v) - } - Poll::Pending => Poll::Pending - } + if s.next.is_none() { + return Poll::Ready(::EQUILIBRIUM); + } + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(val) => { + let ret = mem::replace(&mut s.next, val); + Poll::Ready(ret.unwrap()) } - None => Poll::Ready(::EQUILIBRIUM) + Poll::Pending => Poll::Pending, } } -- cgit v1.2.1 From 7fbbf89cc16734e59882ab71e2aed54e4c048733 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:32:30 +0100 Subject: add IntoInterleavedSamples struct --- mumd/src/audio.rs | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index afe644c..828dbc5 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -452,6 +452,12 @@ trait StreamingSignalExt: StreamingSignal { stream: self } } + + fn into_interleaved_samples(self) -> IntoInterleavedSamples + where + Self: Sized { + IntoInterleavedSamples { signal: self, current_frame: None } + } } struct Next<'a, S: ?Sized> { @@ -471,6 +477,35 @@ impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { } } +struct IntoInterleavedSamples { + signal: S, + current_frame: Option<::Channels>, +} + +impl Stream for IntoInterleavedSamples + where + S: StreamingSignal + Unpin, + <::Frame as Frame>::Channels: Unpin { + type Item = ::Sample; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + loop { + if s.current_frame.is_none() { + match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(val) => { + s.current_frame = Some(val.channels()); + } + Poll::Pending => return Poll::Pending, + } + } + if let Some(channel) = s.current_frame.as_mut().unwrap().next() { + return Poll::Ready(Some(channel)); + } + } + } +} + struct FromStream { stream: S, next: Option, -- cgit v1.2.1 From 08e64c1b9d622026bcbe1f80d2d5d64dd80af8f9 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:47:14 +0100 Subject: add streaming version of NoiseGate --- mumd/src/audio.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 2 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 828dbc5..0a2465e 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -21,7 +21,7 @@ use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; -use std::future::Future; +use std::future::{Future}; use std::mem; //TODO? move to mumlib @@ -373,7 +373,11 @@ struct NoiseGate { } impl NoiseGate { - pub fn new(signal: S, activate_threshold: <::Sample as Sample>::Float, deactivate_threshold: <::Sample as Sample>::Float) -> NoiseGate { + pub fn new( + signal: S, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float + ) -> NoiseGate { Self { open: false, signal, @@ -417,6 +421,72 @@ impl Signal for NoiseGate { } } +struct StreamingNoiseGate { + open: bool, + signal: S, + buffer: dasp_ring_buffer::Fixed>, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float, +} + +impl StreamingNoiseGate { + pub fn new( + signal: S, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float + ) -> StreamingNoiseGate { + Self { + open: false, + signal, + buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), + activate_threshold, + deactivate_threshold, + } + } +} + +impl StreamingSignal for StreamingNoiseGate + where + S: StreamingSignal + Unpin, + <<::Frame as Frame>::Sample as Sample>::Float: Unpin, + ::Frame: Unpin { + type Frame = S::Frame; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + + let frame = match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(v) => v, + Poll::Pending => return Poll::Pending, + }; + s.buffer.push(frame); + + if s.open && s.buffer + .iter() + .all(|f| f.to_float_frame() + .channels() + .all(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= s.deactivate_threshold)) { + s.open = false; + } else if !s.open && s.buffer + .iter() + .any(|f| f.to_float_frame() + .channels() + .any(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= s.activate_threshold)) { + s.open = true; + } + + if s.open { + Poll::Ready(frame) + } else { + Poll::Ready(S::Frame::EQUILIBRIUM) + } + } + + fn is_exhausted(&self) -> bool { + self.signal.is_exhausted() + } +} + fn abs(sample: S) -> S { let zero = S::EQUILIBRIUM; if sample >= zero { -- cgit v1.2.1 From 76a3f1ea5489048e6d32982119429daa05dde3e0 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 11:17:49 +0100 Subject: make audio sending use streams --- mumd/src/audio.rs | 71 +++++++++++++++++++++++-------------------------------- 1 file changed, 30 insertions(+), 41 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0a2465e..a8af82d 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -13,9 +13,9 @@ use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample, ToSample}; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; @@ -74,7 +74,7 @@ pub struct Audio { _output_stream: cpal::Stream, _input_stream: cpal::Stream, - input_channel_receiver: Option>, + input_channel_receiver: Option> + Unpin>>, input_volume_sender: watch::Sender, output_volume_sender: watch::Sender, @@ -88,7 +88,7 @@ pub struct Audio { } impl Audio { - pub fn new(input_volume: f32, output_volume: f32) -> Self { + pub async fn new(input_volume: f32, output_volume: f32) -> Self { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); @@ -169,20 +169,7 @@ impl Audio { } .unwrap(); - let input_encoder = opus::Encoder::new( - input_config.sample_rate.0, - match input_config.channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!( - "Only 1 or 2 channels supported, got {})", - input_config.channels - ), - }, - opus::Application::Voip, - ) - .unwrap(); - let (input_sender, input_receiver) = mpsc::channel(100); + let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::(input_volume); @@ -190,39 +177,37 @@ impl Audio { SampleFormat::F32 => input_device.build_input_stream( &input_config, input::callback::( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, input::callback::( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, input::callback::( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), } .unwrap(); + let opus_stream = OpusEncoder::new( + 4, + input_config.sample_rate.0, + input_config.channels as usize, + StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate + //TODO group frames correctly + output_stream.play().unwrap(); let sounds = EVENT_SOUNDS @@ -246,7 +231,7 @@ impl Audio { _ => unimplemented!() // TODO handle gracefully (this might not even happen) }; let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter); - let interp = Linear::new(signal.next(), signal.next()); + let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal)); let samples = signal .from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64) .until_exhausted() @@ -262,7 +247,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(input_receiver), + input_channel_receiver: Some(Box::new(opus_stream)), client_streams, sounds, output_volume_sender, @@ -312,7 +297,7 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option> { + pub fn take_receiver(&mut self) -> Option> + Unpin>> { self.input_channel_receiver.take() } @@ -530,6 +515,9 @@ trait StreamingSignalExt: StreamingSignal { } } +impl StreamingSignalExt for S + where S: StreamingSignal {} + struct Next<'a, S: ?Sized> { stream: &'a mut S } @@ -561,16 +549,16 @@ impl Stream for IntoInterleavedSamples fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); loop { - if s.current_frame.is_none() { - match S::poll_next(Pin::new(&mut s.signal), cx) { - Poll::Ready(val) => { - s.current_frame = Some(val.channels()); - } - Poll::Pending => return Poll::Pending, + if s.current_frame.is_some() { + if let Some(channel) = s.current_frame.as_mut().unwrap().next() { + return Poll::Ready(Some(channel)); } } - if let Some(channel) = s.current_frame.as_mut().unwrap().next() { - return Poll::Ready(Some(channel)); + match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(val) => { + s.current_frame = Some(val.channels()); + } + Poll::Pending => return Poll::Pending, } } } @@ -681,6 +669,7 @@ impl StreamingSignal for FromInterleavedSamplesStream let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); + s.next_buf.clear(); let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); Poll::Ready(ret) } -- cgit v1.2.1 From 0f225e518b6889f604cb440f14824b21ed49bf37 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 11:24:59 +0100 Subject: make a bunch of functions sync --- mumd/src/audio.rs | 55 ++++++++++++++++++++----------------------------------- 1 file changed, 20 insertions(+), 35 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index a8af82d..9c3c143 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -22,7 +22,6 @@ use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; use std::future::{Future}; -use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -88,7 +87,7 @@ pub struct Audio { } impl Audio { - pub async fn new(input_volume: f32, output_volume: f32) -> Self { + pub fn new(input_volume: f32, output_volume: f32) -> Self { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); @@ -205,7 +204,7 @@ impl Audio { 4, input_config.sample_rate.0, input_config.channels as usize, - StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate + StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver))); //TODO attach a noise gate //TODO group frames correctly output_stream.play().unwrap(); @@ -564,17 +563,16 @@ impl Stream for IntoInterleavedSamples } } -struct FromStream { +struct FromStream { stream: S, - next: Option, + underlying_exhausted: bool, } -async fn from_stream(mut stream: S) -> FromStream +fn from_stream(mut stream: S) -> FromStream where S: Stream + Unpin, S::Item: Frame { - let next = stream.next().await; - FromStream { stream, next } + FromStream { stream, underlying_exhausted: false } } impl StreamingSignal for FromStream @@ -585,20 +583,23 @@ impl StreamingSignal for FromStream fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); - if s.next.is_none() { + if s.underlying_exhausted { return Poll::Ready(::EQUILIBRIUM); } match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(val) => { - let ret = mem::replace(&mut s.next, val); - Poll::Ready(ret.unwrap()) + Poll::Ready(Some(val)) => { + Poll::Ready(val) + } + Poll::Ready(None) => { + s.underlying_exhausted = true; + return Poll::Ready(::EQUILIBRIUM); } Poll::Pending => Poll::Pending, } } fn is_exhausted(&self) -> bool { - self.next.is_none() + self.underlying_exhausted } } @@ -607,37 +608,19 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next_frame: Option, next_buf: Vec, underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream +fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { - let mut i = 0; - let mut buf = Vec::new(); - let next = loop { - if i == F::CHANNELS { - let mut iter = buf.into_iter(); - break F::from_samples(&mut iter); - } - match stream.next().await { - Some(v) => { - buf.push(v); - } - None => break None, - } - - i += 1; - }; FromInterleavedSamplesStream { stream, next_buf: Vec::new(), underlying_exhausted: false, - next_frame: next, } } @@ -660,7 +643,6 @@ impl StreamingSignal for FromInterleavedSamplesStream } Poll::Ready(None) => { s.underlying_exhausted = true; - s.next_frame = None; return Poll::Ready(F::EQUILIBRIUM); } Poll::Pending => return Poll::Pending, @@ -670,8 +652,11 @@ impl StreamingSignal for FromInterleavedSamplesStream let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); s.next_buf.clear(); - let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); - Poll::Ready(ret) + Poll::Ready(n) + } + + fn is_exhausted(&self) -> bool { + self.underlying_exhausted } } -- cgit v1.2.1 From b35a9c0a48f3f853b2d0e1551d33682189d77055 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 11:27:04 +0100 Subject: remove unused imports --- mumd/src/audio.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 9c3c143..4b0cb47 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -20,7 +20,6 @@ use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; -use tokio_stream::StreamExt; use std::future::{Future}; //TODO? move to mumlib -- cgit v1.2.1 From 6e07c2bc4bba206e15bbe8838a322a5c506be9a1 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 15:05:52 +0100 Subject: remove deps and make noise gate more efficent --- mumd/src/audio.rs | 95 +++++++++++++++++++++++++++---------------------------- 1 file changed, 47 insertions(+), 48 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 4b0cb47..d4ef4d6 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -16,11 +16,11 @@ use std::sync::{Arc, Mutex}; use tokio::sync::watch; use dasp_frame::Frame; use dasp_sample::{SignedSample, ToSample, Sample}; -use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; use std::future::{Future}; +use std::fmt::Debug; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -203,8 +203,11 @@ impl Audio { 4, input_config.sample_rate.0, input_config.channels as usize, - StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver))); //TODO attach a noise gate - //TODO group frames correctly + StreamingSignalExt::into_interleaved_samples( + StreamingNoiseGate::new( + from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly + 0.09, + 10_000))); output_stream.play().unwrap(); @@ -348,25 +351,23 @@ impl Audio { } struct NoiseGate { - open: bool, + open: usize, signal: S, - buffer: dasp_ring_buffer::Fixed>, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float, + deactivation_delay: usize, } impl NoiseGate { pub fn new( signal: S, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float + deactivation_delay: usize, ) -> NoiseGate { Self { - open: false, + open: 0, signal, - buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), activate_threshold, - deactivate_threshold, + deactivation_delay } } } @@ -376,26 +377,26 @@ impl Signal for NoiseGate { fn next(&mut self) -> Self::Frame { let frame = self.signal.next(); - self.buffer.push(frame); - if self.open && self.buffer - .iter() - .all(|f| f.to_float_frame() - .channels() - .all(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= self.deactivate_threshold)) { - self.open = false; - } else if !self.open && self.buffer - .iter() - .any(|f| f.to_float_frame() - .channels() - .any(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= self.activate_threshold)) { - self.open = true; + match self.open { + 0 => { + if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { + self.open = self.deactivation_delay; + } + } + _ => { + if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { + self.open = self.deactivation_delay; + } else { + self.open -= 1; + } + } } - if self.open { + if self.open != 0 { frame } else { - S::Frame::EQUILIBRIUM + ::EQUILIBRIUM } } @@ -405,25 +406,23 @@ impl Signal for NoiseGate { } struct StreamingNoiseGate { - open: bool, + open: usize, signal: S, - buffer: dasp_ring_buffer::Fixed>, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float, + deactivation_delay: usize, } impl StreamingNoiseGate { pub fn new( signal: S, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float + deactivation_delay: usize, ) -> StreamingNoiseGate { Self { - open: false, + open: 0, signal, - buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), activate_threshold, - deactivate_threshold, + deactivation_delay } } } @@ -442,26 +441,26 @@ impl StreamingSignal for StreamingNoiseGate Poll::Ready(v) => v, Poll::Pending => return Poll::Pending, }; - s.buffer.push(frame); - if s.open && s.buffer - .iter() - .all(|f| f.to_float_frame() - .channels() - .all(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= s.deactivate_threshold)) { - s.open = false; - } else if !s.open && s.buffer - .iter() - .any(|f| f.to_float_frame() - .channels() - .any(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= s.activate_threshold)) { - s.open = true; + match s.open { + 0 => { + if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { + s.open = s.deactivation_delay; + } + } + _ => { + if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { + s.open = s.deactivation_delay; + } else { + s.open -= 1; + } + } } - if s.open { + if s.open != 0 { Poll::Ready(frame) } else { - Poll::Ready(S::Frame::EQUILIBRIUM) + Poll::Ready(::EQUILIBRIUM) } } @@ -567,7 +566,7 @@ struct FromStream { underlying_exhausted: bool, } -fn from_stream(mut stream: S) -> FromStream +fn from_stream(stream: S) -> FromStream where S: Stream + Unpin, S::Item: Frame { -- cgit v1.2.1 From 33a2da5d267d953fdc91988bc14ce9b0710b3d59 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 15:34:31 +0100 Subject: make opus encoder only yield frame when there is data to send --- mumd/src/audio.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index d4ef4d6..6d69e36 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -707,17 +707,23 @@ impl Stream for OpusEncoder return Poll::Ready(None); } let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; - while s.input_buffer.len() < opus_frame_size { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(v)) => { - s.input_buffer.push(v.to_sample::()); - } - Poll::Ready(None) => { - s.exhausted = true; - return Poll::Ready(None); + loop { + while s.input_buffer.len() < opus_frame_size { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.input_buffer.push(v.to_sample::()); + } + Poll::Ready(None) => { + s.exhausted = true; + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, } - Poll::Pending => return Poll::Pending, } + if s.input_buffer.iter().any(|&e| e != 0.0) { + break; + } + s.input_buffer.clear(); } let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); -- cgit v1.2.1 From e566dbb547ffa2ebea9cd7c529d9bf5edb920f9a Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sun, 3 Jan 2021 17:16:38 +0100 Subject: remove dead code --- mumd/src/audio.rs | 62 ------------------------------------------------------- 1 file changed, 62 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 6d69e36..d7b2060 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -350,61 +350,6 @@ impl Audio { } } -struct NoiseGate { - open: usize, - signal: S, - activate_threshold: <::Sample as Sample>::Float, - deactivation_delay: usize, -} - -impl NoiseGate { - pub fn new( - signal: S, - activate_threshold: <::Sample as Sample>::Float, - deactivation_delay: usize, - ) -> NoiseGate { - Self { - open: 0, - signal, - activate_threshold, - deactivation_delay - } - } -} - -impl Signal for NoiseGate { - type Frame = S::Frame; - - fn next(&mut self) -> Self::Frame { - let frame = self.signal.next(); - - match self.open { - 0 => { - if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { - self.open = self.deactivation_delay; - } - } - _ => { - if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { - self.open = self.deactivation_delay; - } else { - self.open -= 1; - } - } - } - - if self.open != 0 { - frame - } else { - ::EQUILIBRIUM - } - } - - fn is_exhausted(&self) -> bool { - self.signal.is_exhausted() - } -} - struct StreamingNoiseGate { open: usize, signal: S, @@ -566,13 +511,6 @@ struct FromStream { underlying_exhausted: bool, } -fn from_stream(stream: S) -> FromStream - where - S: Stream + Unpin, - S::Item: Frame { - FromStream { stream, underlying_exhausted: false } -} - impl StreamingSignal for FromStream where S: Stream + Unpin, -- cgit v1.2.1 From 0179caa3f696ab88710b86943bc697fa1c6cf158 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Mon, 4 Jan 2021 17:22:07 +0100 Subject: change audio stream to send packages --- mumd/src/audio.rs | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'mumd/src/audio.rs') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index d7b2060..bf3f7f4 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -5,22 +5,24 @@ use crate::audio::output::SaturatingAdd; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig}; +use dasp_frame::Frame; use dasp_interpolate::linear::Linear; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_signal::{self as signal, Signal}; +use futures::Stream; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; use log::*; -use mumble_protocol::voice::VoicePacketPayload; +use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; +use std::future::{Future}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::sync::watch; -use dasp_frame::Frame; -use dasp_sample::{SignedSample, ToSample, Sample}; -use futures::Stream; -use futures::task::{Context, Poll}; -use std::pin::Pin; -use std::future::{Future}; -use std::fmt::Debug; +use mumble_protocol::Serverbound; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -72,7 +74,7 @@ pub struct Audio { _output_stream: cpal::Stream, _input_stream: cpal::Stream, - input_channel_receiver: Option> + Unpin>>, + input_channel_receiver: Arc> + Unpin>>>, input_volume_sender: watch::Sender, output_volume_sender: watch::Sender, @@ -207,7 +209,14 @@ impl Audio { StreamingNoiseGate::new( from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly 0.09, - 10_000))); + 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + }); output_stream.play().unwrap(); @@ -248,7 +257,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(Box::new(opus_stream)), + input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))), client_streams, sounds, output_volume_sender, @@ -298,8 +307,8 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option> + Unpin>> { - self.input_channel_receiver.take() + pub fn take_receiver(&mut self) -> Arc> + Unpin>>> { + Arc::clone(&self.input_channel_receiver) } pub fn clear_clients(&mut self) { @@ -668,4 +677,4 @@ impl Stream for OpusEncoder s.input_buffer.clear(); Poll::Ready(Some(encoded)) } -} \ No newline at end of file +} -- cgit v1.2.1