diff options
| author | Eskil Queseth <eskilq@kth.se> | 2020-10-14 19:48:05 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2020-10-14 19:48:05 +0200 |
| commit | a40d365aacf118b33c07f3353f277eb96c4536a8 (patch) | |
| tree | 1a5e623da01745b3d2a2d1b1d5958a22cd0e382a /mumd/src/network | |
| parent | c0855405832ce47f75fa6e1ff7a33e51a8b36903 (diff) | |
| parent | 6ac72067a75d5e1904226efb5c45bcf0e54a0ae5 (diff) | |
| download | mum-a40d365aacf118b33c07f3353f277eb96c4536a8.tar.gz | |
Merge remote-tracking branch 'origin/commands' into main
Diffstat (limited to 'mumd/src/network')
| -rw-r--r-- | mumd/src/network/mod.rs | 19 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 370 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 236 |
3 files changed, 448 insertions, 177 deletions
diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs index f7a6a76..1a31ee2 100644 --- a/mumd/src/network/mod.rs +++ b/mumd/src/network/mod.rs @@ -1,2 +1,21 @@ pub mod tcp; pub mod udp; + +use std::net::SocketAddr; + +#[derive(Clone, Debug)] +pub struct ConnectionInfo { + socket_addr: SocketAddr, + hostname: String, + accept_invalid_cert: bool, +} + +impl ConnectionInfo { + pub fn new(socket_addr: SocketAddr, hostname: String, accept_invalid_cert: bool) -> Self { + Self { + socket_addr, + hostname, + accept_invalid_cert, + } + } +} diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index dde98aa..6a369e5 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,17 +1,17 @@ -use crate::audio::Audio; -use crate::state::Server; +use crate::network::ConnectionInfo; +use crate::state::{State, StatePhase}; use log::*; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::{Clientbound, Serverbound}; use std::convert::{Into, TryInto}; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -24,26 +24,52 @@ type TcpReceiver = SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>; pub async fn handle( - server: Arc<Mutex<Server>>, - server_addr: SocketAddr, - server_host: String, - username: String, - accept_invalid_cert: bool, - crypt_state_sender: oneshot::Sender<ClientCryptState>, - audio: Arc<Mutex<Audio>>, + state: Arc<Mutex<State>>, + mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, + crypt_state_sender: mpsc::Sender<ClientCryptState>, + mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, ) { - let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; - let sink = Arc::new(Mutex::new(sink)); + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { + return; + } + Some(None) => {} + Some(Some(connection_info)) => { + break connection_info; + } + } + }; + let (mut sink, stream) = connect( + connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert, + ) + .await; + + // Handshake (omitting `Version` message for brevity) + let state_lock = state.lock().unwrap(); + authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; + let phase_watcher = state_lock.phase_receiver(); + let packet_sender = state_lock.packet_sender(); + drop(state_lock); - // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), username).await; + info!("Logging in..."); - info!("Logging in..."); + join!( + send_pings(packet_sender, 10, phase_watcher.clone()), + listen( + Arc::clone(&state), + stream, + crypt_state_sender.clone(), + phase_watcher.clone() + ), + send_packets(sink, &mut packet_receiver, phase_watcher), + ); - join!( - send_pings(Arc::clone(&sink), 10), - listen(server, sink, stream, crypt_state_sender, audio), - ); + debug!("Fully disconnected TCP stream, waiting for new connection info"); + } } async fn connect( @@ -72,109 +98,239 @@ async fn connect( ClientControlCodec::new().framed(tls_stream).split() } -async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) { +async fn authenticate(sink: &mut TcpSender, username: String) { let mut msg = msgs::Authenticate::new(); msg.set_username(username); msg.set_opus(true); - sink.lock().unwrap().send(msg.into()).await.unwrap(); + sink.send(msg.into()).await.unwrap(); } -async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) { +async fn send_pings( + packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, + delay_seconds: u64, + mut phase_watcher: watch::Receiver<StatePhase>, +) { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} + tx.send(true).unwrap(); + }; + let mut interval = time::interval(Duration::from_secs(delay_seconds)); - loop { - interval.tick().await; - trace!("Sending ping"); - let msg = msgs::Ping::new(); - sink.lock().unwrap().send(msg.into()).await.unwrap(); - } + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let interval_waiter = interval.tick().fuse(); + pin_mut!(interval_waiter); + let exitor = select! { + data = interval_waiter => Some(data), + _ = rx => None + }; + + match exitor { + Some(_) => { + trace!("Sending ping"); + let msg = msgs::Ping::new(); + packet_sender.send(msg.into()).unwrap(); + } + None => break, + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("Ping sender process killed"); +} + +async fn send_packets( + mut sink: TcpSender, + packet_receiver: &mut mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, + mut phase_watcher: watch::Receiver<StatePhase>, +) { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} + tx.send(true).unwrap(); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = packet_receiver.recv().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(packet)) => { + sink.send(packet).await.unwrap(); + } + } + } + + //clears queue of remaining packets + while packet_receiver.try_recv().is_ok() {} + + sink.close().await.unwrap(); + }; + + join!(main_block, phase_transition_block); + + debug!("TCP packet sender killed"); } async fn listen( - server: Arc<Mutex<Server>>, - sink: Arc<Mutex<TcpSender>>, + state: Arc<Mutex<State>>, mut stream: TcpReceiver, - crypt_state_sender: oneshot::Sender<ClientCryptState>, - audio: Arc<Mutex<Audio>>, + crypt_state_sender: mpsc::Sender<ClientCryptState>, + mut phase_watcher: watch::Receiver<StatePhase>, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); - while let Some(packet) = stream.next().await { - //TODO handle types separately - match packet.unwrap() { - ControlPacket::TextMessage(mut msg) => { - info!( - "Got message from user with session ID {}: {}", - msg.get_actor(), - msg.get_message() - ); - // Send reply back to server - let mut response = msgs::TextMessage::new(); - response.mut_session().push(msg.get_actor()); - response.set_message(msg.take_message()); - let mut lock = sink.lock().unwrap(); - lock.send(response.into()).await.unwrap(); - } - ControlPacket::CryptSetup(msg) => { - debug!("Crypt setup"); - // Wait until we're fully connected before initiating UDP voice - crypt_state = Some(ClientCryptState::new_from( - msg.get_key() - .try_into() - .expect("Server sent private key with incorrect size"), - msg.get_client_nonce() - .try_into() - .expect("Server sent client_nonce with incorrect size"), - msg.get_server_nonce() - .try_into() - .expect("Server sent server_nonce with incorrect size"), - )); - } - ControlPacket::ServerSync(msg) => { - info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ); + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} + tx.send(true).unwrap(); + }; + + let listener_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = stream.next().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; } - let mut server = server.lock().unwrap(); - server.parse_server_sync(msg); - match &server.welcome_text { - Some(s) => info!("Welcome: {}", s), - None => info!("No welcome received"), + Some(None) => { + warn!("Channel closed before disconnect command"); + break; } - for (_, channel) in server.channels() { - info!("Found channel {}", channel.name()); + Some(Some(packet)) => { + //TODO handle types separately + match packet.unwrap() { + ControlPacket::TextMessage(msg) => { + info!( + "Got message from user with session ID {}: {}", + msg.get_actor(), + msg.get_message() + ); + } + ControlPacket::CryptSetup(msg) => { + debug!("Crypt setup"); + // Wait until we're fully connected before initiating UDP voice + crypt_state = Some(ClientCryptState::new_from( + msg.get_key() + .try_into() + .expect("Server sent private key with incorrect size"), + msg.get_client_nonce() + .try_into() + .expect("Server sent client_nonce with incorrect size"), + msg.get_server_nonce() + .try_into() + .expect("Server sent server_nonce with incorrect size"), + )); + } + ControlPacket::ServerSync(msg) => { + info!("Logged in"); + if let Some(mut sender) = crypt_state_sender.take() { + let _ = sender + .send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ) + .await; + } + let mut state = state.lock().unwrap(); + let server = state.server_mut().unwrap(); + server.parse_server_sync(*msg); + match &server.welcome_text { + Some(s) => info!("Welcome: {}", s), + None => info!("No welcome received"), + } + for channel in server.channels().values() { + info!("Found channel {}", channel.name()); + } + state.initialized(); + } + ControlPacket::Reject(msg) => { + warn!("Login rejected: {:?}", msg); + } + ControlPacket::UserState(msg) => { + let mut state = state.lock().unwrap(); + let session = msg.get_session(); + state.audio_mut().add_client(msg.get_session()); //TODO + if *state.phase_receiver().borrow() == StatePhase::Connecting { + state.parse_initial_user_state(*msg); + } else { + state.server_mut().unwrap().parse_user_state(*msg); + } + let server = state.server_mut().unwrap(); + let user = server.users().get(&session).unwrap(); + info!("User {} connected to {}", user.name(), user.channel()); + } + ControlPacket::UserRemove(msg) => { + info!("User {} left", msg.get_session()); + state + .lock() + .unwrap() + .audio_mut() + .remove_client(msg.get_session()); + } + ControlPacket::ChannelState(msg) => { + debug!("Channel state received"); + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_state(*msg); //TODO parse initial if initial + } + ControlPacket::ChannelRemove(msg) => { + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_remove(*msg); + } + _ => {} + } } - sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap(); - } - ControlPacket::Reject(msg) => { - warn!("Login rejected: {:?}", msg); - } - ControlPacket::UserState(msg) => { - audio.lock().unwrap().add_client(msg.get_session()); - let mut server = server.lock().unwrap(); - let session = msg.get_session(); - server.parse_user_state(msg); - let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); - } - ControlPacket::UserRemove(msg) => { - info!("User {} left", msg.get_session()); - audio.lock().unwrap().remove_client(msg.get_session()); - } - ControlPacket::ChannelState(msg) => { - debug!("Channel state received"); - server.lock().unwrap().parse_channel_state(msg); } - ControlPacket::ChannelRemove(msg) => { - server.lock().unwrap().parse_channel_remove(msg); - } - _ => {} } - } + + //TODO? clean up stream + }; + + join!(phase_transition_block, listener_block); + + debug!("Killing TCP listener block"); } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 39f16b6..4f96c4c 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,9 +1,9 @@ -use crate::audio::Audio; +use crate::network::ConnectionInfo; +use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -11,13 +11,57 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>; +pub async fn handle( + state: Arc<Mutex<State>>, + mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, + mut crypt_state: mpsc::Receiver<ClientCryptState>, +) { + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { + return; + } + Some(None) => {} + Some(Some(connection_info)) => { + break connection_info; + } + } + }; + let (mut sink, source) = connect(&mut crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, connection_info.socket_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); + join!( + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice( + sink, + connection_info.socket_addr, + phase_watcher, + &mut receiver + ), + ); + + debug!("Fully disconnected UDP stream, waiting for new connection info"); + } +} + pub async fn connect( - crypt_state: oneshot::Receiver<ClientCryptState>, + crypt_state: &mut mpsc::Receiver<ClientCryptState>, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) @@ -25,10 +69,10 @@ pub async fn connect( .expect("Failed to bind UDP socket"); // Wait for initial CryptState - let crypt_state = match crypt_state.await { - Ok(crypt_state) => crypt_state, + let crypt_state = match crypt_state.recv().await { + Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well - Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully + None => panic!("Disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); @@ -37,36 +81,74 @@ pub async fn connect( } async fn listen( - _sink: Arc<Mutex<UdpSender>>, + state: Arc<Mutex<State>>, mut source: UdpReceiver, - audio: Arc<Mutex<Audio>>, + mut phase_watcher: watch::Receiver<StatePhase>, ) { - while let Some(packet) = source.next().await { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - warn!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue; - } - }; - match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue; - } - VoicePacket::Audio { - session_id, - // seq_num, - payload, - // position_info, - .. - } => { - audio.lock().unwrap().decode_packet(session_id, payload); + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} + tx.send(true).unwrap(); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = source.next().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(packet)) => { + let (packet, _src_addr) = match packet { + Ok(packet) => packet, + Err(err) => { + warn!("Got an invalid UDP packet: {}", err); + // To be expected, considering this is the internet, just ignore it + continue; + } + }; + match packet { + VoicePacket::Ping { .. } => { + // Note: A normal application would handle these and only use UDP for voice + // once it has received one. + continue; + } + VoicePacket::Audio { + session_id, + // seq_num, + payload, + // position_info, + .. + } => { + state + .lock() + .unwrap() + .audio() + .decode_packet(session_id, payload); + } + } + } } } - } + }; + + join!(main_block, phase_transition_block); + + debug!("UDP listener process killed"); } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -88,44 +170,58 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { async fn send_voice( sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, - audio: Arc<Mutex<Audio>>, + mut phase_watcher: watch::Receiver<StatePhase>, + receiver: &mut mpsc::Receiver<VoicePacketPayload>, ) { - let mut receiver = audio.lock().unwrap().take_receiver().unwrap(); + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} + tx.send(true).unwrap(); + }; - let mut count = 0; - while let Some(payload) = receiver.recv().await { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload, - position_info: None, - }; - count += 1; - sink.lock() - .unwrap() - .send((reply, server_addr)) - .await - .unwrap(); - } -} + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + let mut count = 0; + loop { + let packet_recv = receiver.recv().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(payload)) => { + let reply = VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: count, + payload, + position_info: None, + }; + count += 1; + sink.lock() + .unwrap() + .send((reply, server_addr)) + .await + .unwrap(); + } + } + } + }; -pub async fn handle( - server_addr: SocketAddr, - crypt_state: oneshot::Receiver<ClientCryptState>, - audio: Arc<Mutex<Audio>>, -) { - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, server_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - join!( - listen(Arc::clone(&sink), source, Arc::clone(&audio)), - send_voice(sink, server_addr, audio) - ); + join!(main_block, phase_transition_block); + + debug!("UDP sender process killed"); } |
