From 8b5546bef973652778a60280ab2ebbc528ad1e48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Sat, 28 Nov 2020 01:24:26 +0100 Subject: possibly handle new crypt states --- mumd/src/network/udp.rs | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) (limited to 'mumd/src/network/udp.rs') diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index f97807d..b303f8c 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -24,7 +24,7 @@ type UdpReceiver = SplitStream>; pub async fn handle( state: Arc>, mut connection_info_receiver: watch::Receiver>, - mut crypt_state: mpsc::Receiver, + mut crypt_state_receiver: mpsc::Receiver, ) { let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); @@ -40,7 +40,7 @@ pub async fn handle( } } }; - let (mut sink, source) = connect(&mut crypt_state).await; + let (mut sink, source) = connect(&mut crypt_state_receiver).await; // Note: A normal application would also send periodic Ping packets, and its own audio // via UDP. We instead trick the server into accepting us by sending it one @@ -48,23 +48,25 @@ pub async fn handle( send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); + let source = Arc::new(Mutex::new(source)); let phase_watcher = state.lock().unwrap().phase_receiver(); join!( - listen(Arc::clone(&state), source, phase_watcher.clone()), + listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()), send_voice( - sink, + Arc::clone(&sink), connection_info.socket_addr, phase_watcher, &mut receiver ), + new_crypt_state(&mut crypt_state_receiver, sink, source) ); debug!("Fully disconnected UDP stream, waiting for new connection info"); } } -pub async fn connect( +async fn connect( crypt_state: &mut mpsc::Receiver, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket @@ -84,9 +86,30 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } +async fn new_crypt_state( + crypt_state: &mut mpsc::Receiver, + sink: Arc>, + source: Arc>, +) { + loop { + match crypt_state.recv().await { + Some(crypt_state) => { + info!("Received new crypt state"); + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) + .await + .expect("Failed to bind UDP socket"); + let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); + *sink.lock().unwrap() = new_sink; + *source.lock().unwrap() = new_source; + }, + None => {}, + } + } +} + async fn listen( state: Arc>, - mut source: UdpReceiver, + source: Arc>, mut phase_watcher: watch::Receiver, ) { let (tx, rx) = oneshot::channel(); @@ -102,6 +125,7 @@ async fn listen( let rx = rx.fuse(); pin_mut!(rx); loop { + let mut source = source.lock().unwrap(); let packet_recv = source.next().fuse(); pin_mut!(packet_recv); let exitor = select! { -- cgit v1.2.1