diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2021-01-05 17:08:48 +0100 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2021-01-05 17:08:48 +0100 |
| commit | ab038b58b4440804cdfded56167ce72b599d87c8 (patch) | |
| tree | 5ef4e7328a4c1a57c5006b4bf1af6709a64dafd4 /mumd/src/network/udp.rs | |
| parent | 6c59a37fbfce72a92581b362048b509dcb67dae1 (diff) | |
| download | mum-ab038b58b4440804cdfded56167ce72b599d87c8.tar.gz | |
yikes
Diffstat (limited to 'mumd/src/network/udp.rs')
| -rw-r--r-- | mumd/src/network/udp.rs | 73 |
1 files changed, 25 insertions, 48 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 1bc012d..9435e94 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -19,7 +19,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{interval, Duration}; use tokio_util::udp::UdpFramed; -use super::VoiceStreamType; +use super::{run_until, VoiceStreamType}; pub type PingRequest = (u64, SocketAddr, Box<dyn FnOnce(PongPacket)>); @@ -31,7 +31,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>, ) { - let receiver = state.lock().unwrap().audio_mut().take_receiver(); + let receiver = state.lock().unwrap().audio().input_receiver(); loop { let connection_info = 'data: loop { @@ -49,7 +49,6 @@ pub async fn handle( let phase_watcher = state.lock().unwrap().phase_receiver(); let last_ping_recv = AtomicU64::new(0); - let mut audio_receiver_lock = receiver.lock().unwrap(); join!( listen( Arc::clone(&state), @@ -61,7 +60,7 @@ pub async fn handle( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, - &mut *audio_receiver_lock, + Arc::clone(&receiver), ), send_pings( Arc::clone(&state), @@ -228,51 +227,29 @@ async fn send_pings( async fn send_voice( sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, - mut phase_watcher: watch::Receiver<StatePhase>, - receiver: &mut (dyn Stream<Item = VoicePacket<Serverbound>> + Unpin), + phase_watcher: watch::Receiver<StatePhase>, + receiver: Arc<Mutex<Box<(dyn Stream<Item = VoicePacket<Serverbound>> + Unpin)>>>, ) { - pin_mut!(receiver); - let (tx, rx) = oneshot::channel(); - let phase_transition_block = async { - loop { - phase_watcher.changed().await.unwrap(); - if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { - break; - } - } - tx.send(true).unwrap(); - }; - - let main_block = async { - let rx = rx.fuse(); - pin_mut!(rx); - loop { - let packet_recv = receiver.next().fuse(); - pin_mut!(packet_recv); - let exitor = select! { - data = packet_recv => Some(data), - _ = rx => None - }; - match exitor { - None => { - break; - } - Some(None) => { - warn!("Channel closed before disconnect command"); - break; - } - Some(Some(reply)) => { - sink.lock() - .unwrap() - .send((reply, server_addr)) - .await - .unwrap(); - } - } - } - }; - - join!(main_block, phase_transition_block); + let inner_phase_watcher = phase_watcher.clone(); + run_until( + |phase| matches!(phase, StatePhase::Disconnected), + || async { + run_until( + |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)), + || async { + sink.lock().unwrap().send((receiver.lock().unwrap().next().await.unwrap(), server_addr)).await.unwrap(); + Some(Some(())) + }, + |_| async {}, + || async {}, + inner_phase_watcher.clone(), + ).await; + Some(Some(())) + }, + |_| async {}, + || async {}, + phase_watcher, + ).await; debug!("UDP sender process killed"); } |
