diff options
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/audio.rs | 166 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 52 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 90 |
3 files changed, 158 insertions, 150 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index bbde547..8609a91 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,34 +1,25 @@ -use bytes::Bytes; +pub mod input; +pub mod output; + use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{ - InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig, -}; +use cpal::{SampleFormat, SampleRate, Stream, StreamConfig}; use log::*; use mumble_protocol::voice::VoicePacketPayload; use opus::Channels; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::collections::VecDeque; -use std::ops::AddAssign; -use std::sync::Arc; -use std::sync::Mutex; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::watch; - -struct ClientStream { - buffer: VecDeque<f32>, //TODO ring buffer? - opus_decoder: opus::Decoder, -} +use std::sync::{Arc, Mutex}; +use tokio::sync::{mpsc, watch}; pub struct Audio { output_config: StreamConfig, _output_stream: Stream, _input_stream: Stream, - input_channel_receiver: Option<Receiver<VoicePacketPayload>>, + input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>, input_volume_sender: watch::Sender<f32>, - client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>, + client_streams: Arc<Mutex<HashMap<u32, output::ClientStream>>>, } impl Audio { @@ -66,17 +57,17 @@ impl Audio { let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, - output_curry_callback::<f32>(Arc::clone(&client_streams)), + output::curry_callback::<f32>(Arc::clone(&client_streams)), err_fn, ), SampleFormat::I16 => output_device.build_output_stream( &output_config, - output_curry_callback::<i16>(Arc::clone(&client_streams)), + output::curry_callback::<i16>(Arc::clone(&client_streams)), err_fn, ), SampleFormat::U16 => output_device.build_output_stream( &output_config, - output_curry_callback::<u16>(Arc::clone(&client_streams)), + output::curry_callback::<u16>(Arc::clone(&client_streams)), err_fn, ), } @@ -102,7 +93,7 @@ impl Audio { let input_stream = match input_supported_sample_format { SampleFormat::F32 => input_device.build_input_stream( &input_config, - input_callback::<f32>( + input::callback::<f32>( input_encoder, input_sender, input_config.sample_rate.0, @@ -113,7 +104,7 @@ impl Audio { ), SampleFormat::I16 => input_device.build_input_stream( &input_config, - input_callback::<i16>( + input::callback::<i16>( input_encoder, input_sender, input_config.sample_rate.0, @@ -124,7 +115,7 @@ impl Audio { ), SampleFormat::U16 => input_device.build_input_stream( &input_config, - input_callback::<u16>( + input::callback::<u16>( input_encoder, input_sender, input_config.sample_rate.0, @@ -167,7 +158,7 @@ impl Audio { warn!("Session id {} already exists", session_id); } Entry::Vacant(entry) => { - entry.insert(ClientStream::new( + entry.insert(output::ClientStream::new( self.output_config.sample_rate.0, self.output_config.channels, )); @@ -189,7 +180,7 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option<Receiver<VoicePacketPayload>> { + pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> { self.input_channel_receiver.take() } @@ -201,128 +192,3 @@ impl Audio { self.input_volume_sender.broadcast(input_volume).unwrap(); } } - -impl ClientStream { - fn new(sample_rate: u32, channels: u16) -> Self { - Self { - buffer: VecDeque::new(), - opus_decoder: opus::Decoder::new( - sample_rate, - match channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), - }, - ) - .unwrap(), - } - } - - fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { - match payload { - VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode - let parsed = self - .opus_decoder - .decode_float(&bytes, &mut out, false) - .expect("Error decoding"); - out.truncate(parsed); - self.buffer.extend(out); - } - _ => { - unimplemented!("Payload type not supported"); - } - } - } -} - -trait SaturatingAdd { - fn saturating_add(self, rhs: Self) -> Self; -} - -impl SaturatingAdd for f32 { - fn saturating_add(self, rhs: Self) -> Self { - match self + rhs { - a if a < -1.0 => -1.0, - a if a > 1.0 => 1.0, - a => a, - } - } -} - -impl SaturatingAdd for i16 { - fn saturating_add(self, rhs: Self) -> Self { - i16::saturating_add(self, rhs) - } -} - -impl SaturatingAdd for u16 { - fn saturating_add(self, rhs: Self) -> Self { - u16::saturating_add(self, rhs) - } -} - -fn output_curry_callback<T: Sample + AddAssign + SaturatingAdd>( - buf: Arc<Mutex<HashMap<u32, ClientStream>>>, -) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { - move |data: &mut [T], _info: &OutputCallbackInfo| { - for sample in data.iter_mut() { - *sample = Sample::from(&0.0); - } - - let mut lock = buf.lock().unwrap(); - for client_stream in lock.values_mut() { - for sample in data.iter_mut() { - *sample = sample.saturating_add(Sample::from( - &client_stream.buffer.pop_front().unwrap_or(0.0), - )); - } - } - } -} - -fn input_callback<T: Sample>( - mut opus_encoder: opus::Encoder, - mut input_sender: Sender<VoicePacketPayload>, - sample_rate: u32, - input_volume_receiver: watch::Receiver<f32>, - opus_frame_size_blocks: u32, // blocks of 2.5ms -) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { - if !(opus_frame_size_blocks == 1 - || opus_frame_size_blocks == 2 - || opus_frame_size_blocks == 4 - || opus_frame_size_blocks == 8) - { - panic!( - "Unsupported amount of opus frame blocks {}", - opus_frame_size_blocks - ); - } - let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; - - let buf = Arc::new(Mutex::new(VecDeque::new())); - move |data: &[T], _info: &InputCallbackInfo| { - let mut buf = buf.lock().unwrap(); - let input_volume = *input_volume_receiver.borrow(); - let out: Vec<f32> = data.iter().map(|e| e.to_f32()) - .map(|e| e * input_volume) - .collect(); - buf.extend(out); - while buf.len() >= opus_frame_size as usize { - let tail = buf.split_off(opus_frame_size as usize); - let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize]; - let result = opus_encoder - .encode_float(&Vec::from(buf.clone()), &mut opus_buf) - .unwrap(); - opus_buf.truncate(result); - let bytes = Bytes::copy_from_slice(&opus_buf); - match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) { - Ok(_) => {} - Err(_e) => { - //warn!("Error sending audio packet: {:?}", e); - } - } - *buf = tail; - } - } -} diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs new file mode 100644 index 0000000..4e95360 --- /dev/null +++ b/mumd/src/audio/input.rs @@ -0,0 +1,52 @@ +use bytes::Bytes; +use cpal::{InputCallbackInfo, Sample}; +use mumble_protocol::voice::VoicePacketPayload; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use tokio::sync::{mpsc, watch}; + +pub fn callback<T: Sample>( + mut opus_encoder: opus::Encoder, + mut input_sender: mpsc::Sender<VoicePacketPayload>, + sample_rate: u32, + input_volume_receiver: watch::Receiver<f32>, + opus_frame_size_blocks: u32, // blocks of 2.5ms +) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { + if !(opus_frame_size_blocks == 1 + || opus_frame_size_blocks == 2 + || opus_frame_size_blocks == 4 + || opus_frame_size_blocks == 8) + { + panic!( + "Unsupported amount of opus frame blocks {}", + opus_frame_size_blocks + ); + } + let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; + + let buf = Arc::new(Mutex::new(VecDeque::new())); + move |data: &[T], _info: &InputCallbackInfo| { + let mut buf = buf.lock().unwrap(); + let input_volume = *input_volume_receiver.borrow(); + let out: Vec<f32> = data.iter().map(|e| e.to_f32()) + .map(|e| e * input_volume) + .collect(); + buf.extend(out); + while buf.len() >= opus_frame_size as usize { + let tail = buf.split_off(opus_frame_size as usize); + let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize]; + let result = opus_encoder + .encode_float(&Vec::from(buf.clone()), &mut opus_buf) + .unwrap(); + opus_buf.truncate(result); + let bytes = Bytes::copy_from_slice(&opus_buf); + match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) { + Ok(_) => {} + Err(_e) => { + //warn!("Error sending audio packet: {:?}", e); + } + } + *buf = tail; + } + } +} diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs new file mode 100644 index 0000000..94e4b21 --- /dev/null +++ b/mumd/src/audio/output.rs @@ -0,0 +1,90 @@ +use cpal::{OutputCallbackInfo, Sample}; +use mumble_protocol::voice::VoicePacketPayload; +use opus::Channels; +use std::collections::{HashMap, VecDeque}; +use std::ops::AddAssign; +use std::sync::{Arc, Mutex}; + +pub struct ClientStream { + buffer: VecDeque<f32>, //TODO ring buffer? + opus_decoder: opus::Decoder, +} + +impl ClientStream { + pub fn new(sample_rate: u32, channels: u16) -> Self { + Self { + buffer: VecDeque::new(), + opus_decoder: opus::Decoder::new( + sample_rate, + match channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), + }, + ) + .unwrap(), + } + } + + pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + match payload { + VoicePacketPayload::Opus(bytes, _eot) => { + let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode + let parsed = self + .opus_decoder + .decode_float(&bytes, &mut out, false) + .expect("Error decoding"); + out.truncate(parsed); + self.buffer.extend(out); + } + _ => { + unimplemented!("Payload type not supported"); + } + } + } +} + +pub trait SaturatingAdd { + fn saturating_add(self, rhs: Self) -> Self; +} + +impl SaturatingAdd for f32 { + fn saturating_add(self, rhs: Self) -> Self { + match self + rhs { + a if a < -1.0 => -1.0, + a if a > 1.0 => 1.0, + a => a, + } + } +} + +impl SaturatingAdd for i16 { + fn saturating_add(self, rhs: Self) -> Self { + i16::saturating_add(self, rhs) + } +} + +impl SaturatingAdd for u16 { + fn saturating_add(self, rhs: Self) -> Self { + u16::saturating_add(self, rhs) + } +} + +pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd>( + buf: Arc<Mutex<HashMap<u32, ClientStream>>>, +) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { + move |data: &mut [T], _info: &OutputCallbackInfo| { + for sample in data.iter_mut() { + *sample = Sample::from(&0.0); + } + + let mut lock = buf.lock().unwrap(); + for client_stream in lock.values_mut() { + for sample in data.iter_mut() { + *sample = sample.saturating_add(Sample::from( + &client_stream.buffer.pop_front().unwrap_or(0.0), + )); + } + } + } +} |
