aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/audio.rs166
-rw-r--r--mumd/src/audio/input.rs52
-rw-r--r--mumd/src/audio/output.rs90
3 files changed, 158 insertions, 150 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index bbde547..8609a91 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,34 +1,25 @@
-use bytes::Bytes;
+pub mod input;
+pub mod output;
+
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
-use cpal::{
- InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig,
-};
+use cpal::{SampleFormat, SampleRate, Stream, StreamConfig};
use log::*;
use mumble_protocol::voice::VoicePacketPayload;
use opus::Channels;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
-use std::collections::VecDeque;
-use std::ops::AddAssign;
-use std::sync::Arc;
-use std::sync::Mutex;
-use tokio::sync::mpsc::{self, Receiver, Sender};
-use tokio::sync::watch;
-
-struct ClientStream {
- buffer: VecDeque<f32>, //TODO ring buffer?
- opus_decoder: opus::Decoder,
-}
+use std::sync::{Arc, Mutex};
+use tokio::sync::{mpsc, watch};
pub struct Audio {
output_config: StreamConfig,
_output_stream: Stream,
_input_stream: Stream,
- input_channel_receiver: Option<Receiver<VoicePacketPayload>>,
+ input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>,
input_volume_sender: watch::Sender<f32>,
- client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>,
+ client_streams: Arc<Mutex<HashMap<u32, output::ClientStream>>>,
}
impl Audio {
@@ -66,17 +57,17 @@ impl Audio {
let output_stream = match output_supported_sample_format {
SampleFormat::F32 => output_device.build_output_stream(
&output_config,
- output_curry_callback::<f32>(Arc::clone(&client_streams)),
+ output::curry_callback::<f32>(Arc::clone(&client_streams)),
err_fn,
),
SampleFormat::I16 => output_device.build_output_stream(
&output_config,
- output_curry_callback::<i16>(Arc::clone(&client_streams)),
+ output::curry_callback::<i16>(Arc::clone(&client_streams)),
err_fn,
),
SampleFormat::U16 => output_device.build_output_stream(
&output_config,
- output_curry_callback::<u16>(Arc::clone(&client_streams)),
+ output::curry_callback::<u16>(Arc::clone(&client_streams)),
err_fn,
),
}
@@ -102,7 +93,7 @@ impl Audio {
let input_stream = match input_supported_sample_format {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
- input_callback::<f32>(
+ input::callback::<f32>(
input_encoder,
input_sender,
input_config.sample_rate.0,
@@ -113,7 +104,7 @@ impl Audio {
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
- input_callback::<i16>(
+ input::callback::<i16>(
input_encoder,
input_sender,
input_config.sample_rate.0,
@@ -124,7 +115,7 @@ impl Audio {
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
- input_callback::<u16>(
+ input::callback::<u16>(
input_encoder,
input_sender,
input_config.sample_rate.0,
@@ -167,7 +158,7 @@ impl Audio {
warn!("Session id {} already exists", session_id);
}
Entry::Vacant(entry) => {
- entry.insert(ClientStream::new(
+ entry.insert(output::ClientStream::new(
self.output_config.sample_rate.0,
self.output_config.channels,
));
@@ -189,7 +180,7 @@ impl Audio {
}
}
- pub fn take_receiver(&mut self) -> Option<Receiver<VoicePacketPayload>> {
+ pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> {
self.input_channel_receiver.take()
}
@@ -201,128 +192,3 @@ impl Audio {
self.input_volume_sender.broadcast(input_volume).unwrap();
}
}
-
-impl ClientStream {
- fn new(sample_rate: u32, channels: u16) -> Self {
- Self {
- buffer: VecDeque::new(),
- opus_decoder: opus::Decoder::new(
- sample_rate,
- match channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
- },
- )
- .unwrap(),
- }
- }
-
- fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
- match payload {
- VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
- let parsed = self
- .opus_decoder
- .decode_float(&bytes, &mut out, false)
- .expect("Error decoding");
- out.truncate(parsed);
- self.buffer.extend(out);
- }
- _ => {
- unimplemented!("Payload type not supported");
- }
- }
- }
-}
-
-trait SaturatingAdd {
- fn saturating_add(self, rhs: Self) -> Self;
-}
-
-impl SaturatingAdd for f32 {
- fn saturating_add(self, rhs: Self) -> Self {
- match self + rhs {
- a if a < -1.0 => -1.0,
- a if a > 1.0 => 1.0,
- a => a,
- }
- }
-}
-
-impl SaturatingAdd for i16 {
- fn saturating_add(self, rhs: Self) -> Self {
- i16::saturating_add(self, rhs)
- }
-}
-
-impl SaturatingAdd for u16 {
- fn saturating_add(self, rhs: Self) -> Self {
- u16::saturating_add(self, rhs)
- }
-}
-
-fn output_curry_callback<T: Sample + AddAssign + SaturatingAdd>(
- buf: Arc<Mutex<HashMap<u32, ClientStream>>>,
-) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
- move |data: &mut [T], _info: &OutputCallbackInfo| {
- for sample in data.iter_mut() {
- *sample = Sample::from(&0.0);
- }
-
- let mut lock = buf.lock().unwrap();
- for client_stream in lock.values_mut() {
- for sample in data.iter_mut() {
- *sample = sample.saturating_add(Sample::from(
- &client_stream.buffer.pop_front().unwrap_or(0.0),
- ));
- }
- }
- }
-}
-
-fn input_callback<T: Sample>(
- mut opus_encoder: opus::Encoder,
- mut input_sender: Sender<VoicePacketPayload>,
- sample_rate: u32,
- input_volume_receiver: watch::Receiver<f32>,
- opus_frame_size_blocks: u32, // blocks of 2.5ms
-) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
- if !(opus_frame_size_blocks == 1
- || opus_frame_size_blocks == 2
- || opus_frame_size_blocks == 4
- || opus_frame_size_blocks == 8)
- {
- panic!(
- "Unsupported amount of opus frame blocks {}",
- opus_frame_size_blocks
- );
- }
- let opus_frame_size = opus_frame_size_blocks * sample_rate / 400;
-
- let buf = Arc::new(Mutex::new(VecDeque::new()));
- move |data: &[T], _info: &InputCallbackInfo| {
- let mut buf = buf.lock().unwrap();
- let input_volume = *input_volume_receiver.borrow();
- let out: Vec<f32> = data.iter().map(|e| e.to_f32())
- .map(|e| e * input_volume)
- .collect();
- buf.extend(out);
- while buf.len() >= opus_frame_size as usize {
- let tail = buf.split_off(opus_frame_size as usize);
- let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize];
- let result = opus_encoder
- .encode_float(&Vec::from(buf.clone()), &mut opus_buf)
- .unwrap();
- opus_buf.truncate(result);
- let bytes = Bytes::copy_from_slice(&opus_buf);
- match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) {
- Ok(_) => {}
- Err(_e) => {
- //warn!("Error sending audio packet: {:?}", e);
- }
- }
- *buf = tail;
- }
- }
-}
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
new file mode 100644
index 0000000..4e95360
--- /dev/null
+++ b/mumd/src/audio/input.rs
@@ -0,0 +1,52 @@
+use bytes::Bytes;
+use cpal::{InputCallbackInfo, Sample};
+use mumble_protocol::voice::VoicePacketPayload;
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+use tokio::sync::{mpsc, watch};
+
+pub fn callback<T: Sample>(
+ mut opus_encoder: opus::Encoder,
+ mut input_sender: mpsc::Sender<VoicePacketPayload>,
+ sample_rate: u32,
+ input_volume_receiver: watch::Receiver<f32>,
+ opus_frame_size_blocks: u32, // blocks of 2.5ms
+) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
+ if !(opus_frame_size_blocks == 1
+ || opus_frame_size_blocks == 2
+ || opus_frame_size_blocks == 4
+ || opus_frame_size_blocks == 8)
+ {
+ panic!(
+ "Unsupported amount of opus frame blocks {}",
+ opus_frame_size_blocks
+ );
+ }
+ let opus_frame_size = opus_frame_size_blocks * sample_rate / 400;
+
+ let buf = Arc::new(Mutex::new(VecDeque::new()));
+ move |data: &[T], _info: &InputCallbackInfo| {
+ let mut buf = buf.lock().unwrap();
+ let input_volume = *input_volume_receiver.borrow();
+ let out: Vec<f32> = data.iter().map(|e| e.to_f32())
+ .map(|e| e * input_volume)
+ .collect();
+ buf.extend(out);
+ while buf.len() >= opus_frame_size as usize {
+ let tail = buf.split_off(opus_frame_size as usize);
+ let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize];
+ let result = opus_encoder
+ .encode_float(&Vec::from(buf.clone()), &mut opus_buf)
+ .unwrap();
+ opus_buf.truncate(result);
+ let bytes = Bytes::copy_from_slice(&opus_buf);
+ match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) {
+ Ok(_) => {}
+ Err(_e) => {
+ //warn!("Error sending audio packet: {:?}", e);
+ }
+ }
+ *buf = tail;
+ }
+ }
+}
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
new file mode 100644
index 0000000..94e4b21
--- /dev/null
+++ b/mumd/src/audio/output.rs
@@ -0,0 +1,90 @@
+use cpal::{OutputCallbackInfo, Sample};
+use mumble_protocol::voice::VoicePacketPayload;
+use opus::Channels;
+use std::collections::{HashMap, VecDeque};
+use std::ops::AddAssign;
+use std::sync::{Arc, Mutex};
+
+pub struct ClientStream {
+ buffer: VecDeque<f32>, //TODO ring buffer?
+ opus_decoder: opus::Decoder,
+}
+
+impl ClientStream {
+ pub fn new(sample_rate: u32, channels: u16) -> Self {
+ Self {
+ buffer: VecDeque::new(),
+ opus_decoder: opus::Decoder::new(
+ sample_rate,
+ match channels {
+ 1 => Channels::Mono,
+ 2 => Channels::Stereo,
+ _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
+ },
+ )
+ .unwrap(),
+ }
+ }
+
+ pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
+ match payload {
+ VoicePacketPayload::Opus(bytes, _eot) => {
+ let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
+ let parsed = self
+ .opus_decoder
+ .decode_float(&bytes, &mut out, false)
+ .expect("Error decoding");
+ out.truncate(parsed);
+ self.buffer.extend(out);
+ }
+ _ => {
+ unimplemented!("Payload type not supported");
+ }
+ }
+ }
+}
+
+pub trait SaturatingAdd {
+ fn saturating_add(self, rhs: Self) -> Self;
+}
+
+impl SaturatingAdd for f32 {
+ fn saturating_add(self, rhs: Self) -> Self {
+ match self + rhs {
+ a if a < -1.0 => -1.0,
+ a if a > 1.0 => 1.0,
+ a => a,
+ }
+ }
+}
+
+impl SaturatingAdd for i16 {
+ fn saturating_add(self, rhs: Self) -> Self {
+ i16::saturating_add(self, rhs)
+ }
+}
+
+impl SaturatingAdd for u16 {
+ fn saturating_add(self, rhs: Self) -> Self {
+ u16::saturating_add(self, rhs)
+ }
+}
+
+pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd>(
+ buf: Arc<Mutex<HashMap<u32, ClientStream>>>,
+) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
+ move |data: &mut [T], _info: &OutputCallbackInfo| {
+ for sample in data.iter_mut() {
+ *sample = Sample::from(&0.0);
+ }
+
+ let mut lock = buf.lock().unwrap();
+ for client_stream in lock.values_mut() {
+ for sample in data.iter_mut() {
+ *sample = sample.saturating_add(Sample::from(
+ &client_stream.buffer.pop_front().unwrap_or(0.0),
+ ));
+ }
+ }
+ }
+}