diff options
| author | Eskil Queseth <eskilq@kth.se> | 2020-10-14 17:45:04 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2020-10-14 17:45:04 +0200 |
| commit | ab0cdc240c65fdc6b764ed17f6611786d449acc3 (patch) | |
| tree | bbad07ff338616c17208cf257eb3c6d359eb857e /mumd/src/network/tcp.rs | |
| parent | e4406676a28f2dfb756f8f9e38a4242166f19c0e (diff) | |
| download | mum-ab0cdc240c65fdc6b764ed17f6611786d449acc3.tar.gz | |
add support for reconnecting to server
Diffstat (limited to 'mumd/src/network/tcp.rs')
| -rw-r--r-- | mumd/src/network/tcp.rs | 65 |
1 files changed, 33 insertions, 32 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 1e0feee..d45b49d 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,7 +2,6 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use tokio::sync::oneshot; use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; @@ -12,7 +11,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, oneshot}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -27,37 +26,39 @@ type TcpReceiver = pub async fn handle( state: Arc<Mutex<State>>, mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, - crypt_state_sender: oneshot::Sender<ClientCryptState>, - packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, + crypt_state_sender: mpsc::Sender<ClientCryptState>, + mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, ) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, stream) = connect(connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert) + .await; - // Handshake (omitting `Version` message for brevity) - let state_lock = state.lock().unwrap(); - authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; - let phase_watcher = state_lock.phase_receiver(); - let packet_sender = state_lock.packet_sender(); - drop(state_lock); + // Handshake (omitting `Version` message for brevity) + let state_lock = state.lock().unwrap(); + authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; + let phase_watcher = state_lock.phase_receiver(); + let packet_sender = state_lock.packet_sender(); + drop(state_lock); - info!("Logging in..."); + info!("Logging in..."); - join!( - send_pings(packet_sender, 10, phase_watcher.clone()), - listen(state, stream, crypt_state_sender, phase_watcher.clone()), - send_packets(sink, packet_receiver, phase_watcher), - ); + join!( + send_pings(packet_sender, 10, phase_watcher.clone()), + listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + send_packets(sink, &mut packet_receiver, phase_watcher), + ); - debug!("Fully disconnected TCP stream"); + debug!("Fully disconnected TCP stream, waiting for new connection info"); + } } async fn connect( @@ -134,7 +135,7 @@ async fn send_pings( async fn send_packets( mut sink: TcpSender, - mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, + packet_receiver: &mut mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, mut phase_watcher: watch::Receiver<StatePhase>, ) { let (tx, rx) = oneshot::channel(); @@ -181,7 +182,7 @@ async fn send_packets( async fn listen( state: Arc<Mutex<State>>, mut stream: TcpReceiver, - crypt_state_sender: oneshot::Sender<ClientCryptState>, + crypt_state_sender: mpsc::Sender<ClientCryptState>, mut phase_watcher: watch::Receiver<StatePhase>, ) { let mut crypt_state = None; @@ -238,12 +239,12 @@ async fn listen( } ControlPacket::ServerSync(msg) => { info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { + if let Some(mut sender) = crypt_state_sender.take() { let _ = sender.send( crypt_state .take() .expect("Server didn't send us any CryptSetup packet!"), - ); + ).await; } let mut state = state.lock().unwrap(); let server = state.server_mut(); |
