diff options
Diffstat (limited to 'mumd/src/network/udp.rs')
| -rw-r--r-- | mumd/src/network/udp.rs | 39 |
1 files changed, 31 insertions, 8 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index cfbabe1..1bc012d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -12,6 +12,7 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; @@ -47,22 +48,26 @@ pub async fn handle( let source = Arc::new(Mutex::new(source)); let phase_watcher = state.lock().unwrap().phase_receiver(); + let last_ping_recv = AtomicU64::new(0); let mut audio_receiver_lock = receiver.lock().unwrap(); join!( listen( Arc::clone(&state), Arc::clone(&source), - phase_watcher.clone() + phase_watcher.clone(), + &last_ping_recv, ), send_voice( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, - &mut *audio_receiver_lock + &mut *audio_receiver_lock, ), send_pings( + Arc::clone(&state), Arc::clone(&sink), connection_info.socket_addr, + &last_ping_recv, ), new_crypt_state(&mut crypt_state_receiver, sink, source), ); @@ -113,6 +118,7 @@ async fn listen( state: Arc<Mutex<State>>, source: Arc<Mutex<UdpReceiver>>, mut phase_watcher: watch::Receiver<StatePhase>, + last_ping_recv: &AtomicU64, ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { @@ -154,10 +160,12 @@ async fn listen( } }; match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue; + VoicePacket::Ping { timestamp } => { + state + .lock() + .unwrap() + .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); + last_ping_recv.store(timestamp, Ordering::Relaxed); } VoicePacket::Audio { session_id, @@ -183,17 +191,32 @@ async fn listen( debug!("UDP listener process killed"); } -async fn send_pings(sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr) { +async fn send_pings( + state: Arc<Mutex<State>>, + sink: Arc<Mutex<UdpSender>>, + server_addr: SocketAddr, + last_ping_recv: &AtomicU64, +) { + let mut last_send = None; let mut interval = interval(Duration::from_millis(1000)); loop { + let last_recv = last_ping_recv.load(Ordering::Relaxed); + if last_send.is_some() && last_send.unwrap() != last_recv { + state + .lock() + .unwrap() + .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); + } match sink .lock() .unwrap() .send((VoicePacket::Ping { timestamp: 0 }, server_addr)) .await { - Ok(_) => { /* TODO */ }, + Ok(_) => { + last_send = Some(last_recv + 1); + }, Err(e) => { debug!("Error sending UDP ping: {}", e); } |
