aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/udp.rs
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2020-10-14 16:54:27 +0200
committerEskil Queseth <eskilq@kth.se>2020-10-14 16:54:27 +0200
commit7fb14d648aacd398f720f60236020dab6bf9fd35 (patch)
tree52f4515aba225c25b006bdda82bf971a9a00f4bb /mumd/src/network/udp.rs
parentdcb71982eab550535298b2d879a3a83820a0798a (diff)
downloadmum-7fb14d648aacd398f720f60236020dab6bf9fd35.tar.gz
add support for disconnect command
Diffstat (limited to 'mumd/src/network/udp.rs')
-rw-r--r--mumd/src/network/udp.rs144
1 files changed, 119 insertions, 25 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index cf0305b..ab4ca1d 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -1,10 +1,9 @@
use crate::network::ConnectionInfo;
-use crate::state::State;
+use crate::state::{State, StatePhase};
use log::*;
use bytes::Bytes;
-use futures::channel::oneshot;
-use futures::{join, SinkExt, StreamExt};
+use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt};
use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::{VoicePacket, VoicePacketPayload};
@@ -12,7 +11,7 @@ use mumble_protocol::Serverbound;
use std::net::{Ipv6Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
-use tokio::sync::watch;
+use tokio::sync::{watch, oneshot};
use tokio_util::udp::UdpFramed;
type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
@@ -58,17 +57,80 @@ pub async fn handle(
send_ping(&mut sink, connection_info.socket_addr).await;
let sink = Arc::new(Mutex::new(sink));
+
+ let phase_watcher = state.lock().unwrap().phase_receiver();
join!(
- listen(Arc::clone(&state), source),
- send_voice(state, sink, connection_info.socket_addr),
+ listen(Arc::clone(&state), source, phase_watcher.clone()),
+ send_voice(state, sink, connection_info.socket_addr, phase_watcher),
);
+
+ debug!("Fully disconnected UPD stream");
}
async fn listen(
state: Arc<Mutex<State>>,
mut source: UdpReceiver,
+ mut phase_watcher: watch::Receiver<StatePhase>,
) {
- while let Some(packet) = source.next().await {
+ let (tx, rx) = oneshot::channel();
+ let phase_transition_block = async {
+ while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {}
+ tx.send(true);
+ };
+
+ let main_block = async {
+ let rx = rx.fuse();
+ pin_mut!(rx);
+ loop {
+ let packet_recv = source.next().fuse();
+ pin_mut!(packet_recv);
+ let exitor = select! {
+ data = packet_recv => Some(data),
+ _ = rx => None
+ };
+ match exitor {
+ None => {
+ break;
+ }
+ Some(None) => {
+ warn!("Channel closed before disconnect command");
+ break;
+ }
+ Some(Some(packet)) => {
+ let (packet, _src_addr) = match packet {
+ Ok(packet) => packet,
+ Err(err) => {
+ warn!("Got an invalid UDP packet: {}", err);
+ // To be expected, considering this is the internet, just ignore it
+ continue;
+ }
+ };
+ match packet {
+ VoicePacket::Ping { .. } => {
+ // Note: A normal application would handle these and only use UDP for voice
+ // once it has received one.
+ continue;
+ }
+ VoicePacket::Audio {
+ session_id,
+ // seq_num,
+ payload,
+ // position_info,
+ ..
+ } => {
+ state.lock().unwrap().audio().decode_packet(session_id, payload);
+ }
+ }
+ }
+ }
+ }
+ };
+
+ join!(main_block, phase_transition_block);
+
+ debug!("UDP listener process killed");
+
+ /*while let Some(packet) = source.next().await {
let (packet, _src_addr) = match packet {
Ok(packet) => packet,
Err(err) => {
@@ -93,7 +155,7 @@ async fn listen(
state.lock().unwrap().audio().decode_packet(session_id, payload);
}
}
- }
+ }*/
}
async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) {
@@ -116,25 +178,57 @@ async fn send_voice(
state: Arc<Mutex<State>>,
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
+ mut phase_watcher: watch::Receiver<StatePhase>,
) {
let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap();
- let mut count = 0;
- while let Some(payload) = receiver.recv().await {
- let reply = VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: count,
- payload,
- position_info: None,
- };
- count += 1;
- sink.lock()
- .unwrap()
- .send((reply, server_addr))
- .await
- .unwrap();
- }
+ let (tx, rx) = oneshot::channel();
+ let phase_transition_block = async {
+ while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {}
+ tx.send(true);
+ };
+
+ let main_block = async {
+ let rx = rx.fuse();
+ pin_mut!(rx);
+ let mut count = 0;
+ loop {
+ let packet_recv = receiver.recv().fuse();
+ pin_mut!(packet_recv);
+ let exitor = select! {
+ data = packet_recv => Some(data),
+ _ = rx => None
+ };
+ match exitor {
+ None => {
+ break;
+ }
+ Some(None) => {
+ warn!("Channel closed before disconnect command");
+ break;
+ }
+ Some(Some(payload)) => {
+ let reply = VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: count,
+ payload,
+ position_info: None,
+ };
+ count += 1;
+ sink.lock()
+ .unwrap()
+ .send((reply, server_addr))
+ .await
+ .unwrap();
+ }
+ }
+ }
+ };
+
+ join!(main_block, phase_transition_block);
+
+ debug!("UDP listener process killed");
}