aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-03-30 16:15:53 +0200
committerGustav Sörnäs <gustav@sornas.net>2021-03-30 16:15:53 +0200
commit1d331f0707eaa4a056aa6261410fb1edb63097b7 (patch)
tree3beca0c239306bd1ed6f9ee792abd5a855e190f7 /mumd/src/network
parent79702d18bbd23df2faf0c00b0d9537ce26508f6b (diff)
downloadmum-1d331f0707eaa4a056aa6261410fb1edb63097b7.tar.gz
report tcp errors all the way
Diffstat (limited to 'mumd/src/network')
-rw-r--r--mumd/src/network/tcp.rs46
1 files changed, 25 insertions, 21 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 6460cba..9b0b68e 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -4,6 +4,7 @@ use crate::state::{State, StatePhase};
use log::*;
use futures_util::{FutureExt, SinkExt, StreamExt};
+use futures_util::select;
use futures_util::stream::{SplitSink, SplitStream, Stream};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
@@ -20,7 +21,6 @@ use tokio_native_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
use super::{run_until, VoiceStreamType};
-use futures_util::future::join5;
type TcpSender = SplitSink<
Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
@@ -114,27 +114,30 @@ pub async fn handle(
info!("Logging in...");
+ let phase_watcher_inner = phase_watcher.clone();
+
run_until(
|phase| matches!(phase, StatePhase::Disconnected),
- //TODO take out the errors here and return them
- join5(
- send_pings(packet_sender.clone(), 10),
- listen(
- Arc::clone(&state),
- stream,
- crypt_state_sender.clone(),
- event_queue.clone(),
- ),
- send_voice(
- packet_sender.clone(),
- Arc::clone(&input_receiver),
- phase_watcher.clone(),
- ),
- send_packets(sink, &mut packet_receiver),
- register_events(&mut tcp_event_register_receiver, event_queue.clone()),
- ).map(|_| ()),
+ async {
+ select! {
+ r = send_pings(packet_sender.clone(), 10).fuse() => r,
+ r = listen(
+ Arc::clone(&state),
+ stream,
+ crypt_state_sender.clone(),
+ event_queue.clone(),
+ ).fuse() => r,
+ r = send_voice(
+ packet_sender.clone(),
+ Arc::clone(&input_receiver),
+ phase_watcher_inner,
+ ).fuse() => r,
+ r = send_packets(sink, &mut packet_receiver).fuse() => r,
+ _ = register_events(&mut tcp_event_register_receiver, event_queue.clone()).fuse() => Ok(()),
+ }
+ },
phase_watcher,
- ).await;
+ ).await.unwrap_or(Ok(()))?;
event_queue.resolve(TcpEventData::Disconnected).await;
@@ -209,7 +212,7 @@ async fn send_voice(
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
receiver: Arc<Mutex<Box<(dyn Stream<Item = VoicePacket<Serverbound>> + Unpin)>>>,
phase_watcher: watch::Receiver<StatePhase>,
-) {
+) -> Result<(), TcpError> {
loop {
let mut inner_phase_watcher = phase_watcher.clone();
loop {
@@ -243,7 +246,7 @@ async fn listen(
mut stream: TcpReceiver,
crypt_state_sender: mpsc::Sender<ClientCryptState>,
event_queue: TcpEventQueue,
-) {
+) -> Result<(), TcpError> {
let mut crypt_state = None;
let mut crypt_state_sender = Some(crypt_state_sender);
@@ -369,6 +372,7 @@ async fn listen(
}
}
}
+ Ok(())
}
async fn register_events(