use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; use mumble_protocol::Serverbound; use std::collections::HashMap; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; type UdpReceiver = SplitStream>; pub async fn handle( state: Arc>, mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) { let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); loop { let connection_info = 'data: loop { while let Ok(()) = connection_info_receiver.changed().await { if let Some(data) = connection_info_receiver.borrow().clone() { break 'data data; } } return; // match connection_info_receiver.recv().await { // None => { // return; // } // Some(None) => {} // Some(Some(connection_info)) => { // break connection_info; // } // } }; let (mut sink, source) = connect(&mut crypt_state_receiver).await; // Note: A normal application would also send periodic Ping packets, and its own audio // via UDP. We instead trick the server into accepting us by sending it one // dummy voice packet. send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); let source = Arc::new(Mutex::new(source)); let phase_watcher = state.lock().unwrap().phase_receiver(); join!( listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()), send_voice( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, &mut receiver ), new_crypt_state(&mut crypt_state_receiver, sink, source) ); debug!("Fully disconnected UDP stream, waiting for new connection info"); } } async fn connect( crypt_state: &mut mpsc::Receiver, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); // Wait for initial CryptState let crypt_state = match crypt_state.recv().await { Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well None => panic!("Disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both) UdpFramed::new(udp_socket, crypt_state).split() } async fn new_crypt_state( crypt_state: &mut mpsc::Receiver, sink: Arc>, source: Arc>, ) { loop { match crypt_state.recv().await { Some(crypt_state) => { info!("Received new crypt state"); let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); *sink.lock().unwrap() = new_sink; *source.lock().unwrap() = new_source; }, None => {}, } } } async fn listen( state: Arc>, source: Arc>, mut phase_watcher: watch::Receiver, ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { loop { phase_watcher.changed().await.unwrap(); if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { break; } } // while !matches!( // phase_watcher.recv().await.unwrap(), // StatePhase::Disconnected // ) {} tx.send(true).unwrap(); }; let main_block = async { let rx = rx.fuse(); pin_mut!(rx); loop { let mut source = source.lock().unwrap(); let packet_recv = source.next().fuse(); pin_mut!(packet_recv); let exitor = select! { data = packet_recv => Some(data), _ = rx => None }; match exitor { None => { break; } Some(None) => { warn!("Channel closed before disconnect command"); break; } Some(Some(packet)) => { let (packet, _src_addr) = match packet { Ok(packet) => packet, Err(err) => { warn!("Got an invalid UDP packet: {}", err); // To be expected, considering this is the internet, just ignore it continue; } }; match packet { VoicePacket::Ping { .. } => { // Note: A normal application would handle these and only use UDP for voice // once it has received one. continue; } VoicePacket::Audio { session_id, // seq_num, payload, // position_info, .. } => { state .lock() .unwrap() .audio() .decode_packet(session_id, payload); } } } } } }; join!(main_block, phase_transition_block); debug!("UDP listener process killed"); } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { sink.send(( VoicePacket::Audio { _dst: std::marker::PhantomData, target: 0, session_id: (), seq_num: 0, payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true), position_info: None, }, server_addr, )) .await .unwrap(); } async fn send_voice( sink: Arc>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver, receiver: &mut mpsc::Receiver, ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { loop { phase_watcher.changed().await.unwrap(); if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { break; } } // while !matches!( // phase_watcher.recv().await.unwrap(), // StatePhase::Disconnected // ) {} tx.send(true).unwrap(); }; let main_block = async { let rx = rx.fuse(); pin_mut!(rx); let mut count = 0; loop { let packet_recv = receiver.recv().fuse(); pin_mut!(packet_recv); let exitor = select! { data = packet_recv => Some(data), _ = rx => None }; match exitor { None => { break; } Some(None) => { warn!("Channel closed before disconnect command"); break; } Some(Some(payload)) => { let reply = VoicePacket::Audio { _dst: std::marker::PhantomData, target: 0, // normal speech session_id: (), // unused for server-bound packets seq_num: count, payload, position_info: None, }; count += 1; sink.lock() .unwrap() .send((reply, server_addr)) .await .unwrap(); } } } }; join!(main_block, phase_transition_block); debug!("UDP sender process killed"); } pub async fn handle_pings( mut ping_request_receiver: mpsc::UnboundedReceiver<( u64, SocketAddr, Box, )>, ) { let udp_socket = Arc::new(UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket")); // let (mut receiver, mut sender) = udp_socket.split(); let pending = Rc::new(Mutex::new(HashMap::new())); let sender_handle = async { while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await { let packet = PingPacket { id }; let packet: [u8; 12] = packet.into(); udp_socket.send_to(&packet, &socket_addr).await.unwrap(); pending.lock().unwrap().insert(id, handle); } }; let receiver_handle = async { let mut buf = vec![0; 24]; while let Ok(read) = udp_socket.recv(&mut buf).await { assert_eq!(read, 24); let packet = PongPacket::try_from(buf.as_slice()).unwrap(); if let Some(handler) = pending.lock().unwrap().remove(&packet.id) { handler(packet); } } }; debug!("Waiting for ping requests"); join!(sender_handle, receiver_handle); }