From 96ac028918baa1094374e823a2464016f7f20479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Sat, 26 Dec 2020 22:33:10 +0100 Subject: add todos --- mumd/src/network/tcp.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 47b1c20..09cd844 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -177,7 +177,7 @@ async fn authenticate( msg.set_password(password); } msg.set_opus(true); - sink.send(msg.into()).await.unwrap(); + sink.send(msg.into()).await.unwrap(); //TODO handle panic } async fn send_pings( @@ -189,7 +189,7 @@ async fn send_pings( interval.tick().await; trace!("Sending TCP ping"); let msg = msgs::Ping::new(); - packet_sender.send(msg.into()).unwrap(); + packet_sender.send(msg.into()).unwrap(); //TODO handle panic } } @@ -198,8 +198,8 @@ async fn send_packets( packet_receiver: &mut mpsc::UnboundedReceiver>, ) { loop { - let packet = packet_receiver.recv().await.unwrap(); - sink.send(packet).await.unwrap(); + let packet = packet_receiver.recv().await.unwrap(); //TODO handle panic + sink.send(packet).await.unwrap(); //TODO handle panic } } -- cgit v1.2.1 From 80dba403ed968982ec23ab7416d48dc5b69329f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 30 Mar 2021 14:18:18 +0200 Subject: return tcp errors from tcp internals --- mumd/src/network/tcp.rs | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 09cd844..6460cba 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,3 +1,4 @@ +use crate::error::TcpError; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; @@ -84,7 +85,7 @@ pub async fn handle( packet_sender: mpsc::UnboundedSender>, mut packet_receiver: mpsc::UnboundedReceiver>, mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, -) { +) -> Result<(), TcpError> { loop { let connection_info = 'data: loop { while connection_info_receiver.changed().await.is_ok() { @@ -92,20 +93,20 @@ pub async fn handle( break 'data data; } } - return; + return Err(TcpError::NoConnectionInfoReceived); }; let (mut sink, stream) = connect( connection_info.socket_addr, connection_info.hostname, connection_info.accept_invalid_cert, ) - .await; + .await?; // Handshake (omitting `Version` message for brevity) let state_lock = state.lock().await; let username = state_lock.username().unwrap().to_string(); let password = state_lock.password().map(|x| x.to_string()); - authenticate(&mut sink, username, password).await; + authenticate(&mut sink, username, password).await?; let phase_watcher = state_lock.phase_receiver(); let input_receiver = state_lock.audio().input_receiver(); drop(state_lock); @@ -115,6 +116,7 @@ pub async fn handle( run_until( |phase| matches!(phase, StatePhase::Disconnected), + //TODO take out the errors here and return them join5( send_pings(packet_sender.clone(), 10), listen( @@ -144,62 +146,62 @@ async fn connect( server_addr: SocketAddr, server_host: String, accept_invalid_cert: bool, -) -> (TcpSender, TcpReceiver) { - let stream = TcpStream::connect(&server_addr) - .await - .expect("failed to connect to server:"); +) -> Result<(TcpSender, TcpReceiver), TcpError> { + let stream = TcpStream::connect(&server_addr).await?; debug!("TCP connected"); let mut builder = native_tls::TlsConnector::builder(); builder.danger_accept_invalid_certs(accept_invalid_cert); let connector: TlsConnector = builder .build() - .expect("failed to create TLS connector") + .map_err(|e| TcpError::TlsConnectorBuilderError(e))? .into(); let tls_stream = connector .connect(&server_host, stream) .await - .expect("failed to connect TLS: {}"); + .map_err(|e| TcpError::TlsConnectError(e))?; debug!("TLS connected"); // Wrap the TLS stream with Mumble's client-side control-channel codec - ClientControlCodec::new().framed(tls_stream).split() + Ok(ClientControlCodec::new().framed(tls_stream).split()) } async fn authenticate( sink: &mut TcpSender, username: String, password: Option -) { +) -> Result<(), TcpError> { let mut msg = msgs::Authenticate::new(); msg.set_username(username); if let Some(password) = password { msg.set_password(password); } msg.set_opus(true); - sink.send(msg.into()).await.unwrap(); //TODO handle panic + sink.send(msg.into()).await?; + Ok(()) } async fn send_pings( packet_sender: mpsc::UnboundedSender>, delay_seconds: u64, -) { +) -> Result<(), TcpError> { let mut interval = time::interval(Duration::from_secs(delay_seconds)); loop { interval.tick().await; trace!("Sending TCP ping"); let msg = msgs::Ping::new(); - packet_sender.send(msg.into()).unwrap(); //TODO handle panic + packet_sender.send(msg.into())?; } } async fn send_packets( mut sink: TcpSender, packet_receiver: &mut mpsc::UnboundedReceiver>, -) { +) -> Result<(), TcpError> { loop { - let packet = packet_receiver.recv().await.unwrap(); //TODO handle panic - sink.send(packet).await.unwrap(); //TODO handle panic + // Safe since we always have at least one sender alive. + let packet = packet_receiver.recv().await.unwrap(); + sink.send(packet).await?; } } @@ -226,9 +228,9 @@ async fn send_voice( .await .next() .await - .unwrap() + .unwrap() //TODO handle panic .into()) - .unwrap(); + .unwrap(); //TODO handle panic } }, inner_phase_watcher.clone(), -- cgit v1.2.1 From 1d331f0707eaa4a056aa6261410fb1edb63097b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 30 Mar 2021 16:15:53 +0200 Subject: report tcp errors all the way --- mumd/src/network/tcp.rs | 46 +++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6460cba..9b0b68e 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -4,6 +4,7 @@ use crate::state::{State, StatePhase}; use log::*; use futures_util::{FutureExt, SinkExt, StreamExt}; +use futures_util::select; use futures_util::stream::{SplitSink, SplitStream, Stream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; @@ -20,7 +21,6 @@ use tokio_native_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; use super::{run_until, VoiceStreamType}; -use futures_util::future::join5; type TcpSender = SplitSink< Framed, ControlCodec>, @@ -114,27 +114,30 @@ pub async fn handle( info!("Logging in..."); + let phase_watcher_inner = phase_watcher.clone(); + run_until( |phase| matches!(phase, StatePhase::Disconnected), - //TODO take out the errors here and return them - join5( - send_pings(packet_sender.clone(), 10), - listen( - Arc::clone(&state), - stream, - crypt_state_sender.clone(), - event_queue.clone(), - ), - send_voice( - packet_sender.clone(), - Arc::clone(&input_receiver), - phase_watcher.clone(), - ), - send_packets(sink, &mut packet_receiver), - register_events(&mut tcp_event_register_receiver, event_queue.clone()), - ).map(|_| ()), + async { + select! { + r = send_pings(packet_sender.clone(), 10).fuse() => r, + r = listen( + Arc::clone(&state), + stream, + crypt_state_sender.clone(), + event_queue.clone(), + ).fuse() => r, + r = send_voice( + packet_sender.clone(), + Arc::clone(&input_receiver), + phase_watcher_inner, + ).fuse() => r, + r = send_packets(sink, &mut packet_receiver).fuse() => r, + _ = register_events(&mut tcp_event_register_receiver, event_queue.clone()).fuse() => Ok(()), + } + }, phase_watcher, - ).await; + ).await.unwrap_or(Ok(()))?; event_queue.resolve(TcpEventData::Disconnected).await; @@ -209,7 +212,7 @@ async fn send_voice( packet_sender: mpsc::UnboundedSender>, receiver: Arc> + Unpin)>>>, phase_watcher: watch::Receiver, -) { +) -> Result<(), TcpError> { loop { let mut inner_phase_watcher = phase_watcher.clone(); loop { @@ -243,7 +246,7 @@ async fn listen( mut stream: TcpReceiver, crypt_state_sender: mpsc::Sender, event_queue: TcpEventQueue, -) { +) -> Result<(), TcpError> { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -369,6 +372,7 @@ async fn listen( } } } + Ok(()) } async fn register_events( -- cgit v1.2.1 From 69f189fd45b410be2db3c77e2a4bfa6d9ad8946d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 31 Mar 2021 09:05:12 +0200 Subject: review --- mumd/src/network/tcp.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 9b0b68e..5783cc8 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,4 +1,4 @@ -use crate::error::TcpError; +use crate::error::{ServerSendError, TcpError}; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; @@ -225,19 +225,21 @@ async fn send_voice( |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::TCP)), async { loop { - packet_sender.send( + let res: Result<(), ServerSendError> = packet_sender.send( receiver .lock() .await .next() .await - .unwrap() //TODO handle panic - .into()) - .unwrap(); //TODO handle panic + .expect("No audio stream") + .into()); + if matches!(res, Err(_)) { + return res; + } } }, inner_phase_watcher.clone(), - ).await; + ).await.unwrap_or(Ok(()))?; } } -- cgit v1.2.1 From 46a3938b6d9d81649e38e6e793599a52991d803d Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 31 Mar 2021 21:50:50 +0200 Subject: tyrbofish ? --- mumd/src/network/tcp.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 5783cc8..6402a89 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -225,21 +225,18 @@ async fn send_voice( |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::TCP)), async { loop { - let res: Result<(), ServerSendError> = packet_sender.send( + packet_sender.send( receiver .lock() .await .next() .await .expect("No audio stream") - .into()); - if matches!(res, Err(_)) { - return res; - } + .into())?; } }, inner_phase_watcher.clone(), - ).await.unwrap_or(Ok(()))?; + ).await.unwrap_or(Ok::<(), ServerSendError>(()))?; } } -- cgit v1.2.1