diff options
| author | Eskil Queseth <eskilq@kth.se> | 2020-10-14 17:45:04 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2020-10-14 17:45:04 +0200 |
| commit | ab0cdc240c65fdc6b764ed17f6611786d449acc3 (patch) | |
| tree | bbad07ff338616c17208cf257eb3c6d359eb857e /mumd/src/network | |
| parent | e4406676a28f2dfb756f8f9e38a4242166f19c0e (diff) | |
| download | mum-ab0cdc240c65fdc6b764ed17f6611786d449acc3.tar.gz | |
add support for reconnecting to server
Diffstat (limited to 'mumd/src/network')
| -rw-r--r-- | mumd/src/network/tcp.rs | 65 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 105 |
2 files changed, 73 insertions, 97 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 1e0feee..d45b49d 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,7 +2,6 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use tokio::sync::oneshot; use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; @@ -12,7 +11,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, oneshot}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -27,37 +26,39 @@ type TcpReceiver = pub async fn handle( state: Arc<Mutex<State>>, mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, - crypt_state_sender: oneshot::Sender<ClientCryptState>, - packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, + crypt_state_sender: mpsc::Sender<ClientCryptState>, + mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, ) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, stream) = connect(connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert) + .await; - // Handshake (omitting `Version` message for brevity) - let state_lock = state.lock().unwrap(); - authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; - let phase_watcher = state_lock.phase_receiver(); - let packet_sender = state_lock.packet_sender(); - drop(state_lock); + // Handshake (omitting `Version` message for brevity) + let state_lock = state.lock().unwrap(); + authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; + let phase_watcher = state_lock.phase_receiver(); + let packet_sender = state_lock.packet_sender(); + drop(state_lock); - info!("Logging in..."); + info!("Logging in..."); - join!( - send_pings(packet_sender, 10, phase_watcher.clone()), - listen(state, stream, crypt_state_sender, phase_watcher.clone()), - send_packets(sink, packet_receiver, phase_watcher), - ); + join!( + send_pings(packet_sender, 10, phase_watcher.clone()), + listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + send_packets(sink, &mut packet_receiver, phase_watcher), + ); - debug!("Fully disconnected TCP stream"); + debug!("Fully disconnected TCP stream, waiting for new connection info"); + } } async fn connect( @@ -134,7 +135,7 @@ async fn send_pings( async fn send_packets( mut sink: TcpSender, - mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, + packet_receiver: &mut mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, mut phase_watcher: watch::Receiver<StatePhase>, ) { let (tx, rx) = oneshot::channel(); @@ -181,7 +182,7 @@ async fn send_packets( async fn listen( state: Arc<Mutex<State>>, mut stream: TcpReceiver, - crypt_state_sender: oneshot::Sender<ClientCryptState>, + crypt_state_sender: mpsc::Sender<ClientCryptState>, mut phase_watcher: watch::Receiver<StatePhase>, ) { let mut crypt_state = None; @@ -238,12 +239,12 @@ async fn listen( } ControlPacket::ServerSync(msg) => { info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { + if let Some(mut sender) = crypt_state_sender.take() { let _ = sender.send( crypt_state .take() .expect("Server didn't send us any CryptSetup packet!"), - ); + ).await; } let mut state = state.lock().unwrap(); let server = state.server_mut(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index a757a2b..45e6e80 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -11,14 +11,48 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::{watch, oneshot}; +use tokio::sync::{watch, oneshot, mpsc}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>; +pub async fn handle( + state: Arc<Mutex<State>>, + mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, + mut crypt_state: mpsc::Receiver<ClientCryptState>, +) { + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, source) = connect(&mut crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, connection_info.socket_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); + join!( + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver), + ); + + debug!("Fully disconnected UDP stream, waiting for new connection info"); + } +} + pub async fn connect( - crypt_state: oneshot::Receiver<ClientCryptState>, + crypt_state: &mut mpsc::Receiver<ClientCryptState>, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) @@ -26,10 +60,10 @@ pub async fn connect( .expect("Failed to bind UDP socket"); // Wait for initial CryptState - let crypt_state = match crypt_state.await { - Ok(crypt_state) => crypt_state, + let crypt_state = match crypt_state.recv().await { + Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well - Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully + None => panic!("disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); @@ -37,36 +71,6 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } -pub async fn handle( - state: Arc<Mutex<State>>, - mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, - crypt_state: oneshot::Receiver<ClientCryptState>, -) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, connection_info.socket_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - - let phase_watcher = state.lock().unwrap().phase_receiver(); - join!( - listen(Arc::clone(&state), source, phase_watcher.clone()), - send_voice(state, sink, connection_info.socket_addr, phase_watcher), - ); - - debug!("Fully disconnected UPD stream"); -} - async fn listen( state: Arc<Mutex<State>>, mut source: UdpReceiver, @@ -129,33 +133,6 @@ async fn listen( join!(main_block, phase_transition_block); debug!("UDP listener process killed"); - - /*while let Some(packet) = source.next().await { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - warn!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue; - } - }; - match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue; - } - VoicePacket::Audio { - session_id, - // seq_num, - payload, - // position_info, - .. - } => { - state.lock().unwrap().audio().decode_packet(session_id, payload); - } - } - }*/ } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -175,13 +152,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { } async fn send_voice( - state: Arc<Mutex<State>>, sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver<StatePhase>, + receiver: &mut mpsc::Receiver<VoicePacketPayload>, ) { - let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); - let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} |
