From dbcc5373fab41d876f4495463d76c1ab28c9d670 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sun, 27 Dec 2020 15:43:58 +0100 Subject: create noise-gate struct --- Cargo.lock | 3 +++ mumd/Cargo.toml | 3 +++ mumd/src/audio.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 945a736..e8e5cd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,7 +1161,10 @@ dependencies = [ "argparse", "bytes 0.5.6", "cpal", + "dasp_frame", "dasp_interpolate", + "dasp_ring_buffer", + "dasp_sample", "dasp_signal", "futures", "futures-util", diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index 7f014bd..e548559 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -23,6 +23,9 @@ bytes = "0.5" cpal = "0.13" dasp_interpolate = { version = "0.11", features = ["linear"] } dasp_signal = "0.11" +dasp_frame = "0.11" +dasp_sample = "0.11" +dasp_ring_buffer = "0.11" futures = "0.3" futures-util = "0.3" hound = "3.4" diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 8f9e2ab..bed9ceb 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -14,6 +14,9 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; +use dasp_frame::Frame; +use dasp_sample::{Sample, SignedSample}; +use dasp_ring_buffer::Fixed; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -354,3 +357,65 @@ impl Audio { play_sounds.extend(samples.iter().skip(l)); } } + +struct NoiseGate { + open: bool, + signal: S, + buffer: dasp_ring_buffer::Fixed>, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float, +} + +impl NoiseGate { + pub fn new(signal: S, activate_threshold: <::Sample as Sample>::Float, deactivate_threshold: <::Sample as Sample>::Float) -> NoiseGate { + Self { + open: false, + signal, + buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), + activate_threshold, + deactivate_threshold, + } + } +} + +impl Signal for NoiseGate { + type Frame = S::Frame; + + fn next(&mut self) -> Self::Frame { + let frame = self.signal.next(); + self.buffer.push(frame); + + if self.open && self.buffer + .iter() + .all(|f| f.to_float_frame() + .channels() + .all(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= self.deactivate_threshold)) { + self.open = false; + } else if !self.open && self.buffer + .iter() + .any(|f| f.to_float_frame() + .channels() + .any(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= self.activate_threshold)) { + self.open = true; + } + + if self.open { + frame + } else { + S::Frame::EQUILIBRIUM + } + } + + fn is_exhausted(&self) -> bool { + self.signal.is_exhausted() + } +} + +fn abs(sample: S) -> S { + let zero = S::EQUILIBRIUM; + if sample >= zero { + sample + } else { + -sample + } +} \ No newline at end of file -- cgit v1.2.1 From 9777219aa26dc64c0a3dc3d9d2023b8a1c4295fb Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 1 Jan 2021 14:30:34 +0100 Subject: add initial streaming signals --- mumd/src/audio.rs | 133 ++++++++++++++++++++++++++++++++++++++++++++++-- mumd/src/audio/input.rs | 2 + 2 files changed, 132 insertions(+), 3 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index bed9ceb..ee5516a 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -4,7 +4,7 @@ pub mod output; use crate::audio::output::SaturatingAdd; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{SampleFormat, SampleRate, Stream, StreamConfig}; +use cpal::{SampleFormat, SampleRate, StreamConfig}; use dasp_interpolate::linear::Linear; use dasp_signal::{self as signal, Signal}; use log::*; @@ -17,6 +17,13 @@ use tokio::sync::{mpsc, watch}; use dasp_frame::Frame; use dasp_sample::{Sample, SignedSample}; use dasp_ring_buffer::Fixed; +use futures::Stream; +use futures::task::{Context, Poll}; +use std::pin::Pin; +use tokio::stream::StreamExt; +use std::convert::identity; +use std::future::Future; +use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -65,8 +72,8 @@ pub enum NotificationEvents { pub struct Audio { output_config: StreamConfig, - _output_stream: Stream, - _input_stream: Stream, + _output_stream: cpal::Stream, + _input_stream: cpal::Stream, input_channel_receiver: Option>, input_volume_sender: watch::Sender, @@ -418,4 +425,124 @@ fn abs(sample: S) -> S { } else { -sample } +} + +trait StreamingSignal { + type Frame: Frame; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; +} + +trait StreamingSignalExt: StreamingSignal { + fn next(&mut self) -> Next<'_, Self> { + Next { + stream: self + } + } +} + +struct Next<'a, S: ?Sized> { + stream: &'a mut S +} + +impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { + type Output = S::Frame; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match S::poll_next(Pin::new(self.stream), cx) { + Poll::Ready(val) => { + Poll::Ready(val) + } + Poll::Pending => Poll::Pending + } + } +} + +struct FromStream { + stream: S, + next: Option, +} + +async fn from_stream(mut stream: S) -> FromStream + where + S: Stream + Unpin, + S::Item: Frame { + let next = stream.next().await; + FromStream { stream, next } +} + +impl StreamingSignal for FromStream + where + S: Stream + Unpin, + S::Item: Frame + Unpin { + type Frame = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + match s.next.take() { + Some(v) => { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(val) => { + s.next = val; + Poll::Ready(v) + } + Poll::Pending => Poll::Pending + } + } + None => Poll::Ready(::EQUILIBRIUM) + } + } +} + + +struct FromInterleavedSamplesStream + where + F: Frame { + stream: S, + next: Option>, +} + +async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream + where + S: Stream + Unpin, + S::Item: Sample, + F: Frame { + let mut data = Vec::with_capacity(F::CHANNELS); + for _ in 0..F::CHANNELS { + data.push(stream.next().await); + } + let data = data.into_iter().flat_map(identity).collect::>(); + FromInterleavedSamplesStream { stream, next: if data.len() == F::CHANNELS { Some(data) } else { None } } +} + +impl StreamingSignal for FromInterleavedSamplesStream + where + S: Stream + Unpin, + S::Item: Sample + Unpin, + F: Frame { + type Frame = F; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + if s.next.is_some() { + if s.next.as_ref().unwrap().len() == F::CHANNELS { + let mut data_buf = mem::replace(&mut s.next, Some(Vec::new())).unwrap().into_iter(); + Poll::Ready(F::from_samples(&mut data_buf).unwrap()) + } else { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.next.as_mut().unwrap().push(v); + Poll::Pending + } + Poll::Ready(None) => { + s.next = None; + Poll::Ready(F::EQUILIBRIUM) + } + Poll::Pending => Poll::Pending, + } + } + } else { + Poll::Ready(F::EQUILIBRIUM) + } + } } \ No newline at end of file diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 7405fdb..d04c728 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -4,6 +4,7 @@ use mumble_protocol::voice::VoicePacketPayload; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; +use log::*; pub fn callback( mut opus_encoder: opus::Encoder, @@ -26,6 +27,7 @@ pub fn callback( let buf = Arc::new(Mutex::new(VecDeque::new())); move |data: &[T], _info: &InputCallbackInfo| { + debug!("{:?}", _info); let mut buf = buf.lock().unwrap(); let input_volume = *input_volume_receiver.borrow(); let out: Vec = data -- cgit v1.2.1 From ee13c36868f08203d548c3221300651c5108b8a9 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 1 Jan 2021 14:33:19 +0100 Subject: update to use tokio 1.0 --- Cargo.lock | 1 + mumd/Cargo.toml | 1 + mumd/src/audio.rs | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index abefeb1..cdd53a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1217,6 +1217,7 @@ dependencies = [ "serde", "tokio", "tokio-native-tls", + "tokio-stream", "tokio-util", ] diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index caf1a9b..101e614 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -37,6 +37,7 @@ openssl = { version = "0.10" } opus = "0.2" serde = { version = "1.0", features = ["derive"] } tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "net", "time"] } +tokio-stream = "0.1.0" tokio-native-tls = "0.3" tokio-util = { version = "0.6", features = ["codec", "net"] } diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0998f06..dc0b77d 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -20,7 +20,7 @@ use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; -use tokio::stream::StreamExt; +use tokio_stream::StreamExt; use std::convert::identity; use std::future::Future; use std::mem; -- cgit v1.2.1 From 65d7b5e907ffbb594319e13684f7f566c0ad2264 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 1 Jan 2021 15:08:05 +0100 Subject: add AudioStream struct --- mumd/src/audio/input.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 914891b..01fd1f3 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -5,6 +5,10 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; use log::*; +use futures::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use futures_util::task::Waker; pub fn callback( mut opus_encoder: opus::Encoder, @@ -54,3 +58,38 @@ pub fn callback( } } } + +struct AudioStream { + data: Arc, Option)>>, +} + +impl AudioStream { + fn new() -> Self { + Self { + data: Arc::new(Mutex::new((VecDeque::new(), None))) + } + } + + fn insert_sample(&self, sample: T) { + let mut data = self.data.lock().unwrap(); + data.0.push_back(sample); + if let Some(waker) = data.1.take() { + waker.wake(); + } + } +} + +impl Stream for AudioStream { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + let mut data = s.data.lock().unwrap(); + if data.0.len() > 0 { + Poll::Ready(data.0.pop_front()) + } else { + data.1 = Some(cx.waker().clone()); + Poll::Pending + } + } +} \ No newline at end of file -- cgit v1.2.1 From 5cdac93a475b4402680ac8d274677f4ba29b1e25 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:42:52 +0100 Subject: add OpusEncoder stream --- Cargo.lock | 1 + mumd/Cargo.toml | 1 + mumd/src/audio.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index cdd53a7..25591ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1196,6 +1196,7 @@ name = "mumd" version = "0.3.0" dependencies = [ "argparse", + "async-stream", "bytes", "cpal", "dasp_frame", diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index 101e614..d12cec2 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -19,6 +19,7 @@ notifications = ["libnotify"] mumlib = { version = "0.3", path = "../mumlib" } argparse = "0.2" +async-stream = "0.3.0" cpal = "0.13" bytes = "1.0" dasp_interpolate = { version = "0.11", features = ["linear"] } diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index dc0b77d..324b615 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample}; +use dasp_sample::{Sample, SignedSample, ToSample}; use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; @@ -545,4 +545,74 @@ impl StreamingSignal for FromInterleavedSamplesStream Poll::Ready(F::EQUILIBRIUM) } } +} + +struct OpusEncoder { + encoder: opus::Encoder, + frame_size: u32, + sample_rate: u32, + channels: usize, + stream: S, + input_buffer: Vec, + exhausted: bool, +} + +impl OpusEncoder + where + S: Stream, + I: ToSample { + fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { + let encoder = opus::Encoder::new( + sample_rate, + match channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!( + "Only 1 or 2 channels supported, got {})", + channels + ), + }, + opus::Application::Voip, + ).unwrap(); + Self { + encoder, + frame_size, + sample_rate, + channels, + stream, + input_buffer: Vec::new(), + exhausted: false, + } + } +} + +impl Stream for OpusEncoder + where + S: Stream + Unpin, + I: Sample + ToSample { + type Item = Vec; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + if s.exhausted { + return Poll::Ready(None); + } + let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; + while s.input_buffer.len() < opus_frame_size { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.input_buffer.push(v.to_sample::()); + } + Poll::Ready(None) => { + s.exhausted = true; + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, + } + } + + let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); + s.input_buffer.clear(); + Poll::Ready(Some(encoded)) + } } \ No newline at end of file -- cgit v1.2.1 From ffaa6c3e017e98054f1840d187c10fcdb593b48f Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:43:15 +0100 Subject: add blanket impl for StreamingSignal --- mumd/src/audio.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 324b615..cd6141a 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -433,6 +433,16 @@ trait StreamingSignal { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; } +impl StreamingSignal for S + where + S: Signal + Unpin { + type Frame = S::Frame; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(self.get_mut().next()) + } +} + trait StreamingSignalExt: StreamingSignal { fn next(&mut self) -> Next<'_, Self> { Next { -- cgit v1.2.1 From 22481e0fd03102d2fa01a5025049c10a5539e356 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:52:55 +0100 Subject: fix from_interleaved_samples_iter --- mumd/src/audio.rs | 50 ++++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index cd6141a..7d4929c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -21,9 +21,7 @@ use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; -use std::convert::identity; use std::future::Future; -use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -509,20 +507,20 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next: Option>, + next: Vec, + underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream +async fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { - let mut data = Vec::with_capacity(F::CHANNELS); - for _ in 0..F::CHANNELS { - data.push(stream.next().await); + FromInterleavedSamplesStream { + stream, + next: Vec::new(), + underlying_exhausted: false } - let data = data.into_iter().flat_map(identity).collect::>(); - FromInterleavedSamplesStream { stream, next: if data.len() == F::CHANNELS { Some(data) } else { None } } } impl StreamingSignal for FromInterleavedSamplesStream @@ -534,26 +532,24 @@ impl StreamingSignal for FromInterleavedSamplesStream fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); - if s.next.is_some() { - if s.next.as_ref().unwrap().len() == F::CHANNELS { - let mut data_buf = mem::replace(&mut s.next, Some(Vec::new())).unwrap().into_iter(); - Poll::Ready(F::from_samples(&mut data_buf).unwrap()) - } else { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(v)) => { - s.next.as_mut().unwrap().push(v); - Poll::Pending - } - Poll::Ready(None) => { - s.next = None; - Poll::Ready(F::EQUILIBRIUM) - } - Poll::Pending => Poll::Pending, + if s.underlying_exhausted { + return Poll::Ready(F::EQUILIBRIUM); + } + while s.next.len() < F::CHANNELS { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.next.push(v); } + Poll::Ready(None) => { + s.underlying_exhausted = true; + return Poll::Ready(F::EQUILIBRIUM); + } + Poll::Pending => return Poll::Pending, } - } else { - Poll::Ready(F::EQUILIBRIUM) } + + let mut data = s.next.iter().cloned(); + Poll::Ready(F::from_samples(&mut data).unwrap()) } } @@ -561,7 +557,6 @@ struct OpusEncoder { encoder: opus::Encoder, frame_size: u32, sample_rate: u32, - channels: usize, stream: S, input_buffer: Vec, exhausted: bool, @@ -588,7 +583,6 @@ impl OpusEncoder encoder, frame_size, sample_rate, - channels, stream, input_buffer: Vec::new(), exhausted: false, -- cgit v1.2.1 From bcf60257a86fec924dda8c71ab3a5fe0be5b74cd Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:55:18 +0100 Subject: add is_exhausted method to StreamingSignal trait --- mumd/src/audio.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 7d4929c..9c9648b 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -429,6 +429,10 @@ trait StreamingSignal { type Frame: Frame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; + + fn is_exhausted(&self) -> bool { + false + } } impl StreamingSignal for S -- cgit v1.2.1 From 8f212e65f1bc9187ff0854ed0b83e0b25be5847a Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:05:26 +0100 Subject: implement is_exhausted in a reasonable way --- mumd/src/audio.rs | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 9c9648b..0aae1cf 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -22,6 +22,7 @@ use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; use std::future::Future; +use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -504,6 +505,10 @@ impl StreamingSignal for FromStream None => Poll::Ready(::EQUILIBRIUM) } } + + fn is_exhausted(&self) -> bool { + self.next.is_none() + } } @@ -511,19 +516,37 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next: Vec, + next_frame: Option, + next_buf: Vec, underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream +async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { + let mut i = 0; + let mut buf = Vec::new(); + let next = loop { + if i == F::CHANNELS { + let mut iter = buf.into_iter(); + break F::from_samples(&mut iter); + } + match stream.next().await { + Some(v) => { + buf.push(v); + } + None => break None, + } + + i += 1; + }; FromInterleavedSamplesStream { stream, - next: Vec::new(), - underlying_exhausted: false + next_buf: Vec::new(), + underlying_exhausted: false, + next_frame: next, } } @@ -531,7 +554,7 @@ impl StreamingSignal for FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample + Unpin, - F: Frame { + F: Frame + Unpin { type Frame = F; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -539,21 +562,24 @@ impl StreamingSignal for FromInterleavedSamplesStream if s.underlying_exhausted { return Poll::Ready(F::EQUILIBRIUM); } - while s.next.len() < F::CHANNELS { + while s.next_buf.len() < F::CHANNELS { match S::poll_next(Pin::new(&mut s.stream), cx) { Poll::Ready(Some(v)) => { - s.next.push(v); + s.next_buf.push(v); } Poll::Ready(None) => { s.underlying_exhausted = true; + s.next_frame = None; return Poll::Ready(F::EQUILIBRIUM); } Poll::Pending => return Poll::Pending, } } - let mut data = s.next.iter().cloned(); - Poll::Ready(F::from_samples(&mut data).unwrap()) + let mut data = s.next_buf.iter().cloned(); + let n = F::from_samples(&mut data).unwrap(); + let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); + Poll::Ready(ret) } } -- cgit v1.2.1 From f52329ef65b96d1e5d1fd25dabd51f0fdd23ff92 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:09:27 +0100 Subject: fix FromStream --- mumd/src/audio.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0aae1cf..afe644c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -492,17 +492,15 @@ impl StreamingSignal for FromStream fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); - match s.next.take() { - Some(v) => { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(val) => { - s.next = val; - Poll::Ready(v) - } - Poll::Pending => Poll::Pending - } + if s.next.is_none() { + return Poll::Ready(::EQUILIBRIUM); + } + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(val) => { + let ret = mem::replace(&mut s.next, val); + Poll::Ready(ret.unwrap()) } - None => Poll::Ready(::EQUILIBRIUM) + Poll::Pending => Poll::Pending, } } -- cgit v1.2.1 From 7fbbf89cc16734e59882ab71e2aed54e4c048733 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:32:30 +0100 Subject: add IntoInterleavedSamples struct --- mumd/src/audio.rs | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index afe644c..828dbc5 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -452,6 +452,12 @@ trait StreamingSignalExt: StreamingSignal { stream: self } } + + fn into_interleaved_samples(self) -> IntoInterleavedSamples + where + Self: Sized { + IntoInterleavedSamples { signal: self, current_frame: None } + } } struct Next<'a, S: ?Sized> { @@ -471,6 +477,35 @@ impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { } } +struct IntoInterleavedSamples { + signal: S, + current_frame: Option<::Channels>, +} + +impl Stream for IntoInterleavedSamples + where + S: StreamingSignal + Unpin, + <::Frame as Frame>::Channels: Unpin { + type Item = ::Sample; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + loop { + if s.current_frame.is_none() { + match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(val) => { + s.current_frame = Some(val.channels()); + } + Poll::Pending => return Poll::Pending, + } + } + if let Some(channel) = s.current_frame.as_mut().unwrap().next() { + return Poll::Ready(Some(channel)); + } + } + } +} + struct FromStream { stream: S, next: Option, -- cgit v1.2.1 From 08e64c1b9d622026bcbe1f80d2d5d64dd80af8f9 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 09:47:14 +0100 Subject: add streaming version of NoiseGate --- mumd/src/audio.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 828dbc5..0a2465e 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -21,7 +21,7 @@ use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; -use std::future::Future; +use std::future::{Future}; use std::mem; //TODO? move to mumlib @@ -373,7 +373,11 @@ struct NoiseGate { } impl NoiseGate { - pub fn new(signal: S, activate_threshold: <::Sample as Sample>::Float, deactivate_threshold: <::Sample as Sample>::Float) -> NoiseGate { + pub fn new( + signal: S, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float + ) -> NoiseGate { Self { open: false, signal, @@ -417,6 +421,72 @@ impl Signal for NoiseGate { } } +struct StreamingNoiseGate { + open: bool, + signal: S, + buffer: dasp_ring_buffer::Fixed>, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float, +} + +impl StreamingNoiseGate { + pub fn new( + signal: S, + activate_threshold: <::Sample as Sample>::Float, + deactivate_threshold: <::Sample as Sample>::Float + ) -> StreamingNoiseGate { + Self { + open: false, + signal, + buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), + activate_threshold, + deactivate_threshold, + } + } +} + +impl StreamingSignal for StreamingNoiseGate + where + S: StreamingSignal + Unpin, + <<::Frame as Frame>::Sample as Sample>::Float: Unpin, + ::Frame: Unpin { + type Frame = S::Frame; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + + let frame = match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(v) => v, + Poll::Pending => return Poll::Pending, + }; + s.buffer.push(frame); + + if s.open && s.buffer + .iter() + .all(|f| f.to_float_frame() + .channels() + .all(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= s.deactivate_threshold)) { + s.open = false; + } else if !s.open && s.buffer + .iter() + .any(|f| f.to_float_frame() + .channels() + .any(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= s.activate_threshold)) { + s.open = true; + } + + if s.open { + Poll::Ready(frame) + } else { + Poll::Ready(S::Frame::EQUILIBRIUM) + } + } + + fn is_exhausted(&self) -> bool { + self.signal.is_exhausted() + } +} + fn abs(sample: S) -> S { let zero = S::EQUILIBRIUM; if sample >= zero { -- cgit v1.2.1 From 76a3f1ea5489048e6d32982119429daa05dde3e0 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 11:17:49 +0100 Subject: make audio sending use streams --- mumd/src/audio.rs | 71 +++++++++++++++++++++---------------------------- mumd/src/audio/input.rs | 46 +++++--------------------------- mumd/src/main.rs | 2 +- mumd/src/network/udp.rs | 11 ++++---- mumd/src/state.rs | 4 +-- 5 files changed, 45 insertions(+), 89 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0a2465e..a8af82d 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -13,9 +13,9 @@ use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample, ToSample}; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; @@ -74,7 +74,7 @@ pub struct Audio { _output_stream: cpal::Stream, _input_stream: cpal::Stream, - input_channel_receiver: Option>, + input_channel_receiver: Option> + Unpin>>, input_volume_sender: watch::Sender, output_volume_sender: watch::Sender, @@ -88,7 +88,7 @@ pub struct Audio { } impl Audio { - pub fn new(input_volume: f32, output_volume: f32) -> Self { + pub async fn new(input_volume: f32, output_volume: f32) -> Self { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); @@ -169,20 +169,7 @@ impl Audio { } .unwrap(); - let input_encoder = opus::Encoder::new( - input_config.sample_rate.0, - match input_config.channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!( - "Only 1 or 2 channels supported, got {})", - input_config.channels - ), - }, - opus::Application::Voip, - ) - .unwrap(); - let (input_sender, input_receiver) = mpsc::channel(100); + let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::(input_volume); @@ -190,39 +177,37 @@ impl Audio { SampleFormat::F32 => input_device.build_input_stream( &input_config, input::callback::( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, input::callback::( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, input::callback::( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), } .unwrap(); + let opus_stream = OpusEncoder::new( + 4, + input_config.sample_rate.0, + input_config.channels as usize, + StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate + //TODO group frames correctly + output_stream.play().unwrap(); let sounds = EVENT_SOUNDS @@ -246,7 +231,7 @@ impl Audio { _ => unimplemented!() // TODO handle gracefully (this might not even happen) }; let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter); - let interp = Linear::new(signal.next(), signal.next()); + let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal)); let samples = signal .from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64) .until_exhausted() @@ -262,7 +247,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(input_receiver), + input_channel_receiver: Some(Box::new(opus_stream)), client_streams, sounds, output_volume_sender, @@ -312,7 +297,7 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option> { + pub fn take_receiver(&mut self) -> Option> + Unpin>> { self.input_channel_receiver.take() } @@ -530,6 +515,9 @@ trait StreamingSignalExt: StreamingSignal { } } +impl StreamingSignalExt for S + where S: StreamingSignal {} + struct Next<'a, S: ?Sized> { stream: &'a mut S } @@ -561,16 +549,16 @@ impl Stream for IntoInterleavedSamples fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); loop { - if s.current_frame.is_none() { - match S::poll_next(Pin::new(&mut s.signal), cx) { - Poll::Ready(val) => { - s.current_frame = Some(val.channels()); - } - Poll::Pending => return Poll::Pending, + if s.current_frame.is_some() { + if let Some(channel) = s.current_frame.as_mut().unwrap().next() { + return Poll::Ready(Some(channel)); } } - if let Some(channel) = s.current_frame.as_mut().unwrap().next() { - return Poll::Ready(Some(channel)); + match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(val) => { + s.current_frame = Some(val.channels()); + } + Poll::Pending => return Poll::Pending, } } } @@ -681,6 +669,7 @@ impl StreamingSignal for FromInterleavedSamplesStream let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); + s.next_buf.clear(); let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); Poll::Ready(ret) } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 01fd1f3..8f0fe6e 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,9 +1,7 @@ -use bytes::Bytes; use cpal::{InputCallbackInfo, Sample}; -use mumble_protocol::voice::VoicePacketPayload; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use log::*; use futures::Stream; use std::pin::Pin; @@ -11,50 +9,18 @@ use std::task::{Context, Poll}; use futures_util::task::Waker; pub fn callback( - mut opus_encoder: opus::Encoder, - input_sender: mpsc::Sender, - sample_rate: u32, + mut input_sender: futures::channel::mpsc::Sender, input_volume_receiver: watch::Receiver, - opus_frame_size_blocks: u32, // blocks of 2.5ms ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { - if !(opus_frame_size_blocks == 1 - || opus_frame_size_blocks == 2 - || opus_frame_size_blocks == 4 - || opus_frame_size_blocks == 8) - { - panic!( - "Unsupported amount of opus frame blocks {}", - opus_frame_size_blocks - ); - } - let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; - - let buf = Arc::new(Mutex::new(VecDeque::new())); move |data: &[T], _info: &InputCallbackInfo| { - debug!("{:?}", _info); - let mut buf = buf.lock().unwrap(); let input_volume = *input_volume_receiver.borrow(); - let out: Vec = data + for sample in data .iter() .map(|e| e.to_f32()) - .map(|e| e * input_volume) - .collect(); - buf.extend(out); - while buf.len() >= opus_frame_size as usize { - let tail = buf.split_off(opus_frame_size as usize); - let mut opus_buf: Vec = vec![0; opus_frame_size as usize]; - let result = opus_encoder - .encode_float(&Vec::from(buf.clone()), &mut opus_buf) - .unwrap(); - opus_buf.truncate(result); - let bytes = Bytes::copy_from_slice(&opus_buf); - match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) { - Ok(_) => {} - Err(_e) => { - //warn!("Error sending audio packet: {:?}", e); - } + .map(|e| e * input_volume) { + if let Err(_e) = input_sender.try_send(sample) { + // warn!("Error sending audio: {}", e) } - *buf = tail; } } } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index db6d2ef..4d6f148 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -58,7 +58,7 @@ async fn main() { let (response_sender, response_receiver) = mpsc::unbounded_channel(); let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); - let state = State::new(packet_sender, connection_info_sender); + let state = State::new(packet_sender, connection_info_sender).await; let state = Arc::new(Mutex::new(state)); let (_, _, _, e, _) = join!( diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index b592a60..d412d55 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -3,7 +3,7 @@ use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; @@ -54,7 +54,7 @@ pub async fn handle( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, - &mut receiver + &mut *receiver ), new_crypt_state(&mut crypt_state_receiver, sink, source) ); @@ -198,8 +198,9 @@ async fn send_voice( sink: Arc>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver, - receiver: &mut mpsc::Receiver, + receiver: &mut (dyn Stream> + Unpin), ) { + pin_mut!(receiver); let (tx, rx) = oneshot::channel(); let phase_transition_block = async { loop { @@ -216,7 +217,7 @@ async fn send_voice( pin_mut!(rx); let mut count = 0; loop { - let packet_recv = receiver.recv().fuse(); + let packet_recv = receiver.next().fuse(); pin_mut!(packet_recv); let exitor = select! { data = packet_recv => Some(data), @@ -236,7 +237,7 @@ async fn send_voice( target: 0, // normal speech session_id: (), // unused for server-bound packets seq_num: count, - payload, + payload: VoicePacketPayload::Opus(payload.into(), false), position_info: None, }; count += 1; diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 85e5449..1421691 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -64,7 +64,7 @@ pub struct State { } impl State { - pub fn new( + pub async fn new( packet_sender: mpsc::UnboundedSender>, connection_info_sender: watch::Sender>, ) -> Self { @@ -72,7 +72,7 @@ impl State { let audio = Audio::new( config.audio.input_volume.unwrap_or(1.0), config.audio.output_volume.unwrap_or(1.0), - ); + ).await; let mut state = Self { config, server: None, -- cgit v1.2.1 From 0f225e518b6889f604cb440f14824b21ed49bf37 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 11:24:59 +0100 Subject: make a bunch of functions sync --- mumd/src/audio.rs | 55 ++++++++++++++++++------------------------------- mumd/src/audio/input.rs | 1 - mumd/src/main.rs | 2 +- mumd/src/state.rs | 4 ++-- 4 files changed, 23 insertions(+), 39 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index a8af82d..9c3c143 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -22,7 +22,6 @@ use futures::task::{Context, Poll}; use std::pin::Pin; use tokio_stream::StreamExt; use std::future::{Future}; -use std::mem; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -88,7 +87,7 @@ pub struct Audio { } impl Audio { - pub async fn new(input_volume: f32, output_volume: f32) -> Self { + pub fn new(input_volume: f32, output_volume: f32) -> Self { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); @@ -205,7 +204,7 @@ impl Audio { 4, input_config.sample_rate.0, input_config.channels as usize, - StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate + StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver))); //TODO attach a noise gate //TODO group frames correctly output_stream.play().unwrap(); @@ -564,17 +563,16 @@ impl Stream for IntoInterleavedSamples } } -struct FromStream { +struct FromStream { stream: S, - next: Option, + underlying_exhausted: bool, } -async fn from_stream(mut stream: S) -> FromStream +fn from_stream(mut stream: S) -> FromStream where S: Stream + Unpin, S::Item: Frame { - let next = stream.next().await; - FromStream { stream, next } + FromStream { stream, underlying_exhausted: false } } impl StreamingSignal for FromStream @@ -585,20 +583,23 @@ impl StreamingSignal for FromStream fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); - if s.next.is_none() { + if s.underlying_exhausted { return Poll::Ready(::EQUILIBRIUM); } match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(val) => { - let ret = mem::replace(&mut s.next, val); - Poll::Ready(ret.unwrap()) + Poll::Ready(Some(val)) => { + Poll::Ready(val) + } + Poll::Ready(None) => { + s.underlying_exhausted = true; + return Poll::Ready(::EQUILIBRIUM); } Poll::Pending => Poll::Pending, } } fn is_exhausted(&self) -> bool { - self.next.is_none() + self.underlying_exhausted } } @@ -607,37 +608,19 @@ struct FromInterleavedSamplesStream where F: Frame { stream: S, - next_frame: Option, next_buf: Vec, underlying_exhausted: bool, } -async fn from_interleaved_samples_stream(mut stream: S) -> FromInterleavedSamplesStream +fn from_interleaved_samples_stream(stream: S) -> FromInterleavedSamplesStream where S: Stream + Unpin, S::Item: Sample, F: Frame { - let mut i = 0; - let mut buf = Vec::new(); - let next = loop { - if i == F::CHANNELS { - let mut iter = buf.into_iter(); - break F::from_samples(&mut iter); - } - match stream.next().await { - Some(v) => { - buf.push(v); - } - None => break None, - } - - i += 1; - }; FromInterleavedSamplesStream { stream, next_buf: Vec::new(), underlying_exhausted: false, - next_frame: next, } } @@ -660,7 +643,6 @@ impl StreamingSignal for FromInterleavedSamplesStream } Poll::Ready(None) => { s.underlying_exhausted = true; - s.next_frame = None; return Poll::Ready(F::EQUILIBRIUM); } Poll::Pending => return Poll::Pending, @@ -670,8 +652,11 @@ impl StreamingSignal for FromInterleavedSamplesStream let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); s.next_buf.clear(); - let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); - Poll::Ready(ret) + Poll::Ready(n) + } + + fn is_exhausted(&self) -> bool { + self.underlying_exhausted } } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 8f0fe6e..febcb17 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -2,7 +2,6 @@ use cpal::{InputCallbackInfo, Sample}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use tokio::sync::watch; -use log::*; use futures::Stream; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 4d6f148..db6d2ef 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -58,7 +58,7 @@ async fn main() { let (response_sender, response_receiver) = mpsc::unbounded_channel(); let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); - let state = State::new(packet_sender, connection_info_sender).await; + let state = State::new(packet_sender, connection_info_sender); let state = Arc::new(Mutex::new(state)); let (_, _, _, e, _) = join!( diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 1421691..85e5449 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -64,7 +64,7 @@ pub struct State { } impl State { - pub async fn new( + pub fn new( packet_sender: mpsc::UnboundedSender>, connection_info_sender: watch::Sender>, ) -> Self { @@ -72,7 +72,7 @@ impl State { let audio = Audio::new( config.audio.input_volume.unwrap_or(1.0), config.audio.output_volume.unwrap_or(1.0), - ).await; + ); let mut state = Self { config, server: None, -- cgit v1.2.1 From 30a0c3b479b1ff39ad2bf9fbd58c93634ed418b2 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 11:25:29 +0100 Subject: remove AudioStream struct --- mumd/src/audio/input.rs | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index febcb17..517c9ce 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -22,39 +22,4 @@ pub fn callback( } } } -} - -struct AudioStream { - data: Arc, Option)>>, -} - -impl AudioStream { - fn new() -> Self { - Self { - data: Arc::new(Mutex::new((VecDeque::new(), None))) - } - } - - fn insert_sample(&self, sample: T) { - let mut data = self.data.lock().unwrap(); - data.0.push_back(sample); - if let Some(waker) = data.1.take() { - waker.wake(); - } - } -} - -impl Stream for AudioStream { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let s = self.get_mut(); - let mut data = s.data.lock().unwrap(); - if data.0.len() > 0 { - Poll::Ready(data.0.pop_front()) - } else { - data.1 = Some(cx.waker().clone()); - Poll::Pending - } - } } \ No newline at end of file -- cgit v1.2.1 From b35a9c0a48f3f853b2d0e1551d33682189d77055 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 11:27:04 +0100 Subject: remove unused imports --- mumd/src/audio.rs | 1 - mumd/src/audio/input.rs | 6 ------ 2 files changed, 7 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 9c3c143..4b0cb47 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -20,7 +20,6 @@ use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; -use tokio_stream::StreamExt; use std::future::{Future}; //TODO? move to mumlib diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 517c9ce..9ea82e0 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,11 +1,5 @@ use cpal::{InputCallbackInfo, Sample}; -use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; use tokio::sync::watch; -use futures::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; -use futures_util::task::Waker; pub fn callback( mut input_sender: futures::channel::mpsc::Sender, -- cgit v1.2.1 From 6e07c2bc4bba206e15bbe8838a322a5c506be9a1 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 15:05:52 +0100 Subject: remove deps and make noise gate more efficent --- Cargo.lock | 8 ----- mumd/Cargo.toml | 2 -- mumd/src/audio.rs | 95 ++++++++++++++++++++++++------------------------- mumd/src/audio/input.rs | 3 +- 4 files changed, 49 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 25591ca..edf9c90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,12 +52,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "argparse" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f8ebf5827e4ac4fd5946560e6a99776ea73b596d80898f357007317a7141e47" - [[package]] name = "ascii" version = "0.9.3" @@ -1195,8 +1189,6 @@ dependencies = [ name = "mumd" version = "0.3.0" dependencies = [ - "argparse", - "async-stream", "bytes", "cpal", "dasp_frame", diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index d12cec2..d13bdc8 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -18,8 +18,6 @@ notifications = ["libnotify"] [dependencies] mumlib = { version = "0.3", path = "../mumlib" } -argparse = "0.2" -async-stream = "0.3.0" cpal = "0.13" bytes = "1.0" dasp_interpolate = { version = "0.11", features = ["linear"] } diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 4b0cb47..d4ef4d6 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -16,11 +16,11 @@ use std::sync::{Arc, Mutex}; use tokio::sync::watch; use dasp_frame::Frame; use dasp_sample::{SignedSample, ToSample, Sample}; -use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; use std::pin::Pin; use std::future::{Future}; +use std::fmt::Debug; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -203,8 +203,11 @@ impl Audio { 4, input_config.sample_rate.0, input_config.channels as usize, - StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver))); //TODO attach a noise gate - //TODO group frames correctly + StreamingSignalExt::into_interleaved_samples( + StreamingNoiseGate::new( + from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly + 0.09, + 10_000))); output_stream.play().unwrap(); @@ -348,25 +351,23 @@ impl Audio { } struct NoiseGate { - open: bool, + open: usize, signal: S, - buffer: dasp_ring_buffer::Fixed>, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float, + deactivation_delay: usize, } impl NoiseGate { pub fn new( signal: S, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float + deactivation_delay: usize, ) -> NoiseGate { Self { - open: false, + open: 0, signal, - buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), activate_threshold, - deactivate_threshold, + deactivation_delay } } } @@ -376,26 +377,26 @@ impl Signal for NoiseGate { fn next(&mut self) -> Self::Frame { let frame = self.signal.next(); - self.buffer.push(frame); - if self.open && self.buffer - .iter() - .all(|f| f.to_float_frame() - .channels() - .all(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= self.deactivate_threshold)) { - self.open = false; - } else if !self.open && self.buffer - .iter() - .any(|f| f.to_float_frame() - .channels() - .any(|s| abs(s - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= self.activate_threshold)) { - self.open = true; + match self.open { + 0 => { + if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { + self.open = self.deactivation_delay; + } + } + _ => { + if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { + self.open = self.deactivation_delay; + } else { + self.open -= 1; + } + } } - if self.open { + if self.open != 0 { frame } else { - S::Frame::EQUILIBRIUM + ::EQUILIBRIUM } } @@ -405,25 +406,23 @@ impl Signal for NoiseGate { } struct StreamingNoiseGate { - open: bool, + open: usize, signal: S, - buffer: dasp_ring_buffer::Fixed>, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float, + deactivation_delay: usize, } impl StreamingNoiseGate { pub fn new( signal: S, activate_threshold: <::Sample as Sample>::Float, - deactivate_threshold: <::Sample as Sample>::Float + deactivation_delay: usize, ) -> StreamingNoiseGate { Self { - open: false, + open: 0, signal, - buffer: Fixed::from(vec![::EQUILIBRIUM; 4096]), activate_threshold, - deactivate_threshold, + deactivation_delay } } } @@ -442,26 +441,26 @@ impl StreamingSignal for StreamingNoiseGate Poll::Ready(v) => v, Poll::Pending => return Poll::Pending, }; - s.buffer.push(frame); - if s.open && s.buffer - .iter() - .all(|f| f.to_float_frame() - .channels() - .all(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) <= s.deactivate_threshold)) { - s.open = false; - } else if !s.open && s.buffer - .iter() - .any(|f| f.to_float_frame() - .channels() - .any(|sample| abs(sample - <<::Sample as Sample>::Float as Sample>::EQUILIBRIUM) >= s.activate_threshold)) { - s.open = true; + match s.open { + 0 => { + if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { + s.open = s.deactivation_delay; + } + } + _ => { + if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { + s.open = s.deactivation_delay; + } else { + s.open -= 1; + } + } } - if s.open { + if s.open != 0 { Poll::Ready(frame) } else { - Poll::Ready(S::Frame::EQUILIBRIUM) + Poll::Ready(::EQUILIBRIUM) } } @@ -567,7 +566,7 @@ struct FromStream { underlying_exhausted: bool, } -fn from_stream(mut stream: S) -> FromStream +fn from_stream(stream: S) -> FromStream where S: Stream + Unpin, S::Item: Frame { diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 9ea82e0..da6dc61 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,5 +1,6 @@ use cpal::{InputCallbackInfo, Sample}; use tokio::sync::watch; +use log::*; pub fn callback( mut input_sender: futures::channel::mpsc::Sender, @@ -12,7 +13,7 @@ pub fn callback( .map(|e| e.to_f32()) .map(|e| e * input_volume) { if let Err(_e) = input_sender.try_send(sample) { - // warn!("Error sending audio: {}", e) + warn!("Error sending audio: {}", _e); } } } -- cgit v1.2.1 From 33a2da5d267d953fdc91988bc14ce9b0710b3d59 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 15:34:31 +0100 Subject: make opus encoder only yield frame when there is data to send --- mumd/src/audio.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index d4ef4d6..6d69e36 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -707,17 +707,23 @@ impl Stream for OpusEncoder return Poll::Ready(None); } let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; - while s.input_buffer.len() < opus_frame_size { - match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(v)) => { - s.input_buffer.push(v.to_sample::()); - } - Poll::Ready(None) => { - s.exhausted = true; - return Poll::Ready(None); + loop { + while s.input_buffer.len() < opus_frame_size { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.input_buffer.push(v.to_sample::()); + } + Poll::Ready(None) => { + s.exhausted = true; + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, } - Poll::Pending => return Poll::Pending, } + if s.input_buffer.iter().any(|&e| e != 0.0) { + break; + } + s.input_buffer.clear(); } let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); -- cgit v1.2.1 From 866d57df5d72f8b4c29072783b47554e260e0a8c Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 15:38:21 +0100 Subject: update changelog --- CHANGELOG | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index d7d0f68..bf100e8 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -15,8 +15,10 @@ a PR. Unreleased ---------- -// Added -// ~~~~~ +Added +~~~~~ + +* Added a noise gate // Changed // ~~~~~~~ @@ -29,6 +31,7 @@ Fixed * Informative error message instead of panic when a running mumd-process can't be found. +* Client no longer sends empty audio packets // Other // ~~~~~ -- cgit v1.2.1 From e566dbb547ffa2ebea9cd7c529d9bf5edb920f9a Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sun, 3 Jan 2021 17:16:38 +0100 Subject: remove dead code --- mumd/src/audio.rs | 62 ------------------------------------------------------- 1 file changed, 62 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 6d69e36..d7b2060 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -350,61 +350,6 @@ impl Audio { } } -struct NoiseGate { - open: usize, - signal: S, - activate_threshold: <::Sample as Sample>::Float, - deactivation_delay: usize, -} - -impl NoiseGate { - pub fn new( - signal: S, - activate_threshold: <::Sample as Sample>::Float, - deactivation_delay: usize, - ) -> NoiseGate { - Self { - open: 0, - signal, - activate_threshold, - deactivation_delay - } - } -} - -impl Signal for NoiseGate { - type Frame = S::Frame; - - fn next(&mut self) -> Self::Frame { - let frame = self.signal.next(); - - match self.open { - 0 => { - if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { - self.open = self.deactivation_delay; - } - } - _ => { - if frame.to_float_frame().channels().any(|e| abs(e) >= self.activate_threshold) { - self.open = self.deactivation_delay; - } else { - self.open -= 1; - } - } - } - - if self.open != 0 { - frame - } else { - ::EQUILIBRIUM - } - } - - fn is_exhausted(&self) -> bool { - self.signal.is_exhausted() - } -} - struct StreamingNoiseGate { open: usize, signal: S, @@ -566,13 +511,6 @@ struct FromStream { underlying_exhausted: bool, } -fn from_stream(stream: S) -> FromStream - where - S: Stream + Unpin, - S::Item: Frame { - FromStream { stream, underlying_exhausted: false } -} - impl StreamingSignal for FromStream where S: Stream + Unpin, -- cgit v1.2.1 From 49f4b7a7158f768e5bd04047b44b759f84529036 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Mon, 4 Jan 2021 15:47:48 +0100 Subject: bump mumble-protocol to 0.4.1 --- Cargo.lock | 4 ++-- mumd/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index edf9c90..904fe44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1162,9 +1162,9 @@ dependencies = [ [[package]] name = "mumble-protocol" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a853f0a6938b65b17fbdaf6ee8a2dcd5e6d12c27f38c93ecd5a647a5c4357e8" +checksum = "4071096f63c6e9853c32825096af49caf6667979ff67a765ba23dffcf3c77160" dependencies = [ "byteorder", "bytes", diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index d13bdc8..0f5edb1 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -30,7 +30,7 @@ futures-util = "0.3" hound = "3.4" ipc-channel = "0.14" log = "0.4" -mumble-protocol = "0.4.0" +mumble-protocol = "0.4.1" native-tls = "0.2" openssl = { version = "0.10" } opus = "0.2" -- cgit v1.2.1 From 0179caa3f696ab88710b86943bc697fa1c6cf158 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Mon, 4 Jan 2021 17:22:07 +0100 Subject: change audio stream to send packages --- mumd/src/audio.rs | 37 +++++++++++++++++++++++-------------- mumd/src/network/udp.rs | 19 +++++-------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index d7b2060..bf3f7f4 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -5,22 +5,24 @@ use crate::audio::output::SaturatingAdd; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig}; +use dasp_frame::Frame; use dasp_interpolate::linear::Linear; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_signal::{self as signal, Signal}; +use futures::Stream; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; use log::*; -use mumble_protocol::voice::VoicePacketPayload; +use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; +use std::future::{Future}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::sync::watch; -use dasp_frame::Frame; -use dasp_sample::{SignedSample, ToSample, Sample}; -use futures::Stream; -use futures::task::{Context, Poll}; -use std::pin::Pin; -use std::future::{Future}; -use std::fmt::Debug; +use mumble_protocol::Serverbound; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -72,7 +74,7 @@ pub struct Audio { _output_stream: cpal::Stream, _input_stream: cpal::Stream, - input_channel_receiver: Option> + Unpin>>, + input_channel_receiver: Arc> + Unpin>>>, input_volume_sender: watch::Sender, output_volume_sender: watch::Sender, @@ -207,7 +209,14 @@ impl Audio { StreamingNoiseGate::new( from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly 0.09, - 10_000))); + 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + }); output_stream.play().unwrap(); @@ -248,7 +257,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(Box::new(opus_stream)), + input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))), client_streams, sounds, output_volume_sender, @@ -298,8 +307,8 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option> + Unpin>> { - self.input_channel_receiver.take() + pub fn take_receiver(&mut self) -> Arc> + Unpin>>> { + Arc::clone(&self.input_channel_receiver) } pub fn clear_clients(&mut self) { @@ -668,4 +677,4 @@ impl Stream for OpusEncoder s.input_buffer.clear(); Poll::Ready(Some(encoded)) } -} \ No newline at end of file +} diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index d412d55..4dde268 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -26,7 +26,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) { - let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + let receiver = state.lock().unwrap().audio_mut().take_receiver(); loop { let connection_info = 'data: loop { @@ -48,13 +48,14 @@ pub async fn handle( let source = Arc::new(Mutex::new(source)); let phase_watcher = state.lock().unwrap().phase_receiver(); + let mut audio_receiver_lock = receiver.lock().unwrap(); join!( listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()), send_voice( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, - &mut *receiver + &mut *audio_receiver_lock ), new_crypt_state(&mut crypt_state_receiver, sink, source) ); @@ -198,7 +199,7 @@ async fn send_voice( sink: Arc>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver, - receiver: &mut (dyn Stream> + Unpin), + receiver: &mut (dyn Stream> + Unpin), ) { pin_mut!(receiver); let (tx, rx) = oneshot::channel(); @@ -215,7 +216,6 @@ async fn send_voice( let main_block = async { let rx = rx.fuse(); pin_mut!(rx); - let mut count = 0; loop { let packet_recv = receiver.next().fuse(); pin_mut!(packet_recv); @@ -231,16 +231,7 @@ async fn send_voice( warn!("Channel closed before disconnect command"); break; } - Some(Some(payload)) => { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload: VoicePacketPayload::Opus(payload.into(), false), - position_info: None, - }; - count += 1; + Some(Some(reply)) => { sink.lock() .unwrap() .send((reply, server_addr)) -- cgit v1.2.1 From 1af9b90133a8d6102a09102bbd6f726f598c24fc Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Mon, 4 Jan 2021 21:55:33 +0100 Subject: re-add newline --- mumd/src/audio/input.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index da6dc61..deb0fb8 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -17,4 +17,4 @@ pub fn callback( } } } -} \ No newline at end of file +} -- cgit v1.2.1