aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/audio.rs127
-rw-r--r--mumd/src/main.rs10
-rw-r--r--mumd/src/network.rs56
3 files changed, 140 insertions, 53 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index c6f30fb..747716c 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,20 +1,29 @@
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig};
+use mumble_protocol::voice::VoicePacketPayload;
+use opus::Channels;
+use std::collections::HashMap;
use std::collections::VecDeque;
+use std::collections::hash_map::Entry;
+use std::ops::AddAssign;
use std::sync::Arc;
use std::sync::Mutex;
+struct ClientStream {
+ buffer: VecDeque<f32>, //TODO ring buffer?
+ opus_decoder: opus::Decoder,
+}
+
pub struct Audio {
- pub output_buffer: Arc<Mutex<VecDeque<f32>>>, //TODO ring buffer?
pub output_config: StreamConfig,
pub output_stream: Stream,
+
+ client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>,
}
impl Audio {
pub fn new() -> Self {
- let output_buffer = Arc::new(Mutex::new(VecDeque::new()));
-
let host = cpal::default_host();
let device = host
.default_output_device()
@@ -31,35 +40,129 @@ impl Audio {
let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
- let stream_audio_buf = Arc::clone(&output_buffer);
+ let client_streams = Arc::new(Mutex::new(HashMap::new()));
+ let output_client_streams = Arc::clone(&client_streams);
+
let stream = match supported_sample_format {
SampleFormat::F32 => {
- device.build_output_stream(&config, curry_callback::<f32>(stream_audio_buf), err_fn)
+ device.build_output_stream(&config, curry_callback::<f32>(output_client_streams), err_fn)
}
SampleFormat::I16 => {
- device.build_output_stream(&config, curry_callback::<i16>(stream_audio_buf), err_fn)
+ device.build_output_stream(&config, curry_callback::<i16>(output_client_streams), err_fn)
}
SampleFormat::U16 => {
- device.build_output_stream(&config, curry_callback::<u16>(stream_audio_buf), err_fn)
+ device.build_output_stream(&config, curry_callback::<u16>(output_client_streams), err_fn)
}
}
.unwrap();
Self {
- output_buffer,
output_config: config,
output_stream: stream,
+ client_streams,
+ }
+ }
+
+ pub fn decode_packet(&self, session_id: u32, payload: VoicePacketPayload) {
+ match self.client_streams.lock().unwrap().entry(session_id) {
+ Entry::Occupied(mut entry) => {
+ entry.get_mut().decode_packet(payload, self.output_config.channels as usize);
+ }
+ Entry::Vacant(_) => {
+ eprintln!("cannot find session id {}", session_id);
+ }
+ }
+ }
+
+ pub fn add_client(&self, session_id: u32) {
+ match self.client_streams.lock().unwrap().entry(session_id) {
+ Entry::Occupied(_) => {
+ eprintln!("session id {} already exists", session_id);
+ }
+ Entry::Vacant(entry) => {
+ entry.insert(ClientStream::new(
+ self.output_config.sample_rate.0,
+ self.output_config.channels
+ ));
+ }
+ }
+ }
+}
+
+impl ClientStream {
+ fn new(sample_rate: u32, channels: u16) -> Self {
+ Self {
+ buffer: VecDeque::new(),
+ opus_decoder: opus::Decoder::new(
+ sample_rate,
+ match channels {
+ 1 => Channels::Mono,
+ 2 => Channels::Stereo,
+ _ => unimplemented!(
+ "only 1 or 2 supported, got {})",
+ channels
+ ),
+ },
+ ).unwrap(),
+ }
+ }
+
+ fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
+ match payload {
+ VoicePacketPayload::Opus(bytes, _eot) => {
+ let mut out: Vec<f32> =
+ vec![0.0; bytes.len() * channels * 4];
+ self.opus_decoder
+ .decode_float(&bytes[..], &mut out, false)
+ .expect("error decoding");
+ self.buffer.extend(out);
+ }
+ _ => {
+ unimplemented!("payload type not supported");
+ }
}
}
}
-fn curry_callback<T: Sample>(
- buf: Arc<Mutex<VecDeque<f32>>>,
+trait SaturatingAdd {
+ fn saturating_add(self, rhs: Self) -> Self;
+}
+
+impl SaturatingAdd for f32 {
+ fn saturating_add(self, rhs: Self) -> Self {
+ match self + rhs {
+ a if a < -1.0 => -1.0,
+ a if a > 1.0 => 1.0,
+ a => a,
+ }
+ }
+}
+
+impl SaturatingAdd for i16 {
+ fn saturating_add(self, rhs: Self) -> Self {
+ i16::saturating_add(self, rhs)
+ }
+}
+
+impl SaturatingAdd for u16 {
+ fn saturating_add(self, rhs: Self) -> Self {
+ u16::saturating_add(self, rhs)
+ }
+}
+
+fn curry_callback<T: Sample + AddAssign + SaturatingAdd>(
+ buf: Arc<Mutex<HashMap<u32, ClientStream>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
move |data: &mut [T], _info: &OutputCallbackInfo| {
- let mut lock = buf.lock().unwrap();
for sample in data.iter_mut() {
- *sample = Sample::from(&lock.pop_front().unwrap_or(0.0));
+ *sample = Sample::from(&0.0);
+ }
+
+ let mut lock = buf.lock().unwrap();
+ for client_stream in lock.values_mut() {
+ for sample in data.iter_mut() {
+ *sample = sample.saturating_add(Sample::from(&client_stream.buffer.pop_front().unwrap_or(0.0)));
+ }
}
}
}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 3960b48..4c3b67c 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -10,6 +10,8 @@ use futures::channel::oneshot;
use futures::join;
use mumble_protocol::crypt::ClientCryptState;
use std::net::ToSocketAddrs;
+use std::sync::Arc;
+use std::sync::Mutex;
#[tokio::main]
async fn main() {
@@ -47,6 +49,7 @@ async fn main() {
let audio = Audio::new();
audio.output_stream.play().unwrap();
+ let audio = Arc::new(Mutex::new(audio));
// Run it
join!(
@@ -56,7 +59,12 @@ async fn main() {
username,
accept_invalid_cert,
crypt_state_sender,
+ Arc::clone(&audio),
),
- network::handle_udp(server_addr, crypt_state_receiver, audio,)
+ network::handle_udp(
+ server_addr,
+ crypt_state_receiver,
+ audio,
+ )
);
}
diff --git a/mumd/src/network.rs b/mumd/src/network.rs
index 82b45da..a2be9ea 100644
--- a/mumd/src/network.rs
+++ b/mumd/src/network.rs
@@ -1,20 +1,18 @@
use crate::audio::Audio;
use bytes::Bytes;
-use mumble_protocol::voice::VoicePacketPayload;
-use opus::Channels;
-
-use futures::channel::oneshot;
-use futures::join;
use futures::SinkExt;
use futures::StreamExt;
+use futures::channel::oneshot;
+use futures::join;
use futures_util::stream::{SplitSink, SplitStream};
-use mumble_protocol::control::msgs;
use mumble_protocol::control::ClientControlCodec;
use mumble_protocol::control::ControlCodec;
use mumble_protocol::control::ControlPacket;
+use mumble_protocol::control::msgs;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::VoicePacket;
+use mumble_protocol::voice::VoicePacketPayload;
use mumble_protocol::{Clientbound, Serverbound};
use std::convert::Into;
use std::convert::TryInto;
@@ -107,6 +105,7 @@ async fn listen_tcp(
sink: Arc<Mutex<TcpSender>>,
mut stream: TcpReceiver,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
+ audio: Arc<Mutex<Audio>>,
) {
let mut crypt_state = None;
let mut crypt_state_sender = Some(crypt_state_sender);
@@ -155,6 +154,10 @@ async fn listen_tcp(
ControlPacket::Reject(msg) => {
println!("Login rejected: {:?}", msg);
}
+ ControlPacket::UserState(msg) => {
+ println!("Found user {}", msg.get_name());
+ audio.lock().unwrap().add_client(msg.get_session());
+ }
_ => {}
}
}
@@ -166,6 +169,7 @@ pub async fn handle_tcp(
username: String,
accept_invalid_cert: bool,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
+ audio: Arc<Mutex<Audio>>,
) {
let (sink, stream) = connect_tcp(server_addr, server_host, accept_invalid_cert).await;
let sink = Arc::new(Mutex::new(sink));
@@ -177,15 +181,14 @@ pub async fn handle_tcp(
join!(
send_pings(Arc::clone(&sink), 10),
- listen_tcp(sink, stream, crypt_state_sender),
+ listen_tcp(sink, stream, crypt_state_sender, audio),
);
}
async fn listen_udp(
_sink: Arc<Mutex<UdpSender>>,
mut source: UdpReceiver,
- mut opus_decoder: opus::Decoder,
- audio: Audio,
+ audio: Arc<Mutex<Audio>>,
) {
while let Some(packet) = source.next().await {
let (packet, _src_addr) = match packet {
@@ -203,27 +206,13 @@ async fn listen_udp(
continue;
}
VoicePacket::Audio {
+ session_id,
// seq_num,
payload,
// position_info,
..
} => {
- match payload {
- VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> =
- vec![0.0; bytes.len() * audio.output_config.channels as usize * 4];
- opus_decoder
- .decode_float(&bytes[..], &mut out, false)
- .expect("error decoding");
- let mut lock = audio.output_buffer.lock().unwrap();
- lock.extend(out);
- }
- _ => {
- unimplemented!("något fint");
- }
- }
-
- // decode paylout and put it in buffer
+ audio.lock().unwrap().decode_packet(session_id, payload);
// Got audio, naively echo it back
//let reply = VoicePacket::Audio {
@@ -259,21 +248,8 @@ async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) {
pub async fn handle_udp(
server_addr: SocketAddr,
crypt_state: oneshot::Receiver<ClientCryptState>,
- audio: Audio,
+ audio: Arc<Mutex<Audio>>,
) {
- let opus_decoder = opus::Decoder::new(
- audio.output_config.sample_rate.0 as u32,
- match audio.output_config.channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!(
- "ljudnörd (got {} channels, need 1 or 2)",
- audio.output_config.channels
- ),
- },
- )
- .unwrap();
-
let (mut sink, source) = connect_udp(crypt_state).await;
// Note: A normal application would also send periodic Ping packets, and its own audio
@@ -281,5 +257,5 @@ pub async fn handle_udp(
// dummy voice packet.
send_ping_udp(&mut sink, server_addr).await;
- listen_udp(Arc::new(Mutex::new(sink)), source, opus_decoder, audio).await;
+ listen_udp(Arc::new(Mutex::new(sink)), source, audio).await;
}