From 63c2b18a558dc9c22be61c0068fd2ae62b188dea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 00:40:38 +0200 Subject: split network.rs to tcp.rs and udp.rs (#4) --- mumd/src/main.rs | 4 +- mumd/src/network.rs | 299 ------------------------------------------------ mumd/src/network/mod.rs | 2 + mumd/src/network/tcp.rs | 180 +++++++++++++++++++++++++++++ mumd/src/network/udp.rs | 131 +++++++++++++++++++++ 5 files changed, 315 insertions(+), 301 deletions(-) delete mode 100644 mumd/src/network.rs create mode 100644 mumd/src/network/mod.rs create mode 100644 mumd/src/network/tcp.rs create mode 100644 mumd/src/network/udp.rs diff --git a/mumd/src/main.rs b/mumd/src/main.rs index fcffb87..2a0fcbd 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -82,7 +82,7 @@ async fn main() { // Run it join!( - network::handle_tcp( + network::tcp::handle( server_state, server_addr, server_host, @@ -91,7 +91,7 @@ async fn main() { crypt_state_sender, Arc::clone(&audio), ), - network::handle_udp( + network::udp::handle( server_addr, crypt_state_receiver, audio, diff --git a/mumd/src/network.rs b/mumd/src/network.rs deleted file mode 100644 index 947612f..0000000 --- a/mumd/src/network.rs +++ /dev/null @@ -1,299 +0,0 @@ -use crate::audio::Audio; -use crate::state::Server; - -use bytes::Bytes; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; -use futures_util::stream::{SplitSink, SplitStream}; -use log::*; -use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; -use mumble_protocol::crypt::ClientCryptState; -use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; -use mumble_protocol::{Clientbound, Serverbound}; -use std::convert::{Into, TryInto}; -use std::net::{Ipv6Addr, SocketAddr}; -use std::sync::{Arc, Mutex}; -use tokio::net::{TcpStream, UdpSocket}; -use tokio::time::{self, Duration}; -use tokio_tls::{TlsConnector, TlsStream}; -use tokio_util::codec::{Decoder, Framed}; -use tokio_util::udp::UdpFramed; - -type TcpSender = SplitSink< - Framed, ControlCodec>, - ControlPacket, ->; -type TcpReceiver = - SplitStream, ControlCodec>>; -type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; -type UdpReceiver = SplitStream>; - -async fn connect_tcp( - server_addr: SocketAddr, - server_host: String, - accept_invalid_cert: bool, -) -> (TcpSender, TcpReceiver) { - let stream = TcpStream::connect(&server_addr) - .await - .expect("failed to connect to server:"); - debug!("TCP connected"); - - let mut builder = native_tls::TlsConnector::builder(); - builder.danger_accept_invalid_certs(accept_invalid_cert); - let connector: TlsConnector = builder - .build() - .expect("failed to create TLS connector") - .into(); - let tls_stream = connector - .connect(&server_host, stream) - .await - .expect("failed to connect TLS: {}"); - debug!("TLS connected"); - - // Wrap the TLS stream with Mumble's client-side control-channel codec - ClientControlCodec::new().framed(tls_stream).split() -} - -pub async fn connect_udp( - crypt_state: oneshot::Receiver, -) -> (UdpSender, UdpReceiver) { - // Bind UDP socket - let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) - .await - .expect("Failed to bind UDP socket"); - - // Wait for initial CryptState - let crypt_state = match crypt_state.await { - Ok(crypt_state) => crypt_state, - // disconnected before we received the CryptSetup packet, oh well - Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully - }; - debug!("UDP connected"); - - // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both) - UdpFramed::new(udp_socket, crypt_state).split() -} - -async fn send_pings(sink: Arc>, delay_seconds: u64) { - let mut interval = time::interval(Duration::from_secs(delay_seconds)); - loop { - interval.tick().await; - trace!("Sending ping"); - let msg = msgs::Ping::new(); - sink.lock().unwrap().send(msg.into()).await.unwrap(); - } -} - -async fn authenticate(sink: Arc>, username: String) { - let mut msg = msgs::Authenticate::new(); - msg.set_username(username); - msg.set_opus(true); - sink.lock().unwrap().send(msg.into()).await.unwrap(); -} - -async fn listen_tcp( - server: Arc>, - sink: Arc>, - mut stream: TcpReceiver, - crypt_state_sender: oneshot::Sender, - audio: Arc>, -) { - let mut crypt_state = None; - let mut crypt_state_sender = Some(crypt_state_sender); - - while let Some(packet) = stream.next().await { - //TODO handle types separately - match packet.unwrap() { - ControlPacket::TextMessage(mut msg) => { - info!( - "Got message from user with session ID {}: {}", - msg.get_actor(), - msg.get_message() - ); - // Send reply back to server - let mut response = msgs::TextMessage::new(); - response.mut_session().push(msg.get_actor()); - response.set_message(msg.take_message()); - let mut lock = sink.lock().unwrap(); - lock.send(response.into()).await.unwrap(); - } - ControlPacket::CryptSetup(msg) => { - debug!("Crypt setup"); - // Wait until we're fully connected before initiating UDP voice - crypt_state = Some(ClientCryptState::new_from( - msg.get_key() - .try_into() - .expect("Server sent private key with incorrect size"), - msg.get_client_nonce() - .try_into() - .expect("Server sent client_nonce with incorrect size"), - msg.get_server_nonce() - .try_into() - .expect("Server sent server_nonce with incorrect size"), - )); - } - ControlPacket::ServerSync(msg) => { - info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ); - } - let mut server = server.lock().unwrap(); - server.parse_server_sync(msg); - match &server.welcome_text { - Some(s) => info!("Welcome: {}", s), - None => info!("No welcome received"), - } - for (_, channel) in server.channels() { - info!("Found channel {}", channel.name()); - } - sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap(); - } - ControlPacket::Reject(msg) => { - warn!("Login rejected: {:?}", msg); - } - ControlPacket::UserState(msg) => { - audio.lock().unwrap().add_client(msg.get_session()); - let mut server = server.lock().unwrap(); - let session = msg.get_session(); - server.parse_user_state(msg); - let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); - } - ControlPacket::UserRemove(msg) => { - info!("User {} left", msg.get_session()); - audio.lock().unwrap().remove_client(msg.get_session()); - } - ControlPacket::ChannelState(msg) => { - debug!("Channel state received"); - server.lock().unwrap().parse_channel_state(msg); - } - ControlPacket::ChannelRemove(msg) => { - server.lock().unwrap().parse_channel_remove(msg); - } - _ => {} - } - } -} - -pub async fn handle_tcp( - server: Arc>, - server_addr: SocketAddr, - server_host: String, - username: String, - accept_invalid_cert: bool, - crypt_state_sender: oneshot::Sender, - audio: Arc>, -) { - let (sink, stream) = connect_tcp(server_addr, server_host, accept_invalid_cert).await; - let sink = Arc::new(Mutex::new(sink)); - - // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), username).await; - - info!("Logging in..."); - - join!( - send_pings(Arc::clone(&sink), 10), - listen_tcp(server, sink, stream, crypt_state_sender, audio), - ); -} - -async fn listen_udp( - _sink: Arc>, - mut source: UdpReceiver, - audio: Arc>, -) { - while let Some(packet) = source.next().await { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - warn!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue; - } - }; - match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue; - } - VoicePacket::Audio { - session_id, - // seq_num, - payload, - // position_info, - .. - } => { - audio.lock().unwrap().decode_packet(session_id, payload); - } - } - } -} - -async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) { - sink.send(( - VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, - session_id: (), - seq_num: 0, - payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true), - position_info: None, - }, - server_addr, - )) - .await - .unwrap(); -} - -async fn send_voice_udp( - sink: Arc>, - server_addr: SocketAddr, - audio: Arc>, -) { - let mut receiver = audio.lock().unwrap().take_receiver().unwrap(); - - let mut count = 0; - while let Some(payload) = receiver.recv().await { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload, - position_info: None, - }; - count += 1; - sink.lock() - .unwrap() - .send((reply, server_addr)) - .await - .unwrap(); - } -} - -pub async fn handle_udp( - server_addr: SocketAddr, - crypt_state: oneshot::Receiver, - audio: Arc>, -) { - let (mut sink, source) = connect_udp(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping_udp(&mut sink, server_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - join!( - listen_udp(Arc::clone(&sink), source, Arc::clone(&audio)), - send_voice_udp(sink, server_addr, audio) - ); -} diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs new file mode 100644 index 0000000..f7a6a76 --- /dev/null +++ b/mumd/src/network/mod.rs @@ -0,0 +1,2 @@ +pub mod tcp; +pub mod udp; diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs new file mode 100644 index 0000000..dde98aa --- /dev/null +++ b/mumd/src/network/tcp.rs @@ -0,0 +1,180 @@ +use crate::audio::Audio; +use crate::state::Server; +use log::*; + +use futures::channel::oneshot; +use futures::{join, SinkExt, StreamExt}; +use futures_util::stream::{SplitSink, SplitStream}; +use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; +use mumble_protocol::crypt::ClientCryptState; +use mumble_protocol::{Clientbound, Serverbound}; +use std::convert::{Into, TryInto}; +use std::net::{SocketAddr}; +use std::sync::{Arc, Mutex}; +use tokio::net::TcpStream; +use tokio::time::{self, Duration}; +use tokio_tls::{TlsConnector, TlsStream}; +use tokio_util::codec::{Decoder, Framed}; + +type TcpSender = SplitSink< + Framed, ControlCodec>, + ControlPacket, +>; +type TcpReceiver = + SplitStream, ControlCodec>>; + +pub async fn handle( + server: Arc>, + server_addr: SocketAddr, + server_host: String, + username: String, + accept_invalid_cert: bool, + crypt_state_sender: oneshot::Sender, + audio: Arc>, +) { + let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; + let sink = Arc::new(Mutex::new(sink)); + + // Handshake (omitting `Version` message for brevity) + authenticate(Arc::clone(&sink), username).await; + + info!("Logging in..."); + + join!( + send_pings(Arc::clone(&sink), 10), + listen(server, sink, stream, crypt_state_sender, audio), + ); +} + +async fn connect( + server_addr: SocketAddr, + server_host: String, + accept_invalid_cert: bool, +) -> (TcpSender, TcpReceiver) { + let stream = TcpStream::connect(&server_addr) + .await + .expect("failed to connect to server:"); + debug!("TCP connected"); + + let mut builder = native_tls::TlsConnector::builder(); + builder.danger_accept_invalid_certs(accept_invalid_cert); + let connector: TlsConnector = builder + .build() + .expect("failed to create TLS connector") + .into(); + let tls_stream = connector + .connect(&server_host, stream) + .await + .expect("failed to connect TLS: {}"); + debug!("TLS connected"); + + // Wrap the TLS stream with Mumble's client-side control-channel codec + ClientControlCodec::new().framed(tls_stream).split() +} + +async fn authenticate(sink: Arc>, username: String) { + let mut msg = msgs::Authenticate::new(); + msg.set_username(username); + msg.set_opus(true); + sink.lock().unwrap().send(msg.into()).await.unwrap(); +} + +async fn send_pings(sink: Arc>, delay_seconds: u64) { + let mut interval = time::interval(Duration::from_secs(delay_seconds)); + loop { + interval.tick().await; + trace!("Sending ping"); + let msg = msgs::Ping::new(); + sink.lock().unwrap().send(msg.into()).await.unwrap(); + } +} + +async fn listen( + server: Arc>, + sink: Arc>, + mut stream: TcpReceiver, + crypt_state_sender: oneshot::Sender, + audio: Arc>, +) { + let mut crypt_state = None; + let mut crypt_state_sender = Some(crypt_state_sender); + + while let Some(packet) = stream.next().await { + //TODO handle types separately + match packet.unwrap() { + ControlPacket::TextMessage(mut msg) => { + info!( + "Got message from user with session ID {}: {}", + msg.get_actor(), + msg.get_message() + ); + // Send reply back to server + let mut response = msgs::TextMessage::new(); + response.mut_session().push(msg.get_actor()); + response.set_message(msg.take_message()); + let mut lock = sink.lock().unwrap(); + lock.send(response.into()).await.unwrap(); + } + ControlPacket::CryptSetup(msg) => { + debug!("Crypt setup"); + // Wait until we're fully connected before initiating UDP voice + crypt_state = Some(ClientCryptState::new_from( + msg.get_key() + .try_into() + .expect("Server sent private key with incorrect size"), + msg.get_client_nonce() + .try_into() + .expect("Server sent client_nonce with incorrect size"), + msg.get_server_nonce() + .try_into() + .expect("Server sent server_nonce with incorrect size"), + )); + } + ControlPacket::ServerSync(msg) => { + info!("Logged in"); + if let Some(sender) = crypt_state_sender.take() { + let _ = sender.send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ); + } + let mut server = server.lock().unwrap(); + server.parse_server_sync(msg); + match &server.welcome_text { + Some(s) => info!("Welcome: {}", s), + None => info!("No welcome received"), + } + for (_, channel) in server.channels() { + info!("Found channel {}", channel.name()); + } + sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap(); + } + ControlPacket::Reject(msg) => { + warn!("Login rejected: {:?}", msg); + } + ControlPacket::UserState(msg) => { + audio.lock().unwrap().add_client(msg.get_session()); + let mut server = server.lock().unwrap(); + let session = msg.get_session(); + server.parse_user_state(msg); + let user = server.users().get(&session).unwrap(); + info!("User {} connected to {}", + user.name(), + user.channel()); + } + ControlPacket::UserRemove(msg) => { + info!("User {} left", msg.get_session()); + audio.lock().unwrap().remove_client(msg.get_session()); + } + ControlPacket::ChannelState(msg) => { + debug!("Channel state received"); + server.lock().unwrap().parse_channel_state(msg); + } + ControlPacket::ChannelRemove(msg) => { + server.lock().unwrap().parse_channel_remove(msg); + } + _ => {} + } + } +} diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs new file mode 100644 index 0000000..39f16b6 --- /dev/null +++ b/mumd/src/network/udp.rs @@ -0,0 +1,131 @@ +use crate::audio::Audio; +use log::*; + +use bytes::Bytes; +use futures::channel::oneshot; +use futures::{join, SinkExt, StreamExt}; +use futures_util::stream::{SplitSink, SplitStream}; +use mumble_protocol::crypt::ClientCryptState; +use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; +use mumble_protocol::Serverbound; +use std::net::{Ipv6Addr, SocketAddr}; +use std::sync::{Arc, Mutex}; +use tokio::net::UdpSocket; +use tokio_util::udp::UdpFramed; + +type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; +type UdpReceiver = SplitStream>; + +pub async fn connect( + crypt_state: oneshot::Receiver, +) -> (UdpSender, UdpReceiver) { + // Bind UDP socket + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) + .await + .expect("Failed to bind UDP socket"); + + // Wait for initial CryptState + let crypt_state = match crypt_state.await { + Ok(crypt_state) => crypt_state, + // disconnected before we received the CryptSetup packet, oh well + Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully + }; + debug!("UDP connected"); + + // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both) + UdpFramed::new(udp_socket, crypt_state).split() +} + +async fn listen( + _sink: Arc>, + mut source: UdpReceiver, + audio: Arc>, +) { + while let Some(packet) = source.next().await { + let (packet, _src_addr) = match packet { + Ok(packet) => packet, + Err(err) => { + warn!("Got an invalid UDP packet: {}", err); + // To be expected, considering this is the internet, just ignore it + continue; + } + }; + match packet { + VoicePacket::Ping { .. } => { + // Note: A normal application would handle these and only use UDP for voice + // once it has received one. + continue; + } + VoicePacket::Audio { + session_id, + // seq_num, + payload, + // position_info, + .. + } => { + audio.lock().unwrap().decode_packet(session_id, payload); + } + } + } +} + +async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { + sink.send(( + VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, + session_id: (), + seq_num: 0, + payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true), + position_info: None, + }, + server_addr, + )) + .await + .unwrap(); +} + +async fn send_voice( + sink: Arc>, + server_addr: SocketAddr, + audio: Arc>, +) { + let mut receiver = audio.lock().unwrap().take_receiver().unwrap(); + + let mut count = 0; + while let Some(payload) = receiver.recv().await { + let reply = VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: count, + payload, + position_info: None, + }; + count += 1; + sink.lock() + .unwrap() + .send((reply, server_addr)) + .await + .unwrap(); + } +} + +pub async fn handle( + server_addr: SocketAddr, + crypt_state: oneshot::Receiver, + audio: Arc>, +) { + let (mut sink, source) = connect(crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, server_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + join!( + listen(Arc::clone(&sink), source, Arc::clone(&audio)), + send_voice(sink, server_addr, audio) + ); +} -- cgit v1.2.1