From 92d5b21bf0f910f219c473002f83ba93ddcbee6d Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Wed, 6 Jan 2021 23:50:09 +0100 Subject: fix deadlock --- mumd/src/network/udp.rs | 69 ++++++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 27 deletions(-) (limited to 'mumd/src/network/udp.rs') diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index d35a255..25ec8d5 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -13,9 +13,9 @@ use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc}; use tokio::net::UdpSocket; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, oneshot, watch, Mutex}; use tokio::time::{interval, Duration}; use tokio_util::udp::UdpFramed; @@ -31,7 +31,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) { - let receiver = state.lock().unwrap().audio().input_receiver(); + let receiver = state.lock().await.audio().input_receiver(); loop { let connection_info = 'data: loop { @@ -47,7 +47,7 @@ pub async fn handle( let sink = Arc::new(Mutex::new(sink)); let source = Arc::new(Mutex::new(source)); - let phase_watcher = state.lock().unwrap().phase_receiver(); + let phase_watcher = state.lock().await.phase_receiver(); let last_ping_recv = AtomicU64::new(0); join!( listen( @@ -107,8 +107,8 @@ async fn new_crypt_state( .await .expect("Failed to bind UDP socket"); let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); - *sink.lock().unwrap() = new_sink; - *source.lock().unwrap() = new_source; + *sink.lock().await = new_sink; + *source.lock().await = new_source; } } } @@ -134,13 +134,14 @@ async fn listen( let rx = rx.fuse(); pin_mut!(rx); loop { - let mut source = source.lock().unwrap(); + let mut source = source.lock().await; let packet_recv = source.next().fuse(); pin_mut!(packet_recv); let exitor = select! { data = packet_recv => Some(data), _ = rx => None }; + drop(source); match exitor { None => { break; @@ -160,9 +161,10 @@ async fn listen( }; match packet { VoicePacket::Ping { timestamp } => { + // debug!("Sending UDP voice"); state - .lock() - .unwrap() + .lock() //TODO clean up unnecessary lock by only updating phase if it should change + .await .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); last_ping_recv.store(timestamp, Ordering::Relaxed); } @@ -175,9 +177,9 @@ async fn listen( } => { state .lock() - .unwrap() + .await .audio() - .decode_packet(VoiceStreamType::UDP, session_id, payload); + .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } } @@ -198,19 +200,21 @@ async fn send_pings( ) { let mut last_send = None; let mut interval = interval(Duration::from_millis(1000)); + interval.tick().await; //this is so we get rid of the first instant resolve loop { let last_recv = last_ping_recv.load(Ordering::Relaxed); if last_send.is_some() && last_send.unwrap() != last_recv { + debug!("Sending TCP voice"); state .lock() - .unwrap() + .await .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); } match sink .lock() - .unwrap() - .send((VoicePacket::Ping { timestamp: 0 }, server_addr)) + .await + .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr)) .await { Ok(_) => { @@ -228,22 +232,33 @@ async fn send_voice( sink: Arc>, server_addr: SocketAddr, phase_watcher: watch::Receiver, - receiver: Arc> + Unpin)>>>, + receiver: Arc> + Unpin)>>>, ) { let inner_phase_watcher = phase_watcher.clone(); run_until( |phase| matches!(phase, StatePhase::Disconnected), async { - run_until( - |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)), - async { - debug!("Sending UDP audio"); - sink.lock().unwrap().send((receiver.lock().await.next().await.unwrap(), server_addr)).await.unwrap(); - debug!("Sent UDP audio"); - }, - || async {}, - inner_phase_watcher.clone(), - ).await; + loop { + let mut inner_phase_watcher_2 = inner_phase_watcher.clone(); + loop { + inner_phase_watcher_2.changed().await.unwrap(); + if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) { + break; + } + } + run_until( + |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)), + async { + let mut receiver = receiver.lock().await; + loop { + let sending = (receiver.next().await.unwrap(), server_addr); + sink.lock().await.send(sending).await.unwrap(); + } + }, + || async {}, + inner_phase_watcher.clone(), + ).await; + } }, || async {}, phase_watcher, @@ -266,7 +281,7 @@ pub async fn handle_pings( let packet = PingPacket { id }; let packet: [u8; 12] = packet.into(); udp_socket.send_to(&packet, &socket_addr).await.unwrap(); - pending.lock().unwrap().insert(id, handle); + pending.lock().await.insert(id, handle); } }; @@ -277,7 +292,7 @@ pub async fn handle_pings( let packet = PongPacket::try_from(buf.as_slice()).unwrap(); - if let Some(handler) = pending.lock().unwrap().remove(&packet.id) { + if let Some(handler) = pending.lock().await.remove(&packet.id) { handler(packet); } } -- cgit v1.2.1