aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mumd/src/audio.rs174
-rw-r--r--mumd/src/main.rs6
-rw-r--r--mumd/src/network.rs49
3 files changed, 172 insertions, 57 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 2d504cf..8b8a08e 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,64 +1,131 @@
+use bytes::Bytes;
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
-use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig};
+use cpal::{
+ InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig,
+};
use mumble_protocol::voice::VoicePacketPayload;
use opus::Channels;
+use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::VecDeque;
-use std::collections::hash_map::Entry;
use std::ops::AddAssign;
use std::sync::Arc;
use std::sync::Mutex;
+use tokio::sync::mpsc::{self, Receiver, Sender};
struct ClientStream {
buffer: VecDeque<f32>, //TODO ring buffer?
opus_decoder: opus::Decoder,
}
+//TODO remove pub where possible
pub struct Audio {
pub output_config: StreamConfig,
pub output_stream: Stream,
+ pub input_config: StreamConfig,
+ pub input_stream: Stream,
+ pub input_buffer: Arc<Mutex<VecDeque<f32>>>,
+ input_channel_receiver: Option<Receiver<VoicePacketPayload>>,
+
client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>,
}
+//TODO split into input/output
impl Audio {
pub fn new() -> Self {
let host = cpal::default_host();
- let device = host
+ let output_device = host
.default_output_device()
.expect("default output device not found");
- let mut supported_configs_range = device
+ let mut output_supported_configs_range = output_device
.supported_output_configs()
- .expect("error querying configs");
- let supported_config = supported_configs_range
+ .expect("error querying output configs");
+ let output_supported_config = output_supported_configs_range
+ .next()
+ .expect("no supported output config??")
+ .with_sample_rate(SampleRate(48000));
+ let output_supported_sample_format = output_supported_config.sample_format();
+ let output_config: StreamConfig = output_supported_config.into();
+
+ let input_device = host
+ .default_input_device()
+ .expect("default input device not found");
+ let mut input_supported_configs_range = input_device
+ .supported_input_configs()
+ .expect("error querying input configs");
+ let input_supported_config = input_supported_configs_range
.next()
- .expect("no supported config??")
+ .expect("no supported input config??")
.with_sample_rate(SampleRate(48000));
- let supported_sample_format = supported_config.sample_format();
- let config: StreamConfig = supported_config.into();
+ let input_supported_sample_format = input_supported_config.sample_format();
+ let input_config: StreamConfig = input_supported_config.into();
let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
let client_streams = Arc::new(Mutex::new(HashMap::new()));
- let output_client_streams = Arc::clone(&client_streams);
+ let output_stream = match output_supported_sample_format {
+ SampleFormat::F32 => output_device.build_output_stream(
+ &output_config,
+ output_curry_callback::<f32>(Arc::clone(&client_streams)),
+ err_fn,
+ ),
+ SampleFormat::I16 => output_device.build_output_stream(
+ &output_config,
+ output_curry_callback::<i16>(Arc::clone(&client_streams)),
+ err_fn,
+ ),
+ SampleFormat::U16 => output_device.build_output_stream(
+ &output_config,
+ output_curry_callback::<u16>(Arc::clone(&client_streams)),
+ err_fn,
+ ),
+ }
+ .unwrap();
- let stream = match supported_sample_format {
- SampleFormat::F32 => {
- device.build_output_stream(&config, curry_callback::<f32>(output_client_streams), err_fn)
- }
- SampleFormat::I16 => {
- device.build_output_stream(&config, curry_callback::<i16>(output_client_streams), err_fn)
- }
- SampleFormat::U16 => {
- device.build_output_stream(&config, curry_callback::<u16>(output_client_streams), err_fn)
- }
+ let input_encoder = opus::Encoder::new(
+ input_config.sample_rate.0,
+ match input_config.channels {
+ 1 => Channels::Mono,
+ 2 => Channels::Stereo,
+ _ => unimplemented!(
+ "only 1 or 2 channels supported, got {})",
+ input_config.channels
+ ),
+ },
+ opus::Application::Voip,
+ )
+ .unwrap();
+ let (input_sender, input_receiver) = mpsc::channel(100);
+
+ let input_buffer = Arc::new(Mutex::new(VecDeque::new()));
+ let input_stream = match input_supported_sample_format {
+ SampleFormat::F32 => input_device.build_input_stream(
+ &input_config,
+ input_callback::<f32>(input_encoder, input_sender),
+ err_fn,
+ ),
+ SampleFormat::I16 => input_device.build_input_stream(
+ &input_config,
+ input_callback::<i16>(input_encoder, input_sender),
+ err_fn,
+ ),
+ SampleFormat::U16 => input_device.build_input_stream(
+ &input_config,
+ input_callback::<u16>(input_encoder, input_sender),
+ err_fn,
+ ),
}
.unwrap();
Self {
- output_config: config,
- output_stream: stream,
+ output_config,
+ output_stream,
+ input_config,
+ input_stream,
+ input_buffer,
+ input_channel_receiver: Some(input_receiver),
client_streams,
}
}
@@ -66,7 +133,9 @@ impl Audio {
pub fn decode_packet(&self, session_id: u32, payload: VoicePacketPayload) {
match self.client_streams.lock().unwrap().entry(session_id) {
Entry::Occupied(mut entry) => {
- entry.get_mut().decode_packet(payload, self.output_config.channels as usize);
+ entry
+ .get_mut()
+ .decode_packet(payload, self.output_config.channels as usize);
}
Entry::Vacant(_) => {
eprintln!("cannot find session id {}", session_id);
@@ -82,7 +151,7 @@ impl Audio {
Entry::Vacant(entry) => {
entry.insert(ClientStream::new(
self.output_config.sample_rate.0,
- self.output_config.channels
+ self.output_config.channels,
));
}
}
@@ -94,10 +163,17 @@ impl Audio {
entry.remove();
}
Entry::Vacant(_) => {
- eprintln!("tried to remove session id {} that doesn't exist", session_id);
+ eprintln!(
+ "tried to remove session id {} that doesn't exist",
+ session_id
+ );
}
}
}
+
+ pub fn take_receiver(&mut self) -> Option<Receiver<VoicePacketPayload>> {
+ self.input_channel_receiver.take()
+ }
}
impl ClientStream {
@@ -109,23 +185,20 @@ impl ClientStream {
match channels {
1 => Channels::Mono,
2 => Channels::Stereo,
- _ => unimplemented!(
- "only 1 or 2 supported, got {})",
- channels
- ),
+ _ => unimplemented!("only 1 or 2 channels supported, got {})", channels),
},
- ).unwrap(),
+ )
+ .unwrap(),
}
}
fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> =
- vec![0.0; bytes.len() * channels * 4];
+ let mut out: Vec<f32> = vec![0.0; bytes.len() * channels * 4];
self.opus_decoder
- .decode_float(&bytes[..], &mut out, false)
- .expect("error decoding");
+ .decode_float(&bytes, &mut out, false)
+ .expect("error decoding"); //FIXME sometimes panics here
self.buffer.extend(out);
}
_ => {
@@ -143,7 +216,7 @@ impl SaturatingAdd for f32 {
fn saturating_add(self, rhs: Self) -> Self {
match self + rhs {
a if a < -1.0 => -1.0,
- a if a > 1.0 => 1.0,
+ a if a > 1.0 => 1.0,
a => a,
}
}
@@ -161,7 +234,7 @@ impl SaturatingAdd for u16 {
}
}
-fn curry_callback<T: Sample + AddAssign + SaturatingAdd>(
+fn output_curry_callback<T: Sample + AddAssign + SaturatingAdd>(
buf: Arc<Mutex<HashMap<u32, ClientStream>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
move |data: &mut [T], _info: &OutputCallbackInfo| {
@@ -172,8 +245,35 @@ fn curry_callback<T: Sample + AddAssign + SaturatingAdd>(
let mut lock = buf.lock().unwrap();
for client_stream in lock.values_mut() {
for sample in data.iter_mut() {
- *sample = sample.saturating_add(Sample::from(&client_stream.buffer.pop_front().unwrap_or(0.0)));
+ *sample = sample.saturating_add(Sample::from(
+ &client_stream.buffer.pop_front().unwrap_or(0.0),
+ ));
}
}
}
}
+
+fn input_callback<T: Sample>(
+ mut opus_encoder: opus::Encoder,
+ mut input_sender: Sender<VoicePacketPayload>,
+) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
+ let buf = Arc::new(Mutex::new(VecDeque::new()));
+ move |data: &[T], _info: &InputCallbackInfo| {
+ let mut buf = buf.lock().unwrap();
+ let out: Vec<f32> = data.iter().map(|e| e.to_f32()).collect();
+ buf.extend(out);
+ while buf.len() >= 2880 {
+ let tail = buf.split_off(2880);
+ let mut opus_buf: Vec<u8> = vec![0; 100_000];
+ let result = opus_encoder
+ .encode_float(&Vec::from(buf.clone()), &mut opus_buf)
+ .unwrap();
+ opus_buf.truncate(result);
+ let bytes = Bytes::copy_from_slice(&opus_buf);
+ input_sender
+ .try_send(VoicePacketPayload::Opus(bytes, false))
+ .unwrap(); //TODO handle full buffer / disconnect
+ *buf = tail;
+ }
+ }
+}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 4c3b67c..1608947 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -61,10 +61,6 @@ async fn main() {
crypt_state_sender,
Arc::clone(&audio),
),
- network::handle_udp(
- server_addr,
- crypt_state_receiver,
- audio,
- )
+ network::handle_udp(server_addr, crypt_state_receiver, audio,),
);
}
diff --git a/mumd/src/network.rs b/mumd/src/network.rs
index c59754d..875463c 100644
--- a/mumd/src/network.rs
+++ b/mumd/src/network.rs
@@ -1,15 +1,15 @@
use crate::audio::Audio;
use bytes::Bytes;
-use futures::SinkExt;
-use futures::StreamExt;
use futures::channel::oneshot;
use futures::join;
+use futures::SinkExt;
+use futures::StreamExt;
use futures_util::stream::{SplitSink, SplitStream};
+use mumble_protocol::control::msgs;
use mumble_protocol::control::ClientControlCodec;
use mumble_protocol::control::ControlCodec;
use mumble_protocol::control::ControlPacket;
-use mumble_protocol::control::msgs;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::voice::VoicePacketPayload;
@@ -217,17 +217,6 @@ async fn listen_udp(
..
} => {
audio.lock().unwrap().decode_packet(session_id, payload);
-
- // Got audio, naively echo it back
- //let reply = VoicePacket::Audio {
- // _dst: std::marker::PhantomData,
- // target: 0, // normal speech
- // session_id: (), // unused for server-bound packets
- // seq_num,
- // payload,
- // position_info,
- //};
- //sink.send((reply, src_addr)).await.unwrap();
}
}
}
@@ -249,6 +238,32 @@ async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) {
.unwrap();
}
+async fn send_voice_udp(
+ sink: Arc<Mutex<UdpSender>>,
+ server_addr: SocketAddr,
+ audio: Arc<Mutex<Audio>>,
+) {
+ let mut receiver = audio.lock().unwrap().take_receiver().unwrap();
+
+ let mut count = 0;
+ while let Some(payload) = receiver.recv().await {
+ let reply = VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: count,
+ payload,
+ position_info: None,
+ };
+ count += 1;
+ sink.lock()
+ .unwrap()
+ .send((reply, server_addr))
+ .await
+ .unwrap();
+ }
+}
+
pub async fn handle_udp(
server_addr: SocketAddr,
crypt_state: oneshot::Receiver<ClientCryptState>,
@@ -261,5 +276,9 @@ pub async fn handle_udp(
// dummy voice packet.
send_ping_udp(&mut sink, server_addr).await;
- listen_udp(Arc::new(Mutex::new(sink)), source, audio).await;
+ let sink = Arc::new(Mutex::new(sink));
+ join!(
+ listen_udp(Arc::clone(&sink), source, Arc::clone(&audio)),
+ send_voice_udp(sink, server_addr, audio)
+ );
}