diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2021-06-06 23:26:24 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2021-06-06 23:26:24 +0200 |
| commit | be76c2aa51733a0cf495e92659fbcbe527f41149 (patch) | |
| tree | 617fb1caa999c076a45233b4bedea6a78192db25 /mumd/src/network/udp.rs | |
| parent | 7fc5a1a36404ee4cbc09d20c955e6edd3d2ac523 (diff) | |
| download | mum-be76c2aa51733a0cf495e92659fbcbe527f41149.tar.gz | |
cargo fmt
Diffstat (limited to 'mumd/src/network/udp.rs')
| -rw-r--r-- | mumd/src/network/udp.rs | 83 |
1 files changed, 47 insertions, 36 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 0958912..95dcf33 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -2,9 +2,9 @@ use crate::error::UdpError; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; -use futures_util::{FutureExt, SinkExt, StreamExt}; use futures_util::future::join4; use futures_util::stream::{SplitSink, SplitStream, Stream}; +use futures_util::{FutureExt, SinkExt, StreamExt}; use log::*; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; @@ -13,10 +13,13 @@ use mumble_protocol::Serverbound; use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; -use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; -use tokio::{join, net::UdpSocket}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, +}; use tokio::sync::{mpsc, oneshot, watch, Mutex}; use tokio::time::{interval, timeout, Duration}; +use tokio::{join, net::UdpSocket}; use tokio_util::udp::UdpFramed; use super::{run_until, VoiceStreamType}; @@ -53,11 +56,7 @@ pub async fn handle( run_until( |phase| matches!(phase, StatePhase::Disconnected), join4( - listen( - Arc::clone(&state), - Arc::clone(&source), - &last_ping_recv, - ), + listen(Arc::clone(&state), Arc::clone(&source), &last_ping_recv), send_voice( Arc::clone(&sink), connection_info.socket_addr, @@ -71,9 +70,11 @@ pub async fn handle( &last_ping_recv, ), new_crypt_state(&mut crypt_state_receiver, sink, source), - ).map(|_| ()), + ) + .map(|_| ()), phase_watcher, - ).await; + ) + .await; debug!("Fully disconnected UDP stream, waiting for new connection info"); } @@ -83,8 +84,7 @@ async fn connect( crypt_state: &mut mpsc::Receiver<ClientCryptState>, ) -> Result<(UdpSender, UdpReceiver), UdpError> { // Bind UDP socket - let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) - .await?; + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)).await?; // Wait for initial CryptState let crypt_state = match crypt_state.recv().await { @@ -146,11 +146,11 @@ async fn listen( // position_info, .. } => { - state - .read() - .unwrap() - .audio_output() - .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); + state.read().unwrap().audio_output().decode_packet_payload( + VoiceStreamType::UDP, + session_id, + payload, + ); } } } @@ -178,12 +178,17 @@ async fn send_pings( match sink .lock() .await - .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr)) + .send(( + VoicePacket::Ping { + timestamp: last_recv + 1, + }, + server_addr, + )) .await { Ok(_) => { last_send = Some(last_recv + 1); - }, + } Err(e) => { debug!("Error sending UDP ping: {}", e); } @@ -201,7 +206,10 @@ async fn send_voice( let mut inner_phase_watcher = phase_watcher.clone(); loop { inner_phase_watcher.changed().await.unwrap(); - if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) { + if matches!( + *inner_phase_watcher.borrow(), + StatePhase::Connected(VoiceStreamType::UDP) + ) { break; } } @@ -215,13 +223,12 @@ async fn send_voice( } }, phase_watcher.clone(), - ).await; + ) + .await; } } -pub async fn handle_pings( - mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>, -) { +pub async fn handle_pings(mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>) { let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); @@ -246,19 +253,23 @@ pub async fn handle_pings( } tokio::spawn(async move { - handle( - match timeout(Duration::from_secs(1), rx).await { - Ok(Ok(r)) => Some(r), - Ok(Err(_)) => { - warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id); - None - } - Err(_) => { - debug!("Server {} timed out when sending ping id {}", socket_addr, id); - None - } + handle(match timeout(Duration::from_secs(1), rx).await { + Ok(Ok(r)) => Some(r), + Ok(Err(_)) => { + warn!( + "Ping response sender for server {}, ping id {} dropped", + socket_addr, id + ); + None } - ); + Err(_) => { + debug!( + "Server {} timed out when sending ping id {}", + socket_addr, id + ); + None + } + }); }); } }; |
