use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream}; use futures_util::stream::{SplitSink, SplitStream}; use log::*; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::VoicePacket; use mumble_protocol::Serverbound; use std::collections::HashMap; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{interval, Duration}; use tokio_util::udp::UdpFramed; use super::{run_until, VoiceStreamType}; pub type PingRequest = (u64, SocketAddr, Box); type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; type UdpReceiver = SplitStream>; pub async fn handle( state: Arc>, mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) { let receiver = state.lock().unwrap().audio().input_receiver(); loop { let connection_info = 'data: loop { while connection_info_receiver.changed().await.is_ok() { if let Some(data) = connection_info_receiver.borrow().clone() { break 'data data; } } return; }; let (sink, source) = connect(&mut crypt_state_receiver).await; let sink = Arc::new(Mutex::new(sink)); let source = Arc::new(Mutex::new(source)); let phase_watcher = state.lock().unwrap().phase_receiver(); let last_ping_recv = AtomicU64::new(0); join!( listen( Arc::clone(&state), Arc::clone(&source), phase_watcher.clone(), &last_ping_recv, ), send_voice( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, Arc::clone(&receiver), ), send_pings( Arc::clone(&state), Arc::clone(&sink), connection_info.socket_addr, &last_ping_recv, ), new_crypt_state(&mut crypt_state_receiver, sink, source), ); debug!("Fully disconnected UDP stream, waiting for new connection info"); } } async fn connect( crypt_state: &mut mpsc::Receiver, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); // Wait for initial CryptState let crypt_state = match crypt_state.recv().await { Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well None => panic!("Disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both) UdpFramed::new(udp_socket, crypt_state).split() } async fn new_crypt_state( crypt_state: &mut mpsc::Receiver, sink: Arc>, source: Arc>, ) { loop { if let Some(crypt_state) = crypt_state.recv().await { info!("Received new crypt state"); let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); *sink.lock().unwrap() = new_sink; *source.lock().unwrap() = new_source; } } } async fn listen( state: Arc>, source: Arc>, mut phase_watcher: watch::Receiver, last_ping_recv: &AtomicU64, ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { loop { phase_watcher.changed().await.unwrap(); if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { break; } } tx.send(true).unwrap(); }; let main_block = async { let rx = rx.fuse(); pin_mut!(rx); loop { let mut source = source.lock().unwrap(); let packet_recv = source.next().fuse(); pin_mut!(packet_recv); let exitor = select! { data = packet_recv => Some(data), _ = rx => None }; match exitor { None => { break; } Some(None) => { warn!("Channel closed before disconnect command"); break; } Some(Some(packet)) => { let (packet, _src_addr) = match packet { Ok(packet) => packet, Err(err) => { warn!("Got an invalid UDP packet: {}", err); // To be expected, considering this is the internet, just ignore it continue; } }; match packet { VoicePacket::Ping { timestamp } => { state .lock() .unwrap() .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); last_ping_recv.store(timestamp, Ordering::Relaxed); } VoicePacket::Audio { session_id, // seq_num, payload, // position_info, .. } => { state .lock() .unwrap() .audio() .decode_packet(VoiceStreamType::UDP, session_id, payload); } } } } } }; join!(main_block, phase_transition_block); debug!("UDP listener process killed"); } async fn send_pings( state: Arc>, sink: Arc>, server_addr: SocketAddr, last_ping_recv: &AtomicU64, ) { let mut last_send = None; let mut interval = interval(Duration::from_millis(1000)); loop { let last_recv = last_ping_recv.load(Ordering::Relaxed); if last_send.is_some() && last_send.unwrap() != last_recv { state .lock() .unwrap() .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); } match sink .lock() .unwrap() .send((VoicePacket::Ping { timestamp: 0 }, server_addr)) .await { Ok(_) => { last_send = Some(last_recv + 1); }, Err(e) => { debug!("Error sending UDP ping: {}", e); } } interval.tick().await; } } async fn send_voice( sink: Arc>, server_addr: SocketAddr, phase_watcher: watch::Receiver, receiver: Arc> + Unpin)>>>, ) { let inner_phase_watcher = phase_watcher.clone(); run_until( |phase| matches!(phase, StatePhase::Disconnected), || async { run_until( |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)), || async { sink.lock().unwrap().send((receiver.lock().unwrap().next().await.unwrap(), server_addr)).await.unwrap(); Some(Some(())) }, |_| async {}, || async {}, inner_phase_watcher.clone(), ).await; Some(Some(())) }, |_| async {}, || async {}, phase_watcher, ).await; debug!("UDP sender process killed"); } pub async fn handle_pings( mut ping_request_receiver: mpsc::UnboundedReceiver, ) { let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); let pending = Rc::new(Mutex::new(HashMap::new())); let sender_handle = async { while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await { let packet = PingPacket { id }; let packet: [u8; 12] = packet.into(); udp_socket.send_to(&packet, &socket_addr).await.unwrap(); pending.lock().unwrap().insert(id, handle); } }; let receiver_handle = async { let mut buf = vec![0; 24]; while let Ok(read) = udp_socket.recv(&mut buf).await { assert_eq!(read, 24); let packet = PongPacket::try_from(buf.as_slice()).unwrap(); if let Some(handler) = pending.lock().unwrap().remove(&packet.id) { handler(packet); } } }; debug!("Waiting for ping requests"); join!(sender_handle, receiver_handle); }