aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-13 00:40:38 +0200
committerGitHub <noreply@github.com>2020-10-13 00:40:38 +0200
commit63c2b18a558dc9c22be61c0068fd2ae62b188dea (patch)
treeb29faeac040627d851627b5ef547d102fade5bee
parent57c258398831b180d5678be526854bc43f521e42 (diff)
downloadmum-63c2b18a558dc9c22be61c0068fd2ae62b188dea.tar.gz
split network.rs to tcp.rs and udp.rs (#4)
-rw-r--r--mumd/src/main.rs4
-rw-r--r--mumd/src/network/mod.rs2
-rw-r--r--mumd/src/network/tcp.rs (renamed from mumd/src/network.rs)185
-rw-r--r--mumd/src/network/udp.rs131
4 files changed, 168 insertions, 154 deletions
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index fcffb87..2a0fcbd 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -82,7 +82,7 @@ async fn main() {
// Run it
join!(
- network::handle_tcp(
+ network::tcp::handle(
server_state,
server_addr,
server_host,
@@ -91,7 +91,7 @@ async fn main() {
crypt_state_sender,
Arc::clone(&audio),
),
- network::handle_udp(
+ network::udp::handle(
server_addr,
crypt_state_receiver,
audio,
diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs
new file mode 100644
index 0000000..f7a6a76
--- /dev/null
+++ b/mumd/src/network/mod.rs
@@ -0,0 +1,2 @@
+pub mod tcp;
+pub mod udp;
diff --git a/mumd/src/network.rs b/mumd/src/network/tcp.rs
index 947612f..dde98aa 100644
--- a/mumd/src/network.rs
+++ b/mumd/src/network/tcp.rs
@@ -1,23 +1,20 @@
use crate::audio::Audio;
use crate::state::Server;
+use log::*;
-use bytes::Bytes;
use futures::channel::oneshot;
use futures::{join, SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
-use log::*;
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
-use mumble_protocol::voice::{VoicePacket, VoicePacketPayload};
use mumble_protocol::{Clientbound, Serverbound};
use std::convert::{Into, TryInto};
-use std::net::{Ipv6Addr, SocketAddr};
+use std::net::{SocketAddr};
use std::sync::{Arc, Mutex};
-use tokio::net::{TcpStream, UdpSocket};
+use tokio::net::TcpStream;
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
-use tokio_util::udp::UdpFramed;
type TcpSender = SplitSink<
Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
@@ -25,10 +22,31 @@ type TcpSender = SplitSink<
>;
type TcpReceiver =
SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>;
-type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
-type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>;
-async fn connect_tcp(
+pub async fn handle(
+ server: Arc<Mutex<Server>>,
+ server_addr: SocketAddr,
+ server_host: String,
+ username: String,
+ accept_invalid_cert: bool,
+ crypt_state_sender: oneshot::Sender<ClientCryptState>,
+ audio: Arc<Mutex<Audio>>,
+) {
+ let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await;
+ let sink = Arc::new(Mutex::new(sink));
+
+ // Handshake (omitting `Version` message for brevity)
+ authenticate(Arc::clone(&sink), username).await;
+
+ info!("Logging in...");
+
+ join!(
+ send_pings(Arc::clone(&sink), 10),
+ listen(server, sink, stream, crypt_state_sender, audio),
+ );
+}
+
+async fn connect(
server_addr: SocketAddr,
server_host: String,
accept_invalid_cert: bool,
@@ -54,24 +72,11 @@ async fn connect_tcp(
ClientControlCodec::new().framed(tls_stream).split()
}
-pub async fn connect_udp(
- crypt_state: oneshot::Receiver<ClientCryptState>,
-) -> (UdpSender, UdpReceiver) {
- // Bind UDP socket
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
- .await
- .expect("Failed to bind UDP socket");
-
- // Wait for initial CryptState
- let crypt_state = match crypt_state.await {
- Ok(crypt_state) => crypt_state,
- // disconnected before we received the CryptSetup packet, oh well
- Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully
- };
- debug!("UDP connected");
-
- // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both)
- UdpFramed::new(udp_socket, crypt_state).split()
+async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
+ let mut msg = msgs::Authenticate::new();
+ msg.set_username(username);
+ msg.set_opus(true);
+ sink.lock().unwrap().send(msg.into()).await.unwrap();
}
async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
@@ -84,14 +89,7 @@ async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
}
}
-async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
- let mut msg = msgs::Authenticate::new();
- msg.set_username(username);
- msg.set_opus(true);
- sink.lock().unwrap().send(msg.into()).await.unwrap();
-}
-
-async fn listen_tcp(
+async fn listen(
server: Arc<Mutex<Server>>,
sink: Arc<Mutex<TcpSender>>,
mut stream: TcpReceiver,
@@ -180,120 +178,3 @@ async fn listen_tcp(
}
}
}
-
-pub async fn handle_tcp(
- server: Arc<Mutex<Server>>,
- server_addr: SocketAddr,
- server_host: String,
- username: String,
- accept_invalid_cert: bool,
- crypt_state_sender: oneshot::Sender<ClientCryptState>,
- audio: Arc<Mutex<Audio>>,
-) {
- let (sink, stream) = connect_tcp(server_addr, server_host, accept_invalid_cert).await;
- let sink = Arc::new(Mutex::new(sink));
-
- // Handshake (omitting `Version` message for brevity)
- authenticate(Arc::clone(&sink), username).await;
-
- info!("Logging in...");
-
- join!(
- send_pings(Arc::clone(&sink), 10),
- listen_tcp(server, sink, stream, crypt_state_sender, audio),
- );
-}
-
-async fn listen_udp(
- _sink: Arc<Mutex<UdpSender>>,
- mut source: UdpReceiver,
- audio: Arc<Mutex<Audio>>,
-) {
- while let Some(packet) = source.next().await {
- let (packet, _src_addr) = match packet {
- Ok(packet) => packet,
- Err(err) => {
- warn!("Got an invalid UDP packet: {}", err);
- // To be expected, considering this is the internet, just ignore it
- continue;
- }
- };
- match packet {
- VoicePacket::Ping { .. } => {
- // Note: A normal application would handle these and only use UDP for voice
- // once it has received one.
- continue;
- }
- VoicePacket::Audio {
- session_id,
- // seq_num,
- payload,
- // position_info,
- ..
- } => {
- audio.lock().unwrap().decode_packet(session_id, payload);
- }
- }
- }
-}
-
-async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) {
- sink.send((
- VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0,
- session_id: (),
- seq_num: 0,
- payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true),
- position_info: None,
- },
- server_addr,
- ))
- .await
- .unwrap();
-}
-
-async fn send_voice_udp(
- sink: Arc<Mutex<UdpSender>>,
- server_addr: SocketAddr,
- audio: Arc<Mutex<Audio>>,
-) {
- let mut receiver = audio.lock().unwrap().take_receiver().unwrap();
-
- let mut count = 0;
- while let Some(payload) = receiver.recv().await {
- let reply = VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: count,
- payload,
- position_info: None,
- };
- count += 1;
- sink.lock()
- .unwrap()
- .send((reply, server_addr))
- .await
- .unwrap();
- }
-}
-
-pub async fn handle_udp(
- server_addr: SocketAddr,
- crypt_state: oneshot::Receiver<ClientCryptState>,
- audio: Arc<Mutex<Audio>>,
-) {
- let (mut sink, source) = connect_udp(crypt_state).await;
-
- // Note: A normal application would also send periodic Ping packets, and its own audio
- // via UDP. We instead trick the server into accepting us by sending it one
- // dummy voice packet.
- send_ping_udp(&mut sink, server_addr).await;
-
- let sink = Arc::new(Mutex::new(sink));
- join!(
- listen_udp(Arc::clone(&sink), source, Arc::clone(&audio)),
- send_voice_udp(sink, server_addr, audio)
- );
-}
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
new file mode 100644
index 0000000..39f16b6
--- /dev/null
+++ b/mumd/src/network/udp.rs
@@ -0,0 +1,131 @@
+use crate::audio::Audio;
+use log::*;
+
+use bytes::Bytes;
+use futures::channel::oneshot;
+use futures::{join, SinkExt, StreamExt};
+use futures_util::stream::{SplitSink, SplitStream};
+use mumble_protocol::crypt::ClientCryptState;
+use mumble_protocol::voice::{VoicePacket, VoicePacketPayload};
+use mumble_protocol::Serverbound;
+use std::net::{Ipv6Addr, SocketAddr};
+use std::sync::{Arc, Mutex};
+use tokio::net::UdpSocket;
+use tokio_util::udp::UdpFramed;
+
+type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
+type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>;
+
+pub async fn connect(
+ crypt_state: oneshot::Receiver<ClientCryptState>,
+) -> (UdpSender, UdpReceiver) {
+ // Bind UDP socket
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
+ .await
+ .expect("Failed to bind UDP socket");
+
+ // Wait for initial CryptState
+ let crypt_state = match crypt_state.await {
+ Ok(crypt_state) => crypt_state,
+ // disconnected before we received the CryptSetup packet, oh well
+ Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully
+ };
+ debug!("UDP connected");
+
+ // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both)
+ UdpFramed::new(udp_socket, crypt_state).split()
+}
+
+async fn listen(
+ _sink: Arc<Mutex<UdpSender>>,
+ mut source: UdpReceiver,
+ audio: Arc<Mutex<Audio>>,
+) {
+ while let Some(packet) = source.next().await {
+ let (packet, _src_addr) = match packet {
+ Ok(packet) => packet,
+ Err(err) => {
+ warn!("Got an invalid UDP packet: {}", err);
+ // To be expected, considering this is the internet, just ignore it
+ continue;
+ }
+ };
+ match packet {
+ VoicePacket::Ping { .. } => {
+ // Note: A normal application would handle these and only use UDP for voice
+ // once it has received one.
+ continue;
+ }
+ VoicePacket::Audio {
+ session_id,
+ // seq_num,
+ payload,
+ // position_info,
+ ..
+ } => {
+ audio.lock().unwrap().decode_packet(session_id, payload);
+ }
+ }
+ }
+}
+
+async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) {
+ sink.send((
+ VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0,
+ session_id: (),
+ seq_num: 0,
+ payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true),
+ position_info: None,
+ },
+ server_addr,
+ ))
+ .await
+ .unwrap();
+}
+
+async fn send_voice(
+ sink: Arc<Mutex<UdpSender>>,
+ server_addr: SocketAddr,
+ audio: Arc<Mutex<Audio>>,
+) {
+ let mut receiver = audio.lock().unwrap().take_receiver().unwrap();
+
+ let mut count = 0;
+ while let Some(payload) = receiver.recv().await {
+ let reply = VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: count,
+ payload,
+ position_info: None,
+ };
+ count += 1;
+ sink.lock()
+ .unwrap()
+ .send((reply, server_addr))
+ .await
+ .unwrap();
+ }
+}
+
+pub async fn handle(
+ server_addr: SocketAddr,
+ crypt_state: oneshot::Receiver<ClientCryptState>,
+ audio: Arc<Mutex<Audio>>,
+) {
+ let (mut sink, source) = connect(crypt_state).await;
+
+ // Note: A normal application would also send periodic Ping packets, and its own audio
+ // via UDP. We instead trick the server into accepting us by sending it one
+ // dummy voice packet.
+ send_ping(&mut sink, server_addr).await;
+
+ let sink = Arc::new(Mutex::new(sink));
+ join!(
+ listen(Arc::clone(&sink), source, Arc::clone(&audio)),
+ send_voice(sink, server_addr, audio)
+ );
+}