aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
authorEskil Q <eskilq@kth.se>2021-01-07 12:10:32 +0100
committerEskil Q <eskilq@kth.se>2021-01-07 12:10:32 +0100
commitf6a8a126e67ff1a89dcbdb35033e1f324add50dc (patch)
treebb5c578583f3ed241821ee1cca7ef196352e6271 /mumd
parent8b042801d090e1a17ca72ddb559d92ccbbb41091 (diff)
downloadmum-f6a8a126e67ff1a89dcbdb35033e1f324add50dc.tar.gz
fix UDP shutting down properly
Diffstat (limited to 'mumd')
-rw-r--r--mumd/src/network/udp.rs91
1 files changed, 55 insertions, 36 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index ac67bcb..4167c15 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -59,7 +59,7 @@ pub async fn handle(
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
- phase_watcher,
+ phase_watcher.clone(),
Arc::clone(&receiver),
),
send_pings(
@@ -67,8 +67,9 @@ pub async fn handle(
Arc::clone(&sink),
connection_info.socket_addr,
&last_ping_recv,
+ phase_watcher.clone(),
),
- new_crypt_state(&mut crypt_state_receiver, sink, source),
+ new_crypt_state(&mut crypt_state_receiver, sink, source, phase_watcher),
);
debug!("Fully disconnected UDP stream, waiting for new connection info");
@@ -99,18 +100,26 @@ async fn new_crypt_state(
crypt_state: &mut mpsc::Receiver<ClientCryptState>,
sink: Arc<Mutex<UdpSender>>,
source: Arc<Mutex<UdpReceiver>>,
+ phase_watcher: watch::Receiver<StatePhase>,
) {
- loop {
- if let Some(crypt_state) = crypt_state.recv().await {
- info!("Received new crypt state");
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
- .await
- .expect("Failed to bind UDP socket");
- let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split();
- *sink.lock().await = new_sink;
- *source.lock().await = new_source;
- }
- }
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
+ async {
+ loop {
+ if let Some(crypt_state) = crypt_state.recv().await {
+ info!("Received new crypt state");
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
+ .await
+ .expect("Failed to bind UDP socket");
+ let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split();
+ *sink.lock().await = new_sink;
+ *source.lock().await = new_source;
+ }
+ }
+ },
+ || async {},
+ phase_watcher,
+ ).await;
}
async fn listen(
@@ -168,34 +177,44 @@ async fn send_pings(
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
last_ping_recv: &AtomicU64,
+ phase_watcher: watch::Receiver<StatePhase>,
) {
let mut last_send = None;
let mut interval = interval(Duration::from_millis(1000));
- loop {
- interval.tick().await;
- let last_recv = last_ping_recv.load(Ordering::Relaxed);
- if last_send.is_some() && last_send.unwrap() != last_recv {
- debug!("Sending TCP voice");
- state
- .lock()
- .await
- .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
- }
- match sink
- .lock()
- .await
- .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr))
- .await
- {
- Ok(_) => {
- last_send = Some(last_recv + 1);
- },
- Err(e) => {
- debug!("Error sending UDP ping: {}", e);
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
+ async {
+ loop {
+ interval.tick().await;
+ let last_recv = last_ping_recv.load(Ordering::Relaxed);
+ if last_send.is_some() && last_send.unwrap() != last_recv {
+ debug!("Sending TCP voice");
+ state
+ .lock()
+ .await
+ .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
+ }
+ match sink
+ .lock()
+ .await
+ .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr))
+ .await
+ {
+ Ok(_) => {
+ last_send = Some(last_recv + 1);
+ },
+ Err(e) => {
+ debug!("Error sending UDP ping: {}", e);
+ }
+ }
}
- }
- }
+ },
+ || async {},
+ phase_watcher,
+ ).await;
+
+ debug!("UDP ping sender process killed");
}
async fn send_voice(