diff options
| author | Eskil Queseth <eskilq@kth.se> | 2020-10-14 17:45:04 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2020-10-14 17:45:04 +0200 |
| commit | ab0cdc240c65fdc6b764ed17f6611786d449acc3 (patch) | |
| tree | bbad07ff338616c17208cf257eb3c6d359eb857e /mumd/src/network/udp.rs | |
| parent | e4406676a28f2dfb756f8f9e38a4242166f19c0e (diff) | |
| download | mum-ab0cdc240c65fdc6b764ed17f6611786d449acc3.tar.gz | |
add support for reconnecting to server
Diffstat (limited to 'mumd/src/network/udp.rs')
| -rw-r--r-- | mumd/src/network/udp.rs | 105 |
1 files changed, 40 insertions, 65 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index a757a2b..45e6e80 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -11,14 +11,48 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::{watch, oneshot}; +use tokio::sync::{watch, oneshot, mpsc}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>; +pub async fn handle( + state: Arc<Mutex<State>>, + mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, + mut crypt_state: mpsc::Receiver<ClientCryptState>, +) { + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, source) = connect(&mut crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, connection_info.socket_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); + join!( + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver), + ); + + debug!("Fully disconnected UDP stream, waiting for new connection info"); + } +} + pub async fn connect( - crypt_state: oneshot::Receiver<ClientCryptState>, + crypt_state: &mut mpsc::Receiver<ClientCryptState>, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) @@ -26,10 +60,10 @@ pub async fn connect( .expect("Failed to bind UDP socket"); // Wait for initial CryptState - let crypt_state = match crypt_state.await { - Ok(crypt_state) => crypt_state, + let crypt_state = match crypt_state.recv().await { + Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well - Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully + None => panic!("disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); @@ -37,36 +71,6 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } -pub async fn handle( - state: Arc<Mutex<State>>, - mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, - crypt_state: oneshot::Receiver<ClientCryptState>, -) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, connection_info.socket_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - - let phase_watcher = state.lock().unwrap().phase_receiver(); - join!( - listen(Arc::clone(&state), source, phase_watcher.clone()), - send_voice(state, sink, connection_info.socket_addr, phase_watcher), - ); - - debug!("Fully disconnected UPD stream"); -} - async fn listen( state: Arc<Mutex<State>>, mut source: UdpReceiver, @@ -129,33 +133,6 @@ async fn listen( join!(main_block, phase_transition_block); debug!("UDP listener process killed"); - - /*while let Some(packet) = source.next().await { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - warn!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue; - } - }; - match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue; - } - VoicePacket::Audio { - session_id, - // seq_num, - payload, - // position_info, - .. - } => { - state.lock().unwrap().audio().decode_packet(session_id, payload); - } - } - }*/ } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -175,13 +152,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { } async fn send_voice( - state: Arc<Mutex<State>>, sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver<StatePhase>, + receiver: &mut mpsc::Receiver<VoicePacketPayload>, ) { - let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); - let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} |
