diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2021-03-30 16:15:53 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2021-03-30 16:15:53 +0200 |
| commit | 1d331f0707eaa4a056aa6261410fb1edb63097b7 (patch) | |
| tree | 3beca0c239306bd1ed6f9ee792abd5a855e190f7 /mumd/src/network | |
| parent | 79702d18bbd23df2faf0c00b0d9537ce26508f6b (diff) | |
| download | mum-1d331f0707eaa4a056aa6261410fb1edb63097b7.tar.gz | |
report tcp errors all the way
Diffstat (limited to 'mumd/src/network')
| -rw-r--r-- | mumd/src/network/tcp.rs | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6460cba..9b0b68e 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -4,6 +4,7 @@ use crate::state::{State, StatePhase}; use log::*; use futures_util::{FutureExt, SinkExt, StreamExt}; +use futures_util::select; use futures_util::stream::{SplitSink, SplitStream, Stream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; @@ -20,7 +21,6 @@ use tokio_native_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; use super::{run_until, VoiceStreamType}; -use futures_util::future::join5; type TcpSender = SplitSink< Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>, @@ -114,27 +114,30 @@ pub async fn handle( info!("Logging in..."); + let phase_watcher_inner = phase_watcher.clone(); + run_until( |phase| matches!(phase, StatePhase::Disconnected), - //TODO take out the errors here and return them - join5( - send_pings(packet_sender.clone(), 10), - listen( - Arc::clone(&state), - stream, - crypt_state_sender.clone(), - event_queue.clone(), - ), - send_voice( - packet_sender.clone(), - Arc::clone(&input_receiver), - phase_watcher.clone(), - ), - send_packets(sink, &mut packet_receiver), - register_events(&mut tcp_event_register_receiver, event_queue.clone()), - ).map(|_| ()), + async { + select! { + r = send_pings(packet_sender.clone(), 10).fuse() => r, + r = listen( + Arc::clone(&state), + stream, + crypt_state_sender.clone(), + event_queue.clone(), + ).fuse() => r, + r = send_voice( + packet_sender.clone(), + Arc::clone(&input_receiver), + phase_watcher_inner, + ).fuse() => r, + r = send_packets(sink, &mut packet_receiver).fuse() => r, + _ = register_events(&mut tcp_event_register_receiver, event_queue.clone()).fuse() => Ok(()), + } + }, phase_watcher, - ).await; + ).await.unwrap_or(Ok(()))?; event_queue.resolve(TcpEventData::Disconnected).await; @@ -209,7 +212,7 @@ async fn send_voice( packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, receiver: Arc<Mutex<Box<(dyn Stream<Item = VoicePacket<Serverbound>> + Unpin)>>>, phase_watcher: watch::Receiver<StatePhase>, -) { +) -> Result<(), TcpError> { loop { let mut inner_phase_watcher = phase_watcher.clone(); loop { @@ -243,7 +246,7 @@ async fn listen( mut stream: TcpReceiver, crypt_state_sender: mpsc::Sender<ClientCryptState>, event_queue: TcpEventQueue, -) { +) -> Result<(), TcpError> { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -369,6 +372,7 @@ async fn listen( } } } + Ok(()) } async fn register_events( |
