From 55644de7b35421997198c9dec4a8bba5dfb8dd8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 5 Jan 2021 12:47:04 +0100 Subject: add voice stream type --- mumd/src/network.rs | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'mumd/src/network.rs') diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 1a31ee2..4fb2e77 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -19,3 +19,9 @@ impl ConnectionInfo { } } } + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub enum VoiceStreamType { + TCP, + UDP, +} -- cgit v1.2.1 From ab038b58b4440804cdfded56167ce72b599d87c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 5 Jan 2021 17:08:48 +0100 Subject: yikes --- mumd/src/network.rs | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) (limited to 'mumd/src/network.rs') diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 4fb2e77..03bc436 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -3,6 +3,16 @@ pub mod udp; use std::net::SocketAddr; +use futures::Future; +use futures::FutureExt; +use futures::channel::oneshot; +use futures::join; +use futures::pin_mut; +use futures::select; +use tokio::sync::watch; + +use crate::state::StatePhase; + #[derive(Clone, Debug)] pub struct ConnectionInfo { socket_addr: SocketAddr, @@ -25,3 +35,55 @@ pub enum VoiceStreamType { TCP, UDP, } + +async fn run_until( + phase_checker: impl Fn(StatePhase) -> bool, + mut generator: impl FnMut() -> F, + mut handler: impl FnMut(T) -> G, + mut shutdown: impl FnMut() -> H, + mut phase_watcher: watch::Receiver, +) where + F: Future>, + G: Future, + H: Future, +{ + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + loop { + phase_watcher.changed().await.unwrap(); + if phase_checker(*phase_watcher.borrow()) { + break; + } + } + tx.send(true).unwrap(); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = generator().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + //warn!("Channel closed before disconnect command"); //TODO make me informative + break; + } + Some(Some(data)) => { + handler(data).await; + } + } + } + + shutdown().await; + }; + + join!(main_block, phase_transition_block); +} -- cgit v1.2.1 From 02e6f2b84d72294b29a1698c1b73fbb5697815da Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Wed, 6 Jan 2021 18:31:49 +0100 Subject: clean up network::run_until --- mumd/src/network.rs | 41 ++++++++++++++--------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) (limited to 'mumd/src/network.rs') diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 03bc436..75b983e 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -10,6 +10,7 @@ use futures::join; use futures::pin_mut; use futures::select; use tokio::sync::watch; +use log::*; use crate::state::StatePhase; @@ -36,16 +37,14 @@ pub enum VoiceStreamType { UDP, } -async fn run_until( +async fn run_until( phase_checker: impl Fn(StatePhase) -> bool, - mut generator: impl FnMut() -> F, - mut handler: impl FnMut(T) -> G, - mut shutdown: impl FnMut() -> H, + fut: F, + mut shutdown: impl FnMut() -> G, mut phase_watcher: watch::Receiver, ) where - F: Future>, + F: Future, G: Future, - H: Future, { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { @@ -55,32 +54,20 @@ async fn run_until( break; } } - tx.send(true).unwrap(); + if tx.send(true).is_err() { + warn!("future resolved before it could be cancelled"); + } }; let main_block = async { let rx = rx.fuse(); pin_mut!(rx); - loop { - let packet_recv = generator().fuse(); - pin_mut!(packet_recv); - let exitor = select! { - data = packet_recv => Some(data), - _ = rx => None - }; - match exitor { - None => { - break; - } - Some(None) => { - //warn!("Channel closed before disconnect command"); //TODO make me informative - break; - } - Some(Some(data)) => { - handler(data).await; - } - } - } + let fut = fut.fuse(); + pin_mut!(fut); + select! { + _ = fut => (), + _ = rx => (), + }; shutdown().await; }; -- cgit v1.2.1 From 8b042801d090e1a17ca72ddb559d92ccbbb41091 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Thu, 7 Jan 2021 12:02:43 +0100 Subject: update according to feedback --- mumd/src/network.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'mumd/src/network.rs') diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 75b983e..9463ad7 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -1,16 +1,15 @@ pub mod tcp; pub mod udp; -use std::net::SocketAddr; - use futures::Future; use futures::FutureExt; use futures::channel::oneshot; use futures::join; use futures::pin_mut; use futures::select; -use tokio::sync::watch; use log::*; +use std::net::SocketAddr; +use tokio::sync::watch; use crate::state::StatePhase; -- cgit v1.2.1 From 62d3e3d6bf3842a1aad28874a69992b0b880137e Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Thu, 7 Jan 2021 12:41:43 +0100 Subject: remove shutdown function on run_until it wasn't used and there are other ways of accomplishing the same thing --- mumd/src/network.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'mumd/src/network.rs') diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 9463ad7..6c67b3a 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -36,14 +36,12 @@ pub enum VoiceStreamType { UDP, } -async fn run_until( +async fn run_until( phase_checker: impl Fn(StatePhase) -> bool, fut: F, - mut shutdown: impl FnMut() -> G, mut phase_watcher: watch::Receiver, ) where F: Future, - G: Future, { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { @@ -67,8 +65,6 @@ async fn run_until( _ = fut => (), _ = rx => (), }; - - shutdown().await; }; join!(main_block, phase_transition_block); -- cgit v1.2.1