diff options
Diffstat (limited to 'mumd/src/network.rs')
| -rw-r--r-- | mumd/src/network.rs | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 4fb2e77..03bc436 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -3,6 +3,16 @@ pub mod udp; use std::net::SocketAddr; +use futures::Future; +use futures::FutureExt; +use futures::channel::oneshot; +use futures::join; +use futures::pin_mut; +use futures::select; +use tokio::sync::watch; + +use crate::state::StatePhase; + #[derive(Clone, Debug)] pub struct ConnectionInfo { socket_addr: SocketAddr, @@ -25,3 +35,55 @@ pub enum VoiceStreamType { TCP, UDP, } + +async fn run_until<T, F, G, H>( + phase_checker: impl Fn(StatePhase) -> bool, + mut generator: impl FnMut() -> F, + mut handler: impl FnMut(T) -> G, + mut shutdown: impl FnMut() -> H, + mut phase_watcher: watch::Receiver<StatePhase>, +) where + F: Future<Output = Option<T>>, + G: Future<Output = ()>, + H: Future<Output = ()>, +{ + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + loop { + phase_watcher.changed().await.unwrap(); + if phase_checker(*phase_watcher.borrow()) { + break; + } + } + tx.send(true).unwrap(); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = generator().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + //warn!("Channel closed before disconnect command"); //TODO make me informative + break; + } + Some(Some(data)) => { + handler(data).await; + } + } + } + + shutdown().await; + }; + + join!(main_block, phase_transition_block); +} |
