aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/udp.rs
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-06-06 23:26:24 +0200
committerGustav Sörnäs <gustav@sornas.net>2021-06-06 23:26:24 +0200
commitbe76c2aa51733a0cf495e92659fbcbe527f41149 (patch)
tree617fb1caa999c076a45233b4bedea6a78192db25 /mumd/src/network/udp.rs
parent7fc5a1a36404ee4cbc09d20c955e6edd3d2ac523 (diff)
downloadmum-be76c2aa51733a0cf495e92659fbcbe527f41149.tar.gz
cargo fmt
Diffstat (limited to 'mumd/src/network/udp.rs')
-rw-r--r--mumd/src/network/udp.rs83
1 files changed, 47 insertions, 36 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 0958912..95dcf33 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -2,9 +2,9 @@ use crate::error::UdpError;
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
-use futures_util::{FutureExt, SinkExt, StreamExt};
use futures_util::future::join4;
use futures_util::stream::{SplitSink, SplitStream, Stream};
+use futures_util::{FutureExt, SinkExt, StreamExt};
use log::*;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::ping::{PingPacket, PongPacket};
@@ -13,10 +13,13 @@ use mumble_protocol::Serverbound;
use std::collections::{hash_map::Entry, HashMap};
use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
-use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
-use tokio::{join, net::UdpSocket};
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc, RwLock,
+};
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use tokio::time::{interval, timeout, Duration};
+use tokio::{join, net::UdpSocket};
use tokio_util::udp::UdpFramed;
use super::{run_until, VoiceStreamType};
@@ -53,11 +56,7 @@ pub async fn handle(
run_until(
|phase| matches!(phase, StatePhase::Disconnected),
join4(
- listen(
- Arc::clone(&state),
- Arc::clone(&source),
- &last_ping_recv,
- ),
+ listen(Arc::clone(&state), Arc::clone(&source), &last_ping_recv),
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
@@ -71,9 +70,11 @@ pub async fn handle(
&last_ping_recv,
),
new_crypt_state(&mut crypt_state_receiver, sink, source),
- ).map(|_| ()),
+ )
+ .map(|_| ()),
phase_watcher,
- ).await;
+ )
+ .await;
debug!("Fully disconnected UDP stream, waiting for new connection info");
}
@@ -83,8 +84,7 @@ async fn connect(
crypt_state: &mut mpsc::Receiver<ClientCryptState>,
) -> Result<(UdpSender, UdpReceiver), UdpError> {
// Bind UDP socket
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
- .await?;
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)).await?;
// Wait for initial CryptState
let crypt_state = match crypt_state.recv().await {
@@ -146,11 +146,11 @@ async fn listen(
// position_info,
..
} => {
- state
- .read()
- .unwrap()
- .audio_output()
- .decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
+ state.read().unwrap().audio_output().decode_packet_payload(
+ VoiceStreamType::UDP,
+ session_id,
+ payload,
+ );
}
}
}
@@ -178,12 +178,17 @@ async fn send_pings(
match sink
.lock()
.await
- .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr))
+ .send((
+ VoicePacket::Ping {
+ timestamp: last_recv + 1,
+ },
+ server_addr,
+ ))
.await
{
Ok(_) => {
last_send = Some(last_recv + 1);
- },
+ }
Err(e) => {
debug!("Error sending UDP ping: {}", e);
}
@@ -201,7 +206,10 @@ async fn send_voice(
let mut inner_phase_watcher = phase_watcher.clone();
loop {
inner_phase_watcher.changed().await.unwrap();
- if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) {
+ if matches!(
+ *inner_phase_watcher.borrow(),
+ StatePhase::Connected(VoiceStreamType::UDP)
+ ) {
break;
}
}
@@ -215,13 +223,12 @@ async fn send_voice(
}
},
phase_watcher.clone(),
- ).await;
+ )
+ .await;
}
}
-pub async fn handle_pings(
- mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>,
-) {
+pub async fn handle_pings(mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>) {
let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
.await
.expect("Failed to bind UDP socket");
@@ -246,19 +253,23 @@ pub async fn handle_pings(
}
tokio::spawn(async move {
- handle(
- match timeout(Duration::from_secs(1), rx).await {
- Ok(Ok(r)) => Some(r),
- Ok(Err(_)) => {
- warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id);
- None
- }
- Err(_) => {
- debug!("Server {} timed out when sending ping id {}", socket_addr, id);
- None
- }
+ handle(match timeout(Duration::from_secs(1), rx).await {
+ Ok(Ok(r)) => Some(r),
+ Ok(Err(_)) => {
+ warn!(
+ "Ping response sender for server {}, ping id {} dropped",
+ socket_addr, id
+ );
+ None
}
- );
+ Err(_) => {
+ debug!(
+ "Server {} timed out when sending ping id {}",
+ socket_addr, id
+ );
+ None
+ }
+ });
});
}
};