aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/udp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src/network/udp.rs')
-rw-r--r--mumd/src/network/udp.rs69
1 files changed, 42 insertions, 27 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index d35a255..25ec8d5 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -13,9 +13,9 @@ use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc};
use tokio::net::UdpSocket;
-use tokio::sync::{mpsc, oneshot, watch};
+use tokio::sync::{mpsc, oneshot, watch, Mutex};
use tokio::time::{interval, Duration};
use tokio_util::udp::UdpFramed;
@@ -31,7 +31,7 @@ pub async fn handle(
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) {
- let receiver = state.lock().unwrap().audio().input_receiver();
+ let receiver = state.lock().await.audio().input_receiver();
loop {
let connection_info = 'data: loop {
@@ -47,7 +47,7 @@ pub async fn handle(
let sink = Arc::new(Mutex::new(sink));
let source = Arc::new(Mutex::new(source));
- let phase_watcher = state.lock().unwrap().phase_receiver();
+ let phase_watcher = state.lock().await.phase_receiver();
let last_ping_recv = AtomicU64::new(0);
join!(
listen(
@@ -107,8 +107,8 @@ async fn new_crypt_state(
.await
.expect("Failed to bind UDP socket");
let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split();
- *sink.lock().unwrap() = new_sink;
- *source.lock().unwrap() = new_source;
+ *sink.lock().await = new_sink;
+ *source.lock().await = new_source;
}
}
}
@@ -134,13 +134,14 @@ async fn listen(
let rx = rx.fuse();
pin_mut!(rx);
loop {
- let mut source = source.lock().unwrap();
+ let mut source = source.lock().await;
let packet_recv = source.next().fuse();
pin_mut!(packet_recv);
let exitor = select! {
data = packet_recv => Some(data),
_ = rx => None
};
+ drop(source);
match exitor {
None => {
break;
@@ -160,9 +161,10 @@ async fn listen(
};
match packet {
VoicePacket::Ping { timestamp } => {
+ // debug!("Sending UDP voice");
state
- .lock()
- .unwrap()
+ .lock() //TODO clean up unnecessary lock by only updating phase if it should change
+ .await
.broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP));
last_ping_recv.store(timestamp, Ordering::Relaxed);
}
@@ -175,9 +177,9 @@ async fn listen(
} => {
state
.lock()
- .unwrap()
+ .await
.audio()
- .decode_packet(VoiceStreamType::UDP, session_id, payload);
+ .decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
}
}
}
@@ -198,19 +200,21 @@ async fn send_pings(
) {
let mut last_send = None;
let mut interval = interval(Duration::from_millis(1000));
+ interval.tick().await; //this is so we get rid of the first instant resolve
loop {
let last_recv = last_ping_recv.load(Ordering::Relaxed);
if last_send.is_some() && last_send.unwrap() != last_recv {
+ debug!("Sending TCP voice");
state
.lock()
- .unwrap()
+ .await
.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
}
match sink
.lock()
- .unwrap()
- .send((VoicePacket::Ping { timestamp: 0 }, server_addr))
+ .await
+ .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr))
.await
{
Ok(_) => {
@@ -228,22 +232,33 @@ async fn send_voice(
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
phase_watcher: watch::Receiver<StatePhase>,
- receiver: Arc<tokio::sync::Mutex<Box<(dyn Stream<Item = VoicePacket<Serverbound>> + Unpin)>>>,
+ receiver: Arc<Mutex<Box<(dyn Stream<Item = VoicePacket<Serverbound>> + Unpin)>>>,
) {
let inner_phase_watcher = phase_watcher.clone();
run_until(
|phase| matches!(phase, StatePhase::Disconnected),
async {
- run_until(
- |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)),
- async {
- debug!("Sending UDP audio");
- sink.lock().unwrap().send((receiver.lock().await.next().await.unwrap(), server_addr)).await.unwrap();
- debug!("Sent UDP audio");
- },
- || async {},
- inner_phase_watcher.clone(),
- ).await;
+ loop {
+ let mut inner_phase_watcher_2 = inner_phase_watcher.clone();
+ loop {
+ inner_phase_watcher_2.changed().await.unwrap();
+ if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) {
+ break;
+ }
+ }
+ run_until(
+ |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)),
+ async {
+ let mut receiver = receiver.lock().await;
+ loop {
+ let sending = (receiver.next().await.unwrap(), server_addr);
+ sink.lock().await.send(sending).await.unwrap();
+ }
+ },
+ || async {},
+ inner_phase_watcher.clone(),
+ ).await;
+ }
},
|| async {},
phase_watcher,
@@ -266,7 +281,7 @@ pub async fn handle_pings(
let packet = PingPacket { id };
let packet: [u8; 12] = packet.into();
udp_socket.send_to(&packet, &socket_addr).await.unwrap();
- pending.lock().unwrap().insert(id, handle);
+ pending.lock().await.insert(id, handle);
}
};
@@ -277,7 +292,7 @@ pub async fn handle_pings(
let packet = PongPacket::try_from(buf.as_slice()).unwrap();
- if let Some(handler) = pending.lock().unwrap().remove(&packet.id) {
+ if let Some(handler) = pending.lock().await.remove(&packet.id) {
handler(packet);
}
}