aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
Diffstat (limited to 'mumd')
-rw-r--r--mumd/src/network/udp.rs110
1 files changed, 41 insertions, 69 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 25ec8d5..441d08b 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -1,7 +1,7 @@
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
-use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream};
+use futures::{join, SinkExt, StreamExt, Stream};
use futures_util::stream::{SplitSink, SplitStream};
use log::*;
use mumble_protocol::crypt::ClientCryptState;
@@ -15,7 +15,7 @@ use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc};
use tokio::net::UdpSocket;
-use tokio::sync::{mpsc, oneshot, watch, Mutex};
+use tokio::sync::{mpsc, watch, Mutex};
use tokio::time::{interval, Duration};
use tokio_util::udp::UdpFramed;
@@ -116,78 +116,50 @@ async fn new_crypt_state(
async fn listen(
state: Arc<Mutex<State>>,
source: Arc<Mutex<UdpReceiver>>,
- mut phase_watcher: watch::Receiver<StatePhase>,
+ phase_watcher: watch::Receiver<StatePhase>,
last_ping_recv: &AtomicU64,
) {
- let (tx, rx) = oneshot::channel();
- let phase_transition_block = async {
- loop {
- phase_watcher.changed().await.unwrap();
- if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
- break;
- }
- }
- tx.send(true).unwrap();
- };
-
- let main_block = async {
- let rx = rx.fuse();
- pin_mut!(rx);
- loop {
- let mut source = source.lock().await;
- let packet_recv = source.next().fuse();
- pin_mut!(packet_recv);
- let exitor = select! {
- data = packet_recv => Some(data),
- _ = rx => None
- };
- drop(source);
- match exitor {
- None => {
- break;
- }
- Some(None) => {
- warn!("Channel closed before disconnect command");
- break;
- }
- Some(Some(packet)) => {
- let (packet, _src_addr) = match packet {
- Ok(packet) => packet,
- Err(err) => {
- warn!("Got an invalid UDP packet: {}", err);
- // To be expected, considering this is the internet, just ignore it
- continue;
- }
- };
- match packet {
- VoicePacket::Ping { timestamp } => {
- // debug!("Sending UDP voice");
- state
- .lock() //TODO clean up unnecessary lock by only updating phase if it should change
- .await
- .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP));
- last_ping_recv.store(timestamp, Ordering::Relaxed);
- }
- VoicePacket::Audio {
- session_id,
- // seq_num,
- payload,
- // position_info,
- ..
- } => {
- state
- .lock()
- .await
- .audio()
- .decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
- }
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
+ async {
+ loop {
+ let packet = source.lock().await.next().await.unwrap();
+ let (packet, _src_addr) = match packet {
+ Ok(packet) => packet,
+ Err(err) => {
+ warn!("Got an invalid UDP packet: {}", err);
+ // To be expected, considering this is the internet, just ignore it
+ continue;
+ }
+ };
+ match packet {
+ VoicePacket::Ping { timestamp } => {
+ // debug!("Sending UDP voice");
+ state
+ .lock() //TODO clean up unnecessary lock by only updating phase if it should change
+ .await
+ .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP));
+ last_ping_recv.store(timestamp, Ordering::Relaxed);
+ }
+ VoicePacket::Audio {
+ session_id,
+ // seq_num,
+ payload,
+ // position_info,
+ ..
+ } => {
+ state
+ .lock()
+ .await
+ .audio()
+ .decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
}
}
}
- }
- };
-
- join!(main_block, phase_transition_block);
+ },
+ || async {},
+ phase_watcher
+ ).await;
debug!("UDP listener process killed");
}