diff options
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/src/network/udp.rs | 110 |
1 files changed, 41 insertions, 69 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 25ec8d5..441d08b 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,7 +1,7 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; -use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream}; +use futures::{join, SinkExt, StreamExt, Stream}; use futures_util::stream::{SplitSink, SplitStream}; use log::*; use mumble_protocol::crypt::ClientCryptState; @@ -15,7 +15,7 @@ use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc}; use tokio::net::UdpSocket; -use tokio::sync::{mpsc, oneshot, watch, Mutex}; +use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{interval, Duration}; use tokio_util::udp::UdpFramed; @@ -116,78 +116,50 @@ async fn new_crypt_state( async fn listen( state: Arc<Mutex<State>>, source: Arc<Mutex<UdpReceiver>>, - mut phase_watcher: watch::Receiver<StatePhase>, + phase_watcher: watch::Receiver<StatePhase>, last_ping_recv: &AtomicU64, ) { - let (tx, rx) = oneshot::channel(); - let phase_transition_block = async { - loop { - phase_watcher.changed().await.unwrap(); - if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { - break; - } - } - tx.send(true).unwrap(); - }; - - let main_block = async { - let rx = rx.fuse(); - pin_mut!(rx); - loop { - let mut source = source.lock().await; - let packet_recv = source.next().fuse(); - pin_mut!(packet_recv); - let exitor = select! { - data = packet_recv => Some(data), - _ = rx => None - }; - drop(source); - match exitor { - None => { - break; - } - Some(None) => { - warn!("Channel closed before disconnect command"); - break; - } - Some(Some(packet)) => { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - warn!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue; - } - }; - match packet { - VoicePacket::Ping { timestamp } => { - // debug!("Sending UDP voice"); - state - .lock() //TODO clean up unnecessary lock by only updating phase if it should change - .await - .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); - last_ping_recv.store(timestamp, Ordering::Relaxed); - } - VoicePacket::Audio { - session_id, - // seq_num, - payload, - // position_info, - .. - } => { - state - .lock() - .await - .audio() - .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); - } + run_until( + |phase| matches!(phase, StatePhase::Disconnected), + async { + loop { + let packet = source.lock().await.next().await.unwrap(); + let (packet, _src_addr) = match packet { + Ok(packet) => packet, + Err(err) => { + warn!("Got an invalid UDP packet: {}", err); + // To be expected, considering this is the internet, just ignore it + continue; + } + }; + match packet { + VoicePacket::Ping { timestamp } => { + // debug!("Sending UDP voice"); + state + .lock() //TODO clean up unnecessary lock by only updating phase if it should change + .await + .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); + last_ping_recv.store(timestamp, Ordering::Relaxed); + } + VoicePacket::Audio { + session_id, + // seq_num, + payload, + // position_info, + .. + } => { + state + .lock() + .await + .audio() + .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } } - } - }; - - join!(main_block, phase_transition_block); + }, + || async {}, + phase_watcher + ).await; debug!("UDP listener process killed"); } |
