aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/udp.rs
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-01-05 17:08:48 +0100
committerGustav Sörnäs <gustav@sornas.net>2021-01-05 17:08:48 +0100
commitab038b58b4440804cdfded56167ce72b599d87c8 (patch)
tree5ef4e7328a4c1a57c5006b4bf1af6709a64dafd4 /mumd/src/network/udp.rs
parent6c59a37fbfce72a92581b362048b509dcb67dae1 (diff)
downloadmum-ab038b58b4440804cdfded56167ce72b599d87c8.tar.gz
yikes
Diffstat (limited to 'mumd/src/network/udp.rs')
-rw-r--r--mumd/src/network/udp.rs73
1 files changed, 25 insertions, 48 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 1bc012d..9435e94 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -19,7 +19,7 @@ use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{interval, Duration};
use tokio_util::udp::UdpFramed;
-use super::VoiceStreamType;
+use super::{run_until, VoiceStreamType};
pub type PingRequest = (u64, SocketAddr, Box<dyn FnOnce(PongPacket)>);
@@ -31,7 +31,7 @@ pub async fn handle(
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) {
- let receiver = state.lock().unwrap().audio_mut().take_receiver();
+ let receiver = state.lock().unwrap().audio().input_receiver();
loop {
let connection_info = 'data: loop {
@@ -49,7 +49,6 @@ pub async fn handle(
let phase_watcher = state.lock().unwrap().phase_receiver();
let last_ping_recv = AtomicU64::new(0);
- let mut audio_receiver_lock = receiver.lock().unwrap();
join!(
listen(
Arc::clone(&state),
@@ -61,7 +60,7 @@ pub async fn handle(
Arc::clone(&sink),
connection_info.socket_addr,
phase_watcher,
- &mut *audio_receiver_lock,
+ Arc::clone(&receiver),
),
send_pings(
Arc::clone(&state),
@@ -228,51 +227,29 @@ async fn send_pings(
async fn send_voice(
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
- mut phase_watcher: watch::Receiver<StatePhase>,
- receiver: &mut (dyn Stream<Item = VoicePacket<Serverbound>> + Unpin),
+ phase_watcher: watch::Receiver<StatePhase>,
+ receiver: Arc<Mutex<Box<(dyn Stream<Item = VoicePacket<Serverbound>> + Unpin)>>>,
) {
- pin_mut!(receiver);
- let (tx, rx) = oneshot::channel();
- let phase_transition_block = async {
- loop {
- phase_watcher.changed().await.unwrap();
- if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
- break;
- }
- }
- tx.send(true).unwrap();
- };
-
- let main_block = async {
- let rx = rx.fuse();
- pin_mut!(rx);
- loop {
- let packet_recv = receiver.next().fuse();
- pin_mut!(packet_recv);
- let exitor = select! {
- data = packet_recv => Some(data),
- _ = rx => None
- };
- match exitor {
- None => {
- break;
- }
- Some(None) => {
- warn!("Channel closed before disconnect command");
- break;
- }
- Some(Some(reply)) => {
- sink.lock()
- .unwrap()
- .send((reply, server_addr))
- .await
- .unwrap();
- }
- }
- }
- };
-
- join!(main_block, phase_transition_block);
+ let inner_phase_watcher = phase_watcher.clone();
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
+ || async {
+ run_until(
+ |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)),
+ || async {
+ sink.lock().unwrap().send((receiver.lock().unwrap().next().await.unwrap(), server_addr)).await.unwrap();
+ Some(Some(()))
+ },
+ |_| async {},
+ || async {},
+ inner_phase_watcher.clone(),
+ ).await;
+ Some(Some(()))
+ },
+ |_| async {},
+ || async {},
+ phase_watcher,
+ ).await;
debug!("UDP sender process killed");
}