aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-10 20:33:55 +0200
committerGustav Sörnäs <gustav@sornas.net>2020-10-10 20:33:55 +0200
commitab8116223328412484d1e76a9ff7b2055f05abf5 (patch)
treea38ddfa10111aa6f7b5015015a1b33fae2fdb42c
parent2aee60f7bbc6186cf6ca63aef182e1fe52fa03ad (diff)
downloadmum-ab8116223328412484d1e76a9ff7b2055f05abf5.tar.gz
big ol refactor
Co-authored-by: Eskil Queseth <eskilq@kth.se>
-rw-r--r--mumd/src/audio.rs41
-rw-r--r--mumd/src/main.rs209
-rw-r--r--mumd/src/network.rs241
3 files changed, 263 insertions, 228 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index d1a309c..c6f30fb 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,6 +1,6 @@
-use cpal::{Sample, SampleFormat, OutputCallbackInfo, Stream, StreamConfig, SampleRate};
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
+use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
@@ -11,17 +11,21 @@ pub struct Audio {
pub output_stream: Stream,
}
-impl Audio {
+impl Audio {
pub fn new() -> Self {
let output_buffer = Arc::new(Mutex::new(VecDeque::new()));
let host = cpal::default_host();
- let device = host.default_output_device().expect("default output device not found");
- let mut supported_configs_range = device.supported_output_configs()
- .expect("error querying configs");
- let supported_config = supported_configs_range.next()
- .expect("no supported config??")
- .with_sample_rate(SampleRate(48000));
+ let device = host
+ .default_output_device()
+ .expect("default output device not found");
+ let mut supported_configs_range = device
+ .supported_output_configs()
+ .expect("error querying configs");
+ let supported_config = supported_configs_range
+ .next()
+ .expect("no supported config??")
+ .with_sample_rate(SampleRate(48000));
let supported_sample_format = supported_config.sample_format();
let config: StreamConfig = supported_config.into();
@@ -29,10 +33,17 @@ impl Audio {
let stream_audio_buf = Arc::clone(&output_buffer);
let stream = match supported_sample_format {
- SampleFormat::F32 => device.build_output_stream(&config, curry_callback::<f32>(stream_audio_buf), err_fn),
- SampleFormat::I16 => device.build_output_stream(&config, curry_callback::<i16>(stream_audio_buf), err_fn),
- SampleFormat::U16 => device.build_output_stream(&config, curry_callback::<u16>(stream_audio_buf), err_fn),
- }.unwrap();
+ SampleFormat::F32 => {
+ device.build_output_stream(&config, curry_callback::<f32>(stream_audio_buf), err_fn)
+ }
+ SampleFormat::I16 => {
+ device.build_output_stream(&config, curry_callback::<i16>(stream_audio_buf), err_fn)
+ }
+ SampleFormat::U16 => {
+ device.build_output_stream(&config, curry_callback::<u16>(stream_audio_buf), err_fn)
+ }
+ }
+ .unwrap();
Self {
output_buffer,
@@ -42,9 +53,9 @@ impl Audio {
}
}
-fn curry_callback<T: Sample>(buf: Arc<Mutex<VecDeque<f32>>>)
- -> impl FnMut(&mut [T],
- &OutputCallbackInfo) + Send + 'static {
+fn curry_callback<T: Sample>(
+ buf: Arc<Mutex<VecDeque<f32>>>,
+) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
move |data: &mut [T], _info: &OutputCallbackInfo| {
let mut lock = buf.lock().unwrap();
for sample in data.iter_mut() {
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index ffd5547..3960b48 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -5,212 +5,18 @@ use crate::audio::Audio;
use argparse::ArgumentParser;
use argparse::Store;
use argparse::StoreTrue;
-use bytes::Bytes;
-
+use cpal::traits::StreamTrait;
use futures::channel::oneshot;
use futures::join;
-use futures::StreamExt;
-use futures::SinkExt;
-
-use mumble_protocol::control::msgs;
-use mumble_protocol::control::ControlPacket;
use mumble_protocol::crypt::ClientCryptState;
-use mumble_protocol::voice::VoicePacket;
-use mumble_protocol::voice::VoicePacketPayload;
-
-use std::convert::Into;
-use std::convert::TryInto;
-
-use std::net::SocketAddr;
use std::net::ToSocketAddrs;
-use tokio::time::{self, Duration};
-
-use std::sync::Arc;
-use std::sync::Mutex;
-
-use cpal::traits::StreamTrait;
-
-use opus::Channels;
-
-async fn connect(
- server_addr: SocketAddr,
- server_host: String,
- user_name: String,
- accept_invalid_cert: bool,
- crypt_state_sender: oneshot::Sender<ClientCryptState>,
-) {
- // Wrap crypt_state_sender in Option, so we can call it only once
- let mut crypt_state_sender = Some(crypt_state_sender);
-
- let (sink, mut stream) = network::connect_tcp(server_addr, server_host, accept_invalid_cert).await;
- let sink = Arc::new(Mutex::new(sink));
-
- // Handshake (omitting `Version` message for brevity)
- let mut msg = msgs::Authenticate::new();
- msg.set_username(user_name);
- msg.set_opus(true);
- let mut lock = sink.lock().unwrap();
- lock.send(msg.into()).await.unwrap();
- drop(lock);
-
- println!("Logging in..");
- let mut crypt_state = None;
-
- let ping_sink = Arc::clone(&sink);
- let handle = async {
- let mut interval = time::interval(Duration::from_secs(10));
- let sink = ping_sink;
-
- loop {
- interval.tick().await;
- let msg = msgs::Ping::new();
- let mut lock = sink.lock().unwrap();
- lock.send(msg.into()).await.unwrap();
- }
- };
-
- // Handle incoming packets
- let receive = async {
- while let Some(packet) = stream.next().await {
- match packet.unwrap() {
- ControlPacket::TextMessage(mut msg) => {
- println!(
- "Got message from user with session ID {}: {}",
- msg.get_actor(),
- msg.get_message()
- );
- // Send reply back to server
- let mut response = msgs::TextMessage::new();
- response.mut_session().push(msg.get_actor());
- response.set_message(msg.take_message());
- let mut lock = sink.lock().unwrap();
- lock.send(response.into()).await.unwrap();
- }
- ControlPacket::CryptSetup(msg) => {
- println!("crypt setup");
- // Wait until we're fully connected before initiating UDP voice
- crypt_state = Some(ClientCryptState::new_from(
- msg.get_key()
- .try_into()
- .expect("Server sent private key with incorrect size"),
- msg.get_client_nonce()
- .try_into()
- .expect("Server sent client_nonce with incorrect size"),
- msg.get_server_nonce()
- .try_into()
- .expect("Server sent server_nonce with incorrect size"),
- ));
- }
- ControlPacket::ServerSync(_) => {
- println!("Logged in!");
- if let Some(sender) = crypt_state_sender.take() {
- let _ = sender.send(
- crypt_state
- .take()
- .expect("Server didn't send us any CryptSetup packet!"),
- );
- }
- }
- ControlPacket::Reject(msg) => {
- println!("Login rejected: {:?}", msg);
- }
- _ => {},
- }
- }
- };
- join!(handle, receive);
-}
-
-async fn handle_udp(
- server_addr: SocketAddr,
- crypt_state: oneshot::Receiver<ClientCryptState>,
-) {
- let audio = Audio::new();
- audio.output_stream.play().unwrap();
-
- // create opus decoder (might be expensive)
- let mut opus_decoder = opus::Decoder::new(
- audio.output_config.sample_rate.0 as u32,
- match audio.output_config.channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!("ljudnörd (got {} channels, need 1 or 2)", audio.output_config.channels)
- }
- ).unwrap();
-
- let (mut sink, mut source) = network::connect_udp(server_addr, crypt_state).await;
-
- // Note: A normal application would also send periodic Ping packets, and its own audio
- // via UDP. We instead trick the server into accepting us by sending it one
- // dummy voice packet.
- sink.send((
- VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0,
- session_id: (),
- seq_num: 0,
- payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true),
- position_info: None,
- },
- server_addr,
- )).await.unwrap();
-
- // Handle incoming UDP packets
- while let Some(packet) = source.next().await {
- let (packet, _src_addr) = match packet {
- Ok(packet) => packet,
- Err(err) => {
- eprintln!("Got an invalid UDP packet: {}", err);
- // To be expected, considering this is the internet, just ignore it
- continue
- }
- };
- match packet {
- VoicePacket::Ping { .. } => {
- // Note: A normal application would handle these and only use UDP for voice
- // once it has received one.
- continue
- }
- VoicePacket::Audio {
- // seq_num,
- payload,
- // position_info,
- ..
- } => {
- match payload {
- VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> = vec![0.0; bytes.len() * audio.output_config.channels as usize * 4];
- opus_decoder.decode_float(&bytes[..], &mut out, false).expect("error decoding");
- let mut lock = audio.output_buffer.lock().unwrap();
- lock.extend(out);
- },
- _ => { unimplemented!("något fint"); }
- }
-
- // decode paylout and put it in buffer
-
- // Got audio, naively echo it back
- //let reply = VoicePacket::Audio {
- // _dst: std::marker::PhantomData,
- // target: 0, // normal speech
- // session_id: (), // unused for server-bound packets
- // seq_num,
- // payload,
- // position_info,
- //};
- //sink.send((reply, src_addr)).await.unwrap();
- }
- }
- }
-}
-
#[tokio::main]
async fn main() {
// Handle command line arguments
let mut server_host = "".to_string();
let mut server_port = 64738u16;
- let mut user_name = "EchoBot".to_string();
+ let mut username = "EchoBot".to_string();
let mut accept_invalid_cert = false;
{
let mut ap = ArgumentParser::new();
@@ -220,7 +26,7 @@ async fn main() {
.required();
ap.refer(&mut server_port)
.add_option(&["--port"], Store, "Port of mumble server");
- ap.refer(&mut user_name)
+ ap.refer(&mut username)
.add_option(&["--username"], Store, "User name used to connect");
ap.refer(&mut accept_invalid_cert).add_option(
&["--accept-invalid-cert"],
@@ -239,15 +45,18 @@ async fn main() {
// For simplicity we don't deal with re-syncing, real applications would have to.
let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>();
+ let audio = Audio::new();
+ audio.output_stream.play().unwrap();
+
// Run it
join!(
- connect(
+ network::handle_tcp(
server_addr,
server_host,
- user_name,
+ username,
accept_invalid_cert,
crypt_state_sender,
),
- handle_udp(server_addr, crypt_state_receiver)
+ network::handle_udp(server_addr, crypt_state_receiver, audio,)
);
}
diff --git a/mumd/src/network.rs b/mumd/src/network.rs
index 34b5d01..82b45da 100644
--- a/mumd/src/network.rs
+++ b/mumd/src/network.rs
@@ -1,32 +1,53 @@
+use crate::audio::Audio;
+
+use bytes::Bytes;
+use mumble_protocol::voice::VoicePacketPayload;
+use opus::Channels;
+
use futures::channel::oneshot;
+use futures::join;
+use futures::SinkExt;
use futures::StreamExt;
use futures_util::stream::{SplitSink, SplitStream};
-use mumble_protocol::{Serverbound, Clientbound};
+use mumble_protocol::control::msgs;
use mumble_protocol::control::ClientControlCodec;
use mumble_protocol::control::ControlCodec;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::VoicePacket;
+use mumble_protocol::{Clientbound, Serverbound};
+use std::convert::Into;
+use std::convert::TryInto;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
+use std::sync::Arc;
+use std::sync::Mutex;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
+use tokio::time::{self, Duration};
use tokio_tls::TlsConnector;
use tokio_tls::TlsStream;
use tokio_util::codec::Decoder;
use tokio_util::codec::Framed;
use tokio_util::udp::UdpFramed;
-pub async fn connect_tcp(
+type TcpSender = SplitSink<
+ Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
+ ControlPacket<Serverbound>,
+>;
+type TcpReceiver =
+ SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>;
+type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
+type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>;
+
+async fn connect_tcp(
server_addr: SocketAddr,
server_host: String,
accept_invalid_cert: bool,
-) -> (
- SplitSink<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>, ControlPacket<Serverbound>>,
- SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>
-) {
-
- let stream = TcpStream::connect(&server_addr).await.expect("failed to connect to server:");
+) -> (TcpSender, TcpReceiver) {
+ let stream = TcpStream::connect(&server_addr)
+ .await
+ .expect("failed to connect to server:");
println!("TCP connected");
let mut builder = native_tls::TlsConnector::builder();
@@ -46,12 +67,8 @@ pub async fn connect_tcp(
}
pub async fn connect_udp(
- server_addr: SocketAddr,
crypt_state: oneshot::Receiver<ClientCryptState>,
-) -> (
- SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>,
- SplitStream<UdpFramed<ClientCryptState>>
-) {
+) -> (UdpSender, UdpReceiver) {
// Bind UDP socket
let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
.await
@@ -68,3 +85,201 @@ pub async fn connect_udp(
// Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both)
UdpFramed::new(udp_socket, crypt_state).split()
}
+
+async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
+ let mut interval = time::interval(Duration::from_secs(delay_seconds));
+ loop {
+ interval.tick().await;
+ println!("Sending ping");
+ let msg = msgs::Ping::new();
+ sink.lock().unwrap().send(msg.into()).await.unwrap();
+ }
+}
+
+async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
+ let mut msg = msgs::Authenticate::new();
+ msg.set_username(username);
+ msg.set_opus(true);
+ sink.lock().unwrap().send(msg.into()).await.unwrap();
+}
+
+async fn listen_tcp(
+ sink: Arc<Mutex<TcpSender>>,
+ mut stream: TcpReceiver,
+ crypt_state_sender: oneshot::Sender<ClientCryptState>,
+) {
+ let mut crypt_state = None;
+ let mut crypt_state_sender = Some(crypt_state_sender);
+
+ while let Some(packet) = stream.next().await {
+ //TODO handle types separately
+ match packet.unwrap() {
+ ControlPacket::TextMessage(mut msg) => {
+ println!(
+ "Got message from user with session ID {}: {}",
+ msg.get_actor(),
+ msg.get_message()
+ );
+ // Send reply back to server
+ let mut response = msgs::TextMessage::new();
+ response.mut_session().push(msg.get_actor());
+ response.set_message(msg.take_message());
+ let mut lock = sink.lock().unwrap();
+ lock.send(response.into()).await.unwrap();
+ }
+ ControlPacket::CryptSetup(msg) => {
+ println!("crypt setup");
+ // Wait until we're fully connected before initiating UDP voice
+ crypt_state = Some(ClientCryptState::new_from(
+ msg.get_key()
+ .try_into()
+ .expect("Server sent private key with incorrect size"),
+ msg.get_client_nonce()
+ .try_into()
+ .expect("Server sent client_nonce with incorrect size"),
+ msg.get_server_nonce()
+ .try_into()
+ .expect("Server sent server_nonce with incorrect size"),
+ ));
+ }
+ ControlPacket::ServerSync(_) => {
+ println!("Logged in!");
+ if let Some(sender) = crypt_state_sender.take() {
+ let _ = sender.send(
+ crypt_state
+ .take()
+ .expect("Server didn't send us any CryptSetup packet!"),
+ );
+ }
+ }
+ ControlPacket::Reject(msg) => {
+ println!("Login rejected: {:?}", msg);
+ }
+ _ => {}
+ }
+ }
+}
+
+pub async fn handle_tcp(
+ server_addr: SocketAddr,
+ server_host: String,
+ username: String,
+ accept_invalid_cert: bool,
+ crypt_state_sender: oneshot::Sender<ClientCryptState>,
+) {
+ let (sink, stream) = connect_tcp(server_addr, server_host, accept_invalid_cert).await;
+ let sink = Arc::new(Mutex::new(sink));
+
+ // Handshake (omitting `Version` message for brevity)
+ authenticate(Arc::clone(&sink), username).await;
+
+ println!("Logging in..");
+
+ join!(
+ send_pings(Arc::clone(&sink), 10),
+ listen_tcp(sink, stream, crypt_state_sender),
+ );
+}
+
+async fn listen_udp(
+ _sink: Arc<Mutex<UdpSender>>,
+ mut source: UdpReceiver,
+ mut opus_decoder: opus::Decoder,
+ audio: Audio,
+) {
+ while let Some(packet) = source.next().await {
+ let (packet, _src_addr) = match packet {
+ Ok(packet) => packet,
+ Err(err) => {
+ eprintln!("Got an invalid UDP packet: {}", err);
+ // To be expected, considering this is the internet, just ignore it
+ continue;
+ }
+ };
+ match packet {
+ VoicePacket::Ping { .. } => {
+ // Note: A normal application would handle these and only use UDP for voice
+ // once it has received one.
+ continue;
+ }
+ VoicePacket::Audio {
+ // seq_num,
+ payload,
+ // position_info,
+ ..
+ } => {
+ match payload {
+ VoicePacketPayload::Opus(bytes, _eot) => {
+ let mut out: Vec<f32> =
+ vec![0.0; bytes.len() * audio.output_config.channels as usize * 4];
+ opus_decoder
+ .decode_float(&bytes[..], &mut out, false)
+ .expect("error decoding");
+ let mut lock = audio.output_buffer.lock().unwrap();
+ lock.extend(out);
+ }
+ _ => {
+ unimplemented!("något fint");
+ }
+ }
+
+ // decode paylout and put it in buffer
+
+ // Got audio, naively echo it back
+ //let reply = VoicePacket::Audio {
+ // _dst: std::marker::PhantomData,
+ // target: 0, // normal speech
+ // session_id: (), // unused for server-bound packets
+ // seq_num,
+ // payload,
+ // position_info,
+ //};
+ //sink.send((reply, src_addr)).await.unwrap();
+ }
+ }
+ }
+}
+
+async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) {
+ sink.send((
+ VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0,
+ session_id: (),
+ seq_num: 0,
+ payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true),
+ position_info: None,
+ },
+ server_addr,
+ ))
+ .await
+ .unwrap();
+}
+
+pub async fn handle_udp(
+ server_addr: SocketAddr,
+ crypt_state: oneshot::Receiver<ClientCryptState>,
+ audio: Audio,
+) {
+ let opus_decoder = opus::Decoder::new(
+ audio.output_config.sample_rate.0 as u32,
+ match audio.output_config.channels {
+ 1 => Channels::Mono,
+ 2 => Channels::Stereo,
+ _ => unimplemented!(
+ "ljudnörd (got {} channels, need 1 or 2)",
+ audio.output_config.channels
+ ),
+ },
+ )
+ .unwrap();
+
+ let (mut sink, source) = connect_udp(crypt_state).await;
+
+ // Note: A normal application would also send periodic Ping packets, and its own audio
+ // via UDP. We instead trick the server into accepting us by sending it one
+ // dummy voice packet.
+ send_ping_udp(&mut sink, server_addr).await;
+
+ listen_udp(Arc::new(Mutex::new(sink)), source, opus_decoder, audio).await;
+}