From b3339294dac5a7f448de8c3849ab198afff8d14b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Sat, 10 Oct 2020 22:25:05 +0200 Subject: handle multiple incoming audio streams Co-authored-by: Eskil Queseth --- mumd/src/audio.rs | 127 +++++++++++++++++++++++++++++++++++++++++++++++----- mumd/src/main.rs | 10 ++++- mumd/src/network.rs | 56 +++++++---------------- 3 files changed, 140 insertions(+), 53 deletions(-) diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index c6f30fb..747716c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,20 +1,29 @@ use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig}; +use mumble_protocol::voice::VoicePacketPayload; +use opus::Channels; +use std::collections::HashMap; use std::collections::VecDeque; +use std::collections::hash_map::Entry; +use std::ops::AddAssign; use std::sync::Arc; use std::sync::Mutex; +struct ClientStream { + buffer: VecDeque, //TODO ring buffer? + opus_decoder: opus::Decoder, +} + pub struct Audio { - pub output_buffer: Arc>>, //TODO ring buffer? pub output_config: StreamConfig, pub output_stream: Stream, + + client_streams: Arc>>, } impl Audio { pub fn new() -> Self { - let output_buffer = Arc::new(Mutex::new(VecDeque::new())); - let host = cpal::default_host(); let device = host .default_output_device() @@ -31,35 +40,129 @@ impl Audio { let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err); - let stream_audio_buf = Arc::clone(&output_buffer); + let client_streams = Arc::new(Mutex::new(HashMap::new())); + let output_client_streams = Arc::clone(&client_streams); + let stream = match supported_sample_format { SampleFormat::F32 => { - device.build_output_stream(&config, curry_callback::(stream_audio_buf), err_fn) + device.build_output_stream(&config, curry_callback::(output_client_streams), err_fn) } SampleFormat::I16 => { - device.build_output_stream(&config, curry_callback::(stream_audio_buf), err_fn) + device.build_output_stream(&config, curry_callback::(output_client_streams), err_fn) } SampleFormat::U16 => { - device.build_output_stream(&config, curry_callback::(stream_audio_buf), err_fn) + device.build_output_stream(&config, curry_callback::(output_client_streams), err_fn) } } .unwrap(); Self { - output_buffer, output_config: config, output_stream: stream, + client_streams, + } + } + + pub fn decode_packet(&self, session_id: u32, payload: VoicePacketPayload) { + match self.client_streams.lock().unwrap().entry(session_id) { + Entry::Occupied(mut entry) => { + entry.get_mut().decode_packet(payload, self.output_config.channels as usize); + } + Entry::Vacant(_) => { + eprintln!("cannot find session id {}", session_id); + } + } + } + + pub fn add_client(&self, session_id: u32) { + match self.client_streams.lock().unwrap().entry(session_id) { + Entry::Occupied(_) => { + eprintln!("session id {} already exists", session_id); + } + Entry::Vacant(entry) => { + entry.insert(ClientStream::new( + self.output_config.sample_rate.0, + self.output_config.channels + )); + } + } + } +} + +impl ClientStream { + fn new(sample_rate: u32, channels: u16) -> Self { + Self { + buffer: VecDeque::new(), + opus_decoder: opus::Decoder::new( + sample_rate, + match channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!( + "only 1 or 2 supported, got {})", + channels + ), + }, + ).unwrap(), + } + } + + fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + match payload { + VoicePacketPayload::Opus(bytes, _eot) => { + let mut out: Vec = + vec![0.0; bytes.len() * channels * 4]; + self.opus_decoder + .decode_float(&bytes[..], &mut out, false) + .expect("error decoding"); + self.buffer.extend(out); + } + _ => { + unimplemented!("payload type not supported"); + } } } } -fn curry_callback( - buf: Arc>>, +trait SaturatingAdd { + fn saturating_add(self, rhs: Self) -> Self; +} + +impl SaturatingAdd for f32 { + fn saturating_add(self, rhs: Self) -> Self { + match self + rhs { + a if a < -1.0 => -1.0, + a if a > 1.0 => 1.0, + a => a, + } + } +} + +impl SaturatingAdd for i16 { + fn saturating_add(self, rhs: Self) -> Self { + i16::saturating_add(self, rhs) + } +} + +impl SaturatingAdd for u16 { + fn saturating_add(self, rhs: Self) -> Self { + u16::saturating_add(self, rhs) + } +} + +fn curry_callback( + buf: Arc>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { move |data: &mut [T], _info: &OutputCallbackInfo| { - let mut lock = buf.lock().unwrap(); for sample in data.iter_mut() { - *sample = Sample::from(&lock.pop_front().unwrap_or(0.0)); + *sample = Sample::from(&0.0); + } + + let mut lock = buf.lock().unwrap(); + for client_stream in lock.values_mut() { + for sample in data.iter_mut() { + *sample = sample.saturating_add(Sample::from(&client_stream.buffer.pop_front().unwrap_or(0.0))); + } } } } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 3960b48..4c3b67c 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -10,6 +10,8 @@ use futures::channel::oneshot; use futures::join; use mumble_protocol::crypt::ClientCryptState; use std::net::ToSocketAddrs; +use std::sync::Arc; +use std::sync::Mutex; #[tokio::main] async fn main() { @@ -47,6 +49,7 @@ async fn main() { let audio = Audio::new(); audio.output_stream.play().unwrap(); + let audio = Arc::new(Mutex::new(audio)); // Run it join!( @@ -56,7 +59,12 @@ async fn main() { username, accept_invalid_cert, crypt_state_sender, + Arc::clone(&audio), ), - network::handle_udp(server_addr, crypt_state_receiver, audio,) + network::handle_udp( + server_addr, + crypt_state_receiver, + audio, + ) ); } diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 82b45da..a2be9ea 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -1,20 +1,18 @@ use crate::audio::Audio; use bytes::Bytes; -use mumble_protocol::voice::VoicePacketPayload; -use opus::Channels; - -use futures::channel::oneshot; -use futures::join; use futures::SinkExt; use futures::StreamExt; +use futures::channel::oneshot; +use futures::join; use futures_util::stream::{SplitSink, SplitStream}; -use mumble_protocol::control::msgs; use mumble_protocol::control::ClientControlCodec; use mumble_protocol::control::ControlCodec; use mumble_protocol::control::ControlPacket; +use mumble_protocol::control::msgs; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; +use mumble_protocol::voice::VoicePacketPayload; use mumble_protocol::{Clientbound, Serverbound}; use std::convert::Into; use std::convert::TryInto; @@ -107,6 +105,7 @@ async fn listen_tcp( sink: Arc>, mut stream: TcpReceiver, crypt_state_sender: oneshot::Sender, + audio: Arc>, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -155,6 +154,10 @@ async fn listen_tcp( ControlPacket::Reject(msg) => { println!("Login rejected: {:?}", msg); } + ControlPacket::UserState(msg) => { + println!("Found user {}", msg.get_name()); + audio.lock().unwrap().add_client(msg.get_session()); + } _ => {} } } @@ -166,6 +169,7 @@ pub async fn handle_tcp( username: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender, + audio: Arc>, ) { let (sink, stream) = connect_tcp(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); @@ -177,15 +181,14 @@ pub async fn handle_tcp( join!( send_pings(Arc::clone(&sink), 10), - listen_tcp(sink, stream, crypt_state_sender), + listen_tcp(sink, stream, crypt_state_sender, audio), ); } async fn listen_udp( _sink: Arc>, mut source: UdpReceiver, - mut opus_decoder: opus::Decoder, - audio: Audio, + audio: Arc>, ) { while let Some(packet) = source.next().await { let (packet, _src_addr) = match packet { @@ -203,27 +206,13 @@ async fn listen_udp( continue; } VoicePacket::Audio { + session_id, // seq_num, payload, // position_info, .. } => { - match payload { - VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec = - vec![0.0; bytes.len() * audio.output_config.channels as usize * 4]; - opus_decoder - .decode_float(&bytes[..], &mut out, false) - .expect("error decoding"); - let mut lock = audio.output_buffer.lock().unwrap(); - lock.extend(out); - } - _ => { - unimplemented!("något fint"); - } - } - - // decode paylout and put it in buffer + audio.lock().unwrap().decode_packet(session_id, payload); // Got audio, naively echo it back //let reply = VoicePacket::Audio { @@ -259,21 +248,8 @@ async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) { pub async fn handle_udp( server_addr: SocketAddr, crypt_state: oneshot::Receiver, - audio: Audio, + audio: Arc>, ) { - let opus_decoder = opus::Decoder::new( - audio.output_config.sample_rate.0 as u32, - match audio.output_config.channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!( - "ljudnörd (got {} channels, need 1 or 2)", - audio.output_config.channels - ), - }, - ) - .unwrap(); - let (mut sink, source) = connect_udp(crypt_state).await; // Note: A normal application would also send periodic Ping packets, and its own audio @@ -281,5 +257,5 @@ pub async fn handle_udp( // dummy voice packet. send_ping_udp(&mut sink, server_addr).await; - listen_udp(Arc::new(Mutex::new(sink)), source, opus_decoder, audio).await; + listen_udp(Arc::new(Mutex::new(sink)), source, audio).await; } -- cgit v1.2.1