diff options
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/audio.rs | 396 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 48 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 24 |
3 files changed, 375 insertions, 93 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0820147..83818d5 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -4,16 +4,25 @@ pub mod output; use crate::audio::output::SaturatingAdd; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{SampleFormat, SampleRate, Stream, StreamConfig}; +use cpal::{SampleFormat, SampleRate, StreamConfig}; +use dasp_frame::Frame; use dasp_interpolate::linear::Linear; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_signal::{self as signal, Signal}; +use futures::Stream; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; use log::*; -use mumble_protocol::voice::VoicePacketPayload; +use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; +use std::future::{Future}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; +use mumble_protocol::Serverbound; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&[u8], NotificationEvents)] = &[ @@ -62,10 +71,10 @@ pub enum NotificationEvents { pub struct Audio { output_config: StreamConfig, - _output_stream: Stream, - _input_stream: Stream, + _output_stream: cpal::Stream, + _input_stream: cpal::Stream, - input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>, + input_channel_receiver: Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, input_volume_sender: watch::Sender<f32>, output_volume_sender: watch::Sender<f32>, @@ -160,20 +169,7 @@ impl Audio { } .unwrap(); - let input_encoder = opus::Encoder::new( - input_config.sample_rate.0, - match input_config.channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!( - "Only 1 or 2 channels supported, got {})", - input_config.channels - ), - }, - opus::Application::Voip, - ) - .unwrap(); - let (input_sender, input_receiver) = mpsc::channel(100); + let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); @@ -181,39 +177,47 @@ impl Audio { SampleFormat::F32 => input_device.build_input_stream( &input_config, input::callback::<f32>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, input::callback::<i16>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, input::callback::<u16>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), } .unwrap(); + let opus_stream = OpusEncoder::new( + 4, + input_config.sample_rate.0, + input_config.channels as usize, + StreamingSignalExt::into_interleaved_samples( + StreamingNoiseGate::new( + from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly + 0.09, + 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + }); + output_stream.play().unwrap(); let sounds = EVENT_SOUNDS @@ -237,7 +241,7 @@ impl Audio { _ => unimplemented!() // TODO handle gracefully (this might not even happen) }; let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter); - let interp = Linear::new(signal.next(), signal.next()); + let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal)); let samples = signal .from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64) .until_exhausted() @@ -253,7 +257,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(input_receiver), + input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))), client_streams, sounds, output_volume_sender, @@ -303,8 +307,8 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> { - self.input_channel_receiver.take() + pub fn take_receiver(&mut self) -> Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { + Arc::clone(&self.input_channel_receiver) } pub fn clear_clients(&mut self) { @@ -354,3 +358,323 @@ impl Audio { play_sounds.extend(samples.iter().skip(l)); } } + +struct StreamingNoiseGate<S: StreamingSignal> { + open: usize, + signal: S, + activate_threshold: <<S::Frame as Frame>::Sample as Sample>::Float, + deactivation_delay: usize, +} + +impl<S: StreamingSignal> StreamingNoiseGate<S> { + pub fn new( + signal: S, + activate_threshold: <<S::Frame as Frame>::Sample as Sample>::Float, + deactivation_delay: usize, + ) -> StreamingNoiseGate<S> { + Self { + open: 0, + signal, + activate_threshold, + deactivation_delay + } + } +} + +impl<S> StreamingSignal for StreamingNoiseGate<S> + where + S: StreamingSignal + Unpin, + <<<S as StreamingSignal>::Frame as Frame>::Sample as Sample>::Float: Unpin, + <S as StreamingSignal>::Frame: Unpin { + type Frame = S::Frame; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { + let s = self.get_mut(); + + let frame = match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(v) => v, + Poll::Pending => return Poll::Pending, + }; + + match s.open { + 0 => { + if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { + s.open = s.deactivation_delay; + } + } + _ => { + if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) { + s.open = s.deactivation_delay; + } else { + s.open -= 1; + } + } + } + + if s.open != 0 { + Poll::Ready(frame) + } else { + Poll::Ready(<S::Frame as Frame>::EQUILIBRIUM) + } + } + + fn is_exhausted(&self) -> bool { + self.signal.is_exhausted() + } +} + +fn abs<S: SignedSample>(sample: S) -> S { + let zero = S::EQUILIBRIUM; + if sample >= zero { + sample + } else { + -sample + } +} + +trait StreamingSignal { + type Frame: Frame; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame>; + + fn is_exhausted(&self) -> bool { + false + } +} + +impl<S> StreamingSignal for S + where + S: Signal + Unpin { + type Frame = S::Frame; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> { + Poll::Ready(self.get_mut().next()) + } +} + +trait StreamingSignalExt: StreamingSignal { + fn next(&mut self) -> Next<'_, Self> { + Next { + stream: self + } + } + + fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self> + where + Self: Sized { + IntoInterleavedSamples { signal: self, current_frame: None } + } +} + +impl<S> StreamingSignalExt for S + where S: StreamingSignal {} + +struct Next<'a, S: ?Sized> { + stream: &'a mut S +} + +impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { + type Output = S::Frame; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match S::poll_next(Pin::new(self.stream), cx) { + Poll::Ready(val) => { + Poll::Ready(val) + } + Poll::Pending => Poll::Pending + } + } +} + +struct IntoInterleavedSamples<S: StreamingSignal> { + signal: S, + current_frame: Option<<S::Frame as Frame>::Channels>, +} + +impl<S> Stream for IntoInterleavedSamples<S> + where + S: StreamingSignal + Unpin, + <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin { + type Item = <S::Frame as Frame>::Sample; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let s = self.get_mut(); + loop { + if s.current_frame.is_some() { + if let Some(channel) = s.current_frame.as_mut().unwrap().next() { + return Poll::Ready(Some(channel)); + } + } + match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(val) => { + s.current_frame = Some(val.channels()); + } + Poll::Pending => return Poll::Pending, + } + } + } +} + +struct FromStream<S> { + stream: S, + underlying_exhausted: bool, +} + +impl<S> StreamingSignal for FromStream<S> + where + S: Stream + Unpin, + S::Item: Frame + Unpin { + type Frame = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { + let s = self.get_mut(); + if s.underlying_exhausted { + return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM); + } + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(val)) => { + Poll::Ready(val) + } + Poll::Ready(None) => { + s.underlying_exhausted = true; + return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM); + } + Poll::Pending => Poll::Pending, + } + } + + fn is_exhausted(&self) -> bool { + self.underlying_exhausted + } +} + + +struct FromInterleavedSamplesStream<S, F> + where + F: Frame { + stream: S, + next_buf: Vec<F::Sample>, + underlying_exhausted: bool, +} + +fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F> + where + S: Stream + Unpin, + S::Item: Sample, + F: Frame<Sample = S::Item> { + FromInterleavedSamplesStream { + stream, + next_buf: Vec::new(), + underlying_exhausted: false, + } +} + +impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F> + where + S: Stream + Unpin, + S::Item: Sample + Unpin, + F: Frame<Sample = S::Item> + Unpin { + type Frame = F; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { + let s = self.get_mut(); + if s.underlying_exhausted { + return Poll::Ready(F::EQUILIBRIUM); + } + while s.next_buf.len() < F::CHANNELS { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.next_buf.push(v); + } + Poll::Ready(None) => { + s.underlying_exhausted = true; + return Poll::Ready(F::EQUILIBRIUM); + } + Poll::Pending => return Poll::Pending, + } + } + + let mut data = s.next_buf.iter().cloned(); + let n = F::from_samples(&mut data).unwrap(); + s.next_buf.clear(); + Poll::Ready(n) + } + + fn is_exhausted(&self) -> bool { + self.underlying_exhausted + } +} + +struct OpusEncoder<S> { + encoder: opus::Encoder, + frame_size: u32, + sample_rate: u32, + stream: S, + input_buffer: Vec<f32>, + exhausted: bool, +} + +impl<S, I> OpusEncoder<S> + where + S: Stream<Item = I>, + I: ToSample<f32> { + fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { + let encoder = opus::Encoder::new( + sample_rate, + match channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!( + "Only 1 or 2 channels supported, got {})", + channels + ), + }, + opus::Application::Voip, + ).unwrap(); + Self { + encoder, + frame_size, + sample_rate, + stream, + input_buffer: Vec::new(), + exhausted: false, + } + } +} + +impl<S, I> Stream for OpusEncoder<S> + where + S: Stream<Item = I> + Unpin, + I: Sample + ToSample<f32> { + type Item = Vec<u8>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let s = self.get_mut(); + if s.exhausted { + return Poll::Ready(None); + } + let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; + loop { + while s.input_buffer.len() < opus_frame_size { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.input_buffer.push(v.to_sample::<f32>()); + } + Poll::Ready(None) => { + s.exhausted = true; + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, + } + } + if s.input_buffer.iter().any(|&e| e != 0.0) { + break; + } + s.input_buffer.clear(); + } + + let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); + s.input_buffer.clear(); + Poll::Ready(Some(encoded)) + } +} diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index fe0d21f..deb0fb8 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,54 +1,20 @@ -use bytes::Bytes; use cpal::{InputCallbackInfo, Sample}; -use mumble_protocol::voice::VoicePacketPayload; -use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; +use log::*; pub fn callback<T: Sample>( - mut opus_encoder: opus::Encoder, - input_sender: mpsc::Sender<VoicePacketPayload>, - sample_rate: u32, + mut input_sender: futures::channel::mpsc::Sender<f32>, input_volume_receiver: watch::Receiver<f32>, - opus_frame_size_blocks: u32, // blocks of 2.5ms ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { - if !(opus_frame_size_blocks == 1 - || opus_frame_size_blocks == 2 - || opus_frame_size_blocks == 4 - || opus_frame_size_blocks == 8) - { - panic!( - "Unsupported amount of opus frame blocks {}", - opus_frame_size_blocks - ); - } - let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; - - let buf = Arc::new(Mutex::new(VecDeque::new())); move |data: &[T], _info: &InputCallbackInfo| { - let mut buf = buf.lock().unwrap(); let input_volume = *input_volume_receiver.borrow(); - let out: Vec<f32> = data + for sample in data .iter() .map(|e| e.to_f32()) - .map(|e| e * input_volume) - .collect(); - buf.extend(out); - while buf.len() >= opus_frame_size as usize { - let tail = buf.split_off(opus_frame_size as usize); - let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize]; - let result = opus_encoder - .encode_float(&Vec::from(buf.clone()), &mut opus_buf) - .unwrap(); - opus_buf.truncate(result); - let bytes = Bytes::copy_from_slice(&opus_buf); - match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) { - Ok(_) => {} - Err(_e) => { - //warn!("Error sending audio packet: {:?}", e); - } + .map(|e| e * input_volume) { + if let Err(_e) = input_sender.try_send(sample) { + warn!("Error sending audio: {}", _e); } - *buf = tail; } } } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index f7eeb62..0c00029 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -2,7 +2,7 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use bytes::Bytes; -use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream}; use futures_util::stream::{SplitSink, SplitStream}; use log::*; use mumble_protocol::crypt::ClientCryptState; @@ -28,7 +28,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>, ) { - let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + let receiver = state.lock().unwrap().audio_mut().take_receiver(); loop { let connection_info = 'data: loop { @@ -50,13 +50,14 @@ pub async fn handle( let source = Arc::new(Mutex::new(source)); let phase_watcher = state.lock().unwrap().phase_receiver(); + let mut audio_receiver_lock = receiver.lock().unwrap(); join!( listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()), send_voice( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, - &mut receiver + &mut *audio_receiver_lock ), new_crypt_state(&mut crypt_state_receiver, sink, source) ); @@ -197,8 +198,9 @@ async fn send_voice( sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver<StatePhase>, - receiver: &mut mpsc::Receiver<VoicePacketPayload>, + receiver: &mut (dyn Stream<Item = VoicePacket<Serverbound>> + Unpin), ) { + pin_mut!(receiver); let (tx, rx) = oneshot::channel(); let phase_transition_block = async { loop { @@ -213,9 +215,8 @@ async fn send_voice( let main_block = async { let rx = rx.fuse(); pin_mut!(rx); - let mut count = 0; loop { - let packet_recv = receiver.recv().fuse(); + let packet_recv = receiver.next().fuse(); pin_mut!(packet_recv); let exitor = select! { data = packet_recv => Some(data), @@ -229,16 +230,7 @@ async fn send_voice( warn!("Channel closed before disconnect command"); break; } - Some(Some(payload)) => { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload, - position_info: None, - }; - count += 1; + Some(Some(reply)) => { sink.lock() .unwrap() .send((reply, server_addr)) |
