diff options
| author | Eskil Queseth <eskilq@kth.se> | 2020-10-14 16:54:27 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2020-10-14 16:54:27 +0200 |
| commit | 7fb14d648aacd398f720f60236020dab6bf9fd35 (patch) | |
| tree | 52f4515aba225c25b006bdda82bf971a9a00f4bb /mumd/src/network/udp.rs | |
| parent | dcb71982eab550535298b2d879a3a83820a0798a (diff) | |
| download | mum-7fb14d648aacd398f720f60236020dab6bf9fd35.tar.gz | |
add support for disconnect command
Diffstat (limited to 'mumd/src/network/udp.rs')
| -rw-r--r-- | mumd/src/network/udp.rs | 144 |
1 files changed, 119 insertions, 25 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index cf0305b..ab4ca1d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,10 +1,9 @@ use crate::network::ConnectionInfo; -use crate::state::State; +use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; +use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -12,7 +11,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::watch; +use tokio::sync::{watch, oneshot}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; @@ -58,17 +57,80 @@ pub async fn handle( send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); join!( - listen(Arc::clone(&state), source), - send_voice(state, sink, connection_info.socket_addr), + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice(state, sink, connection_info.socket_addr, phase_watcher), ); + + debug!("Fully disconnected UPD stream"); } async fn listen( state: Arc<Mutex<State>>, mut source: UdpReceiver, + mut phase_watcher: watch::Receiver<StatePhase>, ) { - while let Some(packet) = source.next().await { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = source.next().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(packet)) => { + let (packet, _src_addr) = match packet { + Ok(packet) => packet, + Err(err) => { + warn!("Got an invalid UDP packet: {}", err); + // To be expected, considering this is the internet, just ignore it + continue; + } + }; + match packet { + VoicePacket::Ping { .. } => { + // Note: A normal application would handle these and only use UDP for voice + // once it has received one. + continue; + } + VoicePacket::Audio { + session_id, + // seq_num, + payload, + // position_info, + .. + } => { + state.lock().unwrap().audio().decode_packet(session_id, payload); + } + } + } + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("UDP listener process killed"); + + /*while let Some(packet) = source.next().await { let (packet, _src_addr) = match packet { Ok(packet) => packet, Err(err) => { @@ -93,7 +155,7 @@ async fn listen( state.lock().unwrap().audio().decode_packet(session_id, payload); } } - } + }*/ } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -116,25 +178,57 @@ async fn send_voice( state: Arc<Mutex<State>>, sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, + mut phase_watcher: watch::Receiver<StatePhase>, ) { let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); - let mut count = 0; - while let Some(payload) = receiver.recv().await { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload, - position_info: None, - }; - count += 1; - sink.lock() - .unwrap() - .send((reply, server_addr)) - .await - .unwrap(); - } + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + let mut count = 0; + loop { + let packet_recv = receiver.recv().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(payload)) => { + let reply = VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: count, + payload, + position_info: None, + }; + count += 1; + sink.lock() + .unwrap() + .send((reply, server_addr)) + .await + .unwrap(); + } + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("UDP listener process killed"); } |
