diff options
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/src/network/udp.rs | 91 |
1 files changed, 55 insertions, 36 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index ac67bcb..4167c15 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -59,7 +59,7 @@ pub async fn handle( send_voice( Arc::clone(&sink), connection_info.socket_addr, - phase_watcher, + phase_watcher.clone(), Arc::clone(&receiver), ), send_pings( @@ -67,8 +67,9 @@ pub async fn handle( Arc::clone(&sink), connection_info.socket_addr, &last_ping_recv, + phase_watcher.clone(), ), - new_crypt_state(&mut crypt_state_receiver, sink, source), + new_crypt_state(&mut crypt_state_receiver, sink, source, phase_watcher), ); debug!("Fully disconnected UDP stream, waiting for new connection info"); @@ -99,18 +100,26 @@ async fn new_crypt_state( crypt_state: &mut mpsc::Receiver<ClientCryptState>, sink: Arc<Mutex<UdpSender>>, source: Arc<Mutex<UdpReceiver>>, + phase_watcher: watch::Receiver<StatePhase>, ) { - loop { - if let Some(crypt_state) = crypt_state.recv().await { - info!("Received new crypt state"); - let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) - .await - .expect("Failed to bind UDP socket"); - let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); - *sink.lock().await = new_sink; - *source.lock().await = new_source; - } - } + run_until( + |phase| matches!(phase, StatePhase::Disconnected), + async { + loop { + if let Some(crypt_state) = crypt_state.recv().await { + info!("Received new crypt state"); + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) + .await + .expect("Failed to bind UDP socket"); + let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); + *sink.lock().await = new_sink; + *source.lock().await = new_source; + } + } + }, + || async {}, + phase_watcher, + ).await; } async fn listen( @@ -168,34 +177,44 @@ async fn send_pings( sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, last_ping_recv: &AtomicU64, + phase_watcher: watch::Receiver<StatePhase>, ) { let mut last_send = None; let mut interval = interval(Duration::from_millis(1000)); - loop { - interval.tick().await; - let last_recv = last_ping_recv.load(Ordering::Relaxed); - if last_send.is_some() && last_send.unwrap() != last_recv { - debug!("Sending TCP voice"); - state - .lock() - .await - .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); - } - match sink - .lock() - .await - .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr)) - .await - { - Ok(_) => { - last_send = Some(last_recv + 1); - }, - Err(e) => { - debug!("Error sending UDP ping: {}", e); + run_until( + |phase| matches!(phase, StatePhase::Disconnected), + async { + loop { + interval.tick().await; + let last_recv = last_ping_recv.load(Ordering::Relaxed); + if last_send.is_some() && last_send.unwrap() != last_recv { + debug!("Sending TCP voice"); + state + .lock() + .await + .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); + } + match sink + .lock() + .await + .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr)) + .await + { + Ok(_) => { + last_send = Some(last_recv + 1); + }, + Err(e) => { + debug!("Error sending UDP ping: {}", e); + } + } } - } - } + }, + || async {}, + phase_watcher, + ).await; + + debug!("UDP ping sender process killed"); } async fn send_voice( |
