diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2020-10-11 01:30:48 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2020-10-11 01:30:48 +0200 |
| commit | d0a82ae5ef6b47c3b973d16d3be6b7d89bc2826f (patch) | |
| tree | e5041814fbf3549a20084be2ef4b1bf887bca169 | |
| parent | fd8a9f28b036315c0ade03a6df2999305807021d (diff) | |
| download | mum-d0a82ae5ef6b47c3b973d16d3be6b7d89bc2826f.tar.gz | |
working microphone stuff
Co-authored-by: Eskil Queseth <eskilq@kth.se>
| -rw-r--r-- | mumd/src/audio.rs | 174 | ||||
| -rw-r--r-- | mumd/src/main.rs | 6 | ||||
| -rw-r--r-- | mumd/src/network.rs | 49 |
3 files changed, 172 insertions, 57 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 2d504cf..8b8a08e 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,64 +1,131 @@ +use bytes::Bytes; use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; -use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig}; +use cpal::{ + InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig, +}; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::collections::VecDeque; -use std::collections::hash_map::Entry; use std::ops::AddAssign; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::mpsc::{self, Receiver, Sender}; struct ClientStream { buffer: VecDeque<f32>, //TODO ring buffer? opus_decoder: opus::Decoder, } +//TODO remove pub where possible pub struct Audio { pub output_config: StreamConfig, pub output_stream: Stream, + pub input_config: StreamConfig, + pub input_stream: Stream, + pub input_buffer: Arc<Mutex<VecDeque<f32>>>, + input_channel_receiver: Option<Receiver<VoicePacketPayload>>, + client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>, } +//TODO split into input/output impl Audio { pub fn new() -> Self { let host = cpal::default_host(); - let device = host + let output_device = host .default_output_device() .expect("default output device not found"); - let mut supported_configs_range = device + let mut output_supported_configs_range = output_device .supported_output_configs() - .expect("error querying configs"); - let supported_config = supported_configs_range + .expect("error querying output configs"); + let output_supported_config = output_supported_configs_range + .next() + .expect("no supported output config??") + .with_sample_rate(SampleRate(48000)); + let output_supported_sample_format = output_supported_config.sample_format(); + let output_config: StreamConfig = output_supported_config.into(); + + let input_device = host + .default_input_device() + .expect("default input device not found"); + let mut input_supported_configs_range = input_device + .supported_input_configs() + .expect("error querying input configs"); + let input_supported_config = input_supported_configs_range .next() - .expect("no supported config??") + .expect("no supported input config??") .with_sample_rate(SampleRate(48000)); - let supported_sample_format = supported_config.sample_format(); - let config: StreamConfig = supported_config.into(); + let input_supported_sample_format = input_supported_config.sample_format(); + let input_config: StreamConfig = input_supported_config.into(); let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err); let client_streams = Arc::new(Mutex::new(HashMap::new())); - let output_client_streams = Arc::clone(&client_streams); + let output_stream = match output_supported_sample_format { + SampleFormat::F32 => output_device.build_output_stream( + &output_config, + output_curry_callback::<f32>(Arc::clone(&client_streams)), + err_fn, + ), + SampleFormat::I16 => output_device.build_output_stream( + &output_config, + output_curry_callback::<i16>(Arc::clone(&client_streams)), + err_fn, + ), + SampleFormat::U16 => output_device.build_output_stream( + &output_config, + output_curry_callback::<u16>(Arc::clone(&client_streams)), + err_fn, + ), + } + .unwrap(); - let stream = match supported_sample_format { - SampleFormat::F32 => { - device.build_output_stream(&config, curry_callback::<f32>(output_client_streams), err_fn) - } - SampleFormat::I16 => { - device.build_output_stream(&config, curry_callback::<i16>(output_client_streams), err_fn) - } - SampleFormat::U16 => { - device.build_output_stream(&config, curry_callback::<u16>(output_client_streams), err_fn) - } + let input_encoder = opus::Encoder::new( + input_config.sample_rate.0, + match input_config.channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!( + "only 1 or 2 channels supported, got {})", + input_config.channels + ), + }, + opus::Application::Voip, + ) + .unwrap(); + let (input_sender, input_receiver) = mpsc::channel(100); + + let input_buffer = Arc::new(Mutex::new(VecDeque::new())); + let input_stream = match input_supported_sample_format { + SampleFormat::F32 => input_device.build_input_stream( + &input_config, + input_callback::<f32>(input_encoder, input_sender), + err_fn, + ), + SampleFormat::I16 => input_device.build_input_stream( + &input_config, + input_callback::<i16>(input_encoder, input_sender), + err_fn, + ), + SampleFormat::U16 => input_device.build_input_stream( + &input_config, + input_callback::<u16>(input_encoder, input_sender), + err_fn, + ), } .unwrap(); Self { - output_config: config, - output_stream: stream, + output_config, + output_stream, + input_config, + input_stream, + input_buffer, + input_channel_receiver: Some(input_receiver), client_streams, } } @@ -66,7 +133,9 @@ impl Audio { pub fn decode_packet(&self, session_id: u32, payload: VoicePacketPayload) { match self.client_streams.lock().unwrap().entry(session_id) { Entry::Occupied(mut entry) => { - entry.get_mut().decode_packet(payload, self.output_config.channels as usize); + entry + .get_mut() + .decode_packet(payload, self.output_config.channels as usize); } Entry::Vacant(_) => { eprintln!("cannot find session id {}", session_id); @@ -82,7 +151,7 @@ impl Audio { Entry::Vacant(entry) => { entry.insert(ClientStream::new( self.output_config.sample_rate.0, - self.output_config.channels + self.output_config.channels, )); } } @@ -94,10 +163,17 @@ impl Audio { entry.remove(); } Entry::Vacant(_) => { - eprintln!("tried to remove session id {} that doesn't exist", session_id); + eprintln!( + "tried to remove session id {} that doesn't exist", + session_id + ); } } } + + pub fn take_receiver(&mut self) -> Option<Receiver<VoicePacketPayload>> { + self.input_channel_receiver.take() + } } impl ClientStream { @@ -109,23 +185,20 @@ impl ClientStream { match channels { 1 => Channels::Mono, 2 => Channels::Stereo, - _ => unimplemented!( - "only 1 or 2 supported, got {})", - channels - ), + _ => unimplemented!("only 1 or 2 channels supported, got {})", channels), }, - ).unwrap(), + ) + .unwrap(), } } fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec<f32> = - vec![0.0; bytes.len() * channels * 4]; + let mut out: Vec<f32> = vec![0.0; bytes.len() * channels * 4]; self.opus_decoder - .decode_float(&bytes[..], &mut out, false) - .expect("error decoding"); + .decode_float(&bytes, &mut out, false) + .expect("error decoding"); //FIXME sometimes panics here self.buffer.extend(out); } _ => { @@ -143,7 +216,7 @@ impl SaturatingAdd for f32 { fn saturating_add(self, rhs: Self) -> Self { match self + rhs { a if a < -1.0 => -1.0, - a if a > 1.0 => 1.0, + a if a > 1.0 => 1.0, a => a, } } @@ -161,7 +234,7 @@ impl SaturatingAdd for u16 { } } -fn curry_callback<T: Sample + AddAssign + SaturatingAdd>( +fn output_curry_callback<T: Sample + AddAssign + SaturatingAdd>( buf: Arc<Mutex<HashMap<u32, ClientStream>>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { move |data: &mut [T], _info: &OutputCallbackInfo| { @@ -172,8 +245,35 @@ fn curry_callback<T: Sample + AddAssign + SaturatingAdd>( let mut lock = buf.lock().unwrap(); for client_stream in lock.values_mut() { for sample in data.iter_mut() { - *sample = sample.saturating_add(Sample::from(&client_stream.buffer.pop_front().unwrap_or(0.0))); + *sample = sample.saturating_add(Sample::from( + &client_stream.buffer.pop_front().unwrap_or(0.0), + )); } } } } + +fn input_callback<T: Sample>( + mut opus_encoder: opus::Encoder, + mut input_sender: Sender<VoicePacketPayload>, +) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { + let buf = Arc::new(Mutex::new(VecDeque::new())); + move |data: &[T], _info: &InputCallbackInfo| { + let mut buf = buf.lock().unwrap(); + let out: Vec<f32> = data.iter().map(|e| e.to_f32()).collect(); + buf.extend(out); + while buf.len() >= 2880 { + let tail = buf.split_off(2880); + let mut opus_buf: Vec<u8> = vec![0; 100_000]; + let result = opus_encoder + .encode_float(&Vec::from(buf.clone()), &mut opus_buf) + .unwrap(); + opus_buf.truncate(result); + let bytes = Bytes::copy_from_slice(&opus_buf); + input_sender + .try_send(VoicePacketPayload::Opus(bytes, false)) + .unwrap(); //TODO handle full buffer / disconnect + *buf = tail; + } + } +} diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 4c3b67c..1608947 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -61,10 +61,6 @@ async fn main() { crypt_state_sender, Arc::clone(&audio), ), - network::handle_udp( - server_addr, - crypt_state_receiver, - audio, - ) + network::handle_udp(server_addr, crypt_state_receiver, audio,), ); } diff --git a/mumd/src/network.rs b/mumd/src/network.rs index c59754d..875463c 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -1,15 +1,15 @@ use crate::audio::Audio; use bytes::Bytes; -use futures::SinkExt; -use futures::StreamExt; use futures::channel::oneshot; use futures::join; +use futures::SinkExt; +use futures::StreamExt; use futures_util::stream::{SplitSink, SplitStream}; +use mumble_protocol::control::msgs; use mumble_protocol::control::ClientControlCodec; use mumble_protocol::control::ControlCodec; use mumble_protocol::control::ControlPacket; -use mumble_protocol::control::msgs; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; use mumble_protocol::voice::VoicePacketPayload; @@ -217,17 +217,6 @@ async fn listen_udp( .. } => { audio.lock().unwrap().decode_packet(session_id, payload); - - // Got audio, naively echo it back - //let reply = VoicePacket::Audio { - // _dst: std::marker::PhantomData, - // target: 0, // normal speech - // session_id: (), // unused for server-bound packets - // seq_num, - // payload, - // position_info, - //}; - //sink.send((reply, src_addr)).await.unwrap(); } } } @@ -249,6 +238,32 @@ async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) { .unwrap(); } +async fn send_voice_udp( + sink: Arc<Mutex<UdpSender>>, + server_addr: SocketAddr, + audio: Arc<Mutex<Audio>>, +) { + let mut receiver = audio.lock().unwrap().take_receiver().unwrap(); + + let mut count = 0; + while let Some(payload) = receiver.recv().await { + let reply = VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: count, + payload, + position_info: None, + }; + count += 1; + sink.lock() + .unwrap() + .send((reply, server_addr)) + .await + .unwrap(); + } +} + pub async fn handle_udp( server_addr: SocketAddr, crypt_state: oneshot::Receiver<ClientCryptState>, @@ -261,5 +276,9 @@ pub async fn handle_udp( // dummy voice packet. send_ping_udp(&mut sink, server_addr).await; - listen_udp(Arc::new(Mutex::new(sink)), source, audio).await; + let sink = Arc::new(Mutex::new(sink)); + join!( + listen_udp(Arc::clone(&sink), source, Arc::clone(&audio)), + send_voice_udp(sink, server_addr, audio) + ); } |
