aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-01-05 13:09:23 +0100
committerGustav Sörnäs <gustav@sornas.net>2021-01-05 13:09:23 +0100
commit6c59a37fbfce72a92581b362048b509dcb67dae1 (patch)
tree4e618c87dec36791b17c993613c67c4666146785 /mumd/src
parent00969263678bf0626de8229fd21b1d5d183b62e8 (diff)
downloadmum-6c59a37fbfce72a92581b362048b509dcb67dae1.tar.gz
compare udp ping responses to sent values
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/network/udp.rs39
-rw-r--r--mumd/src/state.rs8
2 files changed, 37 insertions, 10 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index cfbabe1..1bc012d 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -12,6 +12,7 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
use std::rc::Rc;
+use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot, watch};
@@ -47,22 +48,26 @@ pub async fn handle(
let source = Arc::new(Mutex::new(source));
let phase_watcher = state.lock().unwrap().phase_receiver();
+ let last_ping_recv = AtomicU64::new(0);
let mut audio_receiver_lock = receiver.lock().unwrap();
join!(
listen(
Arc::clone(&state),
Arc::clone(&source),
- phase_watcher.clone()
+ phase_watcher.clone(),
+ &last_ping_recv,
),
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
phase_watcher,
- &mut *audio_receiver_lock
+ &mut *audio_receiver_lock,
),
send_pings(
+ Arc::clone(&state),
Arc::clone(&sink),
connection_info.socket_addr,
+ &last_ping_recv,
),
new_crypt_state(&mut crypt_state_receiver, sink, source),
);
@@ -113,6 +118,7 @@ async fn listen(
state: Arc<Mutex<State>>,
source: Arc<Mutex<UdpReceiver>>,
mut phase_watcher: watch::Receiver<StatePhase>,
+ last_ping_recv: &AtomicU64,
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
@@ -154,10 +160,12 @@ async fn listen(
}
};
match packet {
- VoicePacket::Ping { .. } => {
- // Note: A normal application would handle these and only use UDP for voice
- // once it has received one.
- continue;
+ VoicePacket::Ping { timestamp } => {
+ state
+ .lock()
+ .unwrap()
+ .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP));
+ last_ping_recv.store(timestamp, Ordering::Relaxed);
}
VoicePacket::Audio {
session_id,
@@ -183,17 +191,32 @@ async fn listen(
debug!("UDP listener process killed");
}
-async fn send_pings(sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr) {
+async fn send_pings(
+ state: Arc<Mutex<State>>,
+ sink: Arc<Mutex<UdpSender>>,
+ server_addr: SocketAddr,
+ last_ping_recv: &AtomicU64,
+) {
+ let mut last_send = None;
let mut interval = interval(Duration::from_millis(1000));
loop {
+ let last_recv = last_ping_recv.load(Ordering::Relaxed);
+ if last_send.is_some() && last_send.unwrap() != last_recv {
+ state
+ .lock()
+ .unwrap()
+ .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
+ }
match sink
.lock()
.unwrap()
.send((VoicePacket::Ping { timestamp: 0 }, server_addr))
.await
{
- Ok(_) => { /* TODO */ },
+ Ok(_) => {
+ last_send = Some(last_recv + 1);
+ },
Err(e) => {
debug!("Error sending UDP ping: {}", e);
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 4e8a886..2ed73b2 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -578,11 +578,15 @@ impl State {
}
}
- pub fn initialized(&self) {
+ pub fn broadcast_phase(&self, phase: StatePhase) {
self.phase_watcher
.0
- .send(StatePhase::Connected(VoiceStreamType::UDP))
+ .send(phase)
.unwrap();
+ }
+
+ pub fn initialized(&self) {
+ self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
self.audio.play_effect(NotificationEvents::ServerConnect);
}