aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/tcp.rs
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2020-10-14 17:45:04 +0200
committerEskil Queseth <eskilq@kth.se>2020-10-14 17:45:04 +0200
commitab0cdc240c65fdc6b764ed17f6611786d449acc3 (patch)
treebbad07ff338616c17208cf257eb3c6d359eb857e /mumd/src/network/tcp.rs
parente4406676a28f2dfb756f8f9e38a4242166f19c0e (diff)
downloadmum-ab0cdc240c65fdc6b764ed17f6611786d449acc3.tar.gz
add support for reconnecting to server
Diffstat (limited to 'mumd/src/network/tcp.rs')
-rw-r--r--mumd/src/network/tcp.rs65
1 files changed, 33 insertions, 32 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 1e0feee..d45b49d 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -2,7 +2,6 @@ use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
use log::*;
-use tokio::sync::oneshot;
use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt};
use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
@@ -12,7 +11,7 @@ use std::convert::{Into, TryInto};
use std::net::{SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, watch, oneshot};
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
@@ -27,37 +26,39 @@ type TcpReceiver =
pub async fn handle(
state: Arc<Mutex<State>>,
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
- crypt_state_sender: oneshot::Sender<ClientCryptState>,
- packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
+ crypt_state_sender: mpsc::Sender<ClientCryptState>,
+ mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
) {
- let connection_info = loop {
- match connection_info_receiver.recv().await {
- None => { return; }
- Some(None) => {}
- Some(Some(connection_info)) => { break connection_info; }
- }
- };
- let (mut sink, stream) = connect(connection_info.socket_addr,
- connection_info.hostname,
- connection_info.accept_invalid_cert)
- .await;
+ loop {
+ let connection_info = loop {
+ match connection_info_receiver.recv().await {
+ None => { return; }
+ Some(None) => {}
+ Some(Some(connection_info)) => { break connection_info; }
+ }
+ };
+ let (mut sink, stream) = connect(connection_info.socket_addr,
+ connection_info.hostname,
+ connection_info.accept_invalid_cert)
+ .await;
- // Handshake (omitting `Version` message for brevity)
- let state_lock = state.lock().unwrap();
- authenticate(&mut sink, state_lock.username().unwrap().to_string()).await;
- let phase_watcher = state_lock.phase_receiver();
- let packet_sender = state_lock.packet_sender();
- drop(state_lock);
+ // Handshake (omitting `Version` message for brevity)
+ let state_lock = state.lock().unwrap();
+ authenticate(&mut sink, state_lock.username().unwrap().to_string()).await;
+ let phase_watcher = state_lock.phase_receiver();
+ let packet_sender = state_lock.packet_sender();
+ drop(state_lock);
- info!("Logging in...");
+ info!("Logging in...");
- join!(
- send_pings(packet_sender, 10, phase_watcher.clone()),
- listen(state, stream, crypt_state_sender, phase_watcher.clone()),
- send_packets(sink, packet_receiver, phase_watcher),
- );
+ join!(
+ send_pings(packet_sender, 10, phase_watcher.clone()),
+ listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()),
+ send_packets(sink, &mut packet_receiver, phase_watcher),
+ );
- debug!("Fully disconnected TCP stream");
+ debug!("Fully disconnected TCP stream, waiting for new connection info");
+ }
}
async fn connect(
@@ -134,7 +135,7 @@ async fn send_pings(
async fn send_packets(
mut sink: TcpSender,
- mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
+ packet_receiver: &mut mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
mut phase_watcher: watch::Receiver<StatePhase>,
) {
let (tx, rx) = oneshot::channel();
@@ -181,7 +182,7 @@ async fn send_packets(
async fn listen(
state: Arc<Mutex<State>>,
mut stream: TcpReceiver,
- crypt_state_sender: oneshot::Sender<ClientCryptState>,
+ crypt_state_sender: mpsc::Sender<ClientCryptState>,
mut phase_watcher: watch::Receiver<StatePhase>,
) {
let mut crypt_state = None;
@@ -238,12 +239,12 @@ async fn listen(
}
ControlPacket::ServerSync(msg) => {
info!("Logged in");
- if let Some(sender) = crypt_state_sender.take() {
+ if let Some(mut sender) = crypt_state_sender.take() {
let _ = sender.send(
crypt_state
.take()
.expect("Server didn't send us any CryptSetup packet!"),
- );
+ ).await;
}
let mut state = state.lock().unwrap();
let server = state.server_mut();