From b583f6dbe521e01e879e16605026997dfa10c3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 02:31:09 +0200 Subject: join different channels Co-authored-by: Eskil Queseth --- mumd/src/network/tcp.rs | 57 +++++++++++++++++++++++++++---------------------- mumd/src/network/udp.rs | 48 ++++++++++++++++++++--------------------- 2 files changed, 56 insertions(+), 49 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index dde98aa..fa4c4b6 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,5 +1,5 @@ -use crate::audio::Audio; -use crate::state::Server; +use crate::state::State; +use crate::command::Command; use log::*; use futures::channel::oneshot; @@ -12,6 +12,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -24,25 +25,26 @@ type TcpReceiver = SplitStream, ControlCodec>>; pub async fn handle( - server: Arc>, + state: Arc>, server_addr: SocketAddr, server_host: String, - username: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender, - audio: Arc>, + packet_receiver: mpsc::Receiver>, ) { let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); + // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), username).await; + authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await; info!("Logging in..."); join!( send_pings(Arc::clone(&sink), 10), - listen(server, sink, stream, crypt_state_sender, audio), + listen(state, stream, crypt_state_sender), + send_packets(sink, packet_receiver), ); } @@ -72,6 +74,7 @@ async fn connect( ClientControlCodec::new().framed(tls_stream).split() } +//TODO &mut sink? async fn authenticate(sink: Arc>, username: String) { let mut msg = msgs::Authenticate::new(); msg.set_username(username); @@ -79,6 +82,7 @@ async fn authenticate(sink: Arc>, username: String) { sink.lock().unwrap().send(msg.into()).await.unwrap(); } +//TODO move somewhere else (main?) and send through packet_sender async fn send_pings(sink: Arc>, delay_seconds: u64) { let mut interval = time::interval(Duration::from_secs(delay_seconds)); loop { @@ -89,12 +93,18 @@ async fn send_pings(sink: Arc>, delay_seconds: u64) { } } +async fn send_packets(sink: Arc>, + mut packet_receiver: mpsc::Receiver>) { + + while let Some(packet) = packet_receiver.recv().await { + sink.lock().unwrap().send(packet).await.unwrap(); + } +} + async fn listen( - server: Arc>, - sink: Arc>, + state: Arc>, mut stream: TcpReceiver, crypt_state_sender: oneshot::Sender, - audio: Arc>, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -102,18 +112,12 @@ async fn listen( while let Some(packet) = stream.next().await { //TODO handle types separately match packet.unwrap() { - ControlPacket::TextMessage(mut msg) => { + ControlPacket::TextMessage(msg) => { info!( "Got message from user with session ID {}: {}", msg.get_actor(), msg.get_message() ); - // Send reply back to server - let mut response = msgs::TextMessage::new(); - response.mut_session().push(msg.get_actor()); - response.set_message(msg.take_message()); - let mut lock = sink.lock().unwrap(); - lock.send(response.into()).await.unwrap(); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); @@ -139,7 +143,8 @@ async fn listen( .expect("Server didn't send us any CryptSetup packet!"), ); } - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); + let server = state.server_mut(); server.parse_server_sync(msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), @@ -148,16 +153,18 @@ async fn listen( for (_, channel) in server.channels() { info!("Found channel {}", channel.name()); } - sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap(); + //TODO start listening for packets to send here + state.handle_command(Command::ChannelJoin{channel_id: 1}).await; } ControlPacket::Reject(msg) => { warn!("Login rejected: {:?}", msg); } ControlPacket::UserState(msg) => { - audio.lock().unwrap().add_client(msg.get_session()); - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); let session = msg.get_session(); - server.parse_user_state(msg); + state.audio_mut().add_client(msg.get_session()); //TODO + state.parse_initial_user_state(msg); //TODO only if actually initiating state + let server = state.server_mut(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", user.name(), @@ -165,14 +172,14 @@ async fn listen( } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - audio.lock().unwrap().remove_client(msg.get_session()); + state.lock().unwrap().audio_mut().remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - server.lock().unwrap().parse_channel_state(msg); + state.lock().unwrap().server_mut().parse_channel_state(msg); } ControlPacket::ChannelRemove(msg) => { - server.lock().unwrap().parse_channel_remove(msg); + state.lock().unwrap().server_mut().parse_channel_remove(msg); } _ => {} } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 39f16b6..5f76501 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,4 +1,4 @@ -use crate::audio::Audio; +use crate::state::State; use log::*; use bytes::Bytes; @@ -36,10 +36,28 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } +pub async fn handle( + state: Arc>, + server_addr: SocketAddr, + crypt_state: oneshot::Receiver, +) { + let (mut sink, source) = connect(crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, server_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + join!( + listen(Arc::clone(&state), source), + send_voice(state, sink, server_addr) + ); +} + async fn listen( - _sink: Arc>, + state: Arc>, mut source: UdpReceiver, - audio: Arc>, ) { while let Some(packet) = source.next().await { let (packet, _src_addr) = match packet { @@ -63,7 +81,7 @@ async fn listen( // position_info, .. } => { - audio.lock().unwrap().decode_packet(session_id, payload); + state.lock().unwrap().audio().decode_packet(session_id, payload); } } } @@ -86,11 +104,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { } async fn send_voice( + state: Arc>, sink: Arc>, server_addr: SocketAddr, - audio: Arc>, ) { - let mut receiver = audio.lock().unwrap().take_receiver().unwrap(); + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); let mut count = 0; while let Some(payload) = receiver.recv().await { @@ -111,21 +129,3 @@ async fn send_voice( } } -pub async fn handle( - server_addr: SocketAddr, - crypt_state: oneshot::Receiver, - audio: Arc>, -) { - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, server_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - join!( - listen(Arc::clone(&sink), source, Arc::clone(&audio)), - send_voice(sink, server_addr, audio) - ); -} -- cgit v1.2.1 From 503f6c90395682bf5d7fd3fb8a79bfcfc3c2f329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 17:05:22 +0200 Subject: wait for complete state before sending commands --- mumd/src/network/tcp.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index fa4c4b6..72a2840 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -30,7 +30,7 @@ pub async fn handle( server_host: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender, - packet_receiver: mpsc::Receiver>, + packet_receiver: mpsc::UnboundedReceiver>, ) { let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); @@ -94,7 +94,7 @@ async fn send_pings(sink: Arc>, delay_seconds: u64) { } async fn send_packets(sink: Arc>, - mut packet_receiver: mpsc::Receiver>) { + mut packet_receiver: mpsc::UnboundedReceiver>) { while let Some(packet) = packet_receiver.recv().await { sink.lock().unwrap().send(packet).await.unwrap(); @@ -153,8 +153,7 @@ async fn listen( for (_, channel) in server.channels() { info!("Found channel {}", channel.name()); } - //TODO start listening for packets to send here - state.handle_command(Command::ChannelJoin{channel_id: 1}).await; + state.initialized(); } ControlPacket::Reject(msg) => { warn!("Login rejected: {:?}", msg); -- cgit v1.2.1 From 321d0400bb8760ab215a602cc74f36a2a7dd6788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 17:12:46 +0200 Subject: respect if we're initializing when parsing user state --- mumd/src/network/tcp.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 72a2840..3fc36a3 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -162,7 +162,11 @@ async fn listen( let mut state = state.lock().unwrap(); let session = msg.get_session(); state.audio_mut().add_client(msg.get_session()); //TODO - state.parse_initial_user_state(msg); //TODO only if actually initiating state + if *state.initialized_receiver().borrow() { + state.server_mut().parse_user_state(msg); + } else { + state.parse_initial_user_state(msg); + } let server = state.server_mut(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", @@ -175,7 +179,7 @@ async fn listen( } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().parse_channel_state(msg); + state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { state.lock().unwrap().server_mut().parse_channel_remove(msg); -- cgit v1.2.1 From ccd7cbac5e8080240988b01cc9f2e64af9082f5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 23:58:51 +0200 Subject: send tcp pings via packet sender Co-authored-by: Eskil Queseth --- mumd/src/network/tcp.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 3fc36a3..f86447b 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -82,14 +82,14 @@ async fn authenticate(sink: Arc>, username: String) { sink.lock().unwrap().send(msg.into()).await.unwrap(); } -//TODO move somewhere else (main?) and send through packet_sender -async fn send_pings(sink: Arc>, delay_seconds: u64) { +async fn send_pings(packet_sender: mpsc::UnboundedSender>, + delay_seconds: u64) { let mut interval = time::interval(Duration::from_secs(delay_seconds)); loop { interval.tick().await; trace!("Sending ping"); let msg = msgs::Ping::new(); - sink.lock().unwrap().send(msg.into()).await.unwrap(); + packet_sender.send(msg.into()).unwrap(); } } -- cgit v1.2.1 From 6693c994f1baa6fc08787ef47759f58834f6c3c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 23:59:16 +0200 Subject: remove shared mutability of tcp sink Co-authored-by: Eskil Queseth --- mumd/src/network/tcp.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index f86447b..6f60b63 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,5 +1,4 @@ use crate::state::State; -use crate::command::Command; use log::*; use futures::channel::oneshot; @@ -32,17 +31,15 @@ pub async fn handle( crypt_state_sender: oneshot::Sender, packet_receiver: mpsc::UnboundedReceiver>, ) { - let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; - let sink = Arc::new(Mutex::new(sink)); - + let (mut sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await; + authenticate(&mut sink, state.lock().unwrap().username().to_string()).await; info!("Logging in..."); join!( - send_pings(Arc::clone(&sink), 10), + send_pings(state.lock().unwrap().packet_sender(), 10), listen(state, stream, crypt_state_sender), send_packets(sink, packet_receiver), ); @@ -74,12 +71,11 @@ async fn connect( ClientControlCodec::new().framed(tls_stream).split() } -//TODO &mut sink? -async fn authenticate(sink: Arc>, username: String) { +async fn authenticate(sink: &mut TcpSender, username: String) { let mut msg = msgs::Authenticate::new(); msg.set_username(username); msg.set_opus(true); - sink.lock().unwrap().send(msg.into()).await.unwrap(); + sink.send(msg.into()).await.unwrap(); } async fn send_pings(packet_sender: mpsc::UnboundedSender>, @@ -93,11 +89,11 @@ async fn send_pings(packet_sender: mpsc::UnboundedSender>, +async fn send_packets(mut sink: TcpSender, mut packet_receiver: mpsc::UnboundedReceiver>) { while let Some(packet) = packet_receiver.recv().await { - sink.lock().unwrap().send(packet).await.unwrap(); + sink.send(packet).await.unwrap(); } } -- cgit v1.2.1 From 3d8009a0201fba0bdc464fae0797d3bb3bcf69f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 01:48:07 +0200 Subject: wip handle more commands (panics) --- mumd/src/network/mod.rs | 23 +++++++++++++++++++++++ mumd/src/network/tcp.rs | 29 +++++++++++++++++++---------- mumd/src/network/udp.rs | 15 ++++++++++++--- 3 files changed, 54 insertions(+), 13 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs index f7a6a76..777faad 100644 --- a/mumd/src/network/mod.rs +++ b/mumd/src/network/mod.rs @@ -1,2 +1,25 @@ pub mod tcp; pub mod udp; + +use std::net::SocketAddr; + +#[derive(Clone, Debug)] +pub struct ConnectionInfo { + socket_addr: SocketAddr, + hostname: String, + accept_invalid_cert: bool, +} + +impl ConnectionInfo { + pub fn new( + socket_addr: SocketAddr, + hostname: String, + accept_invalid_cert: bool, + ) -> Self { + Self { + socket_addr, + hostname, + accept_invalid_cert, + } + } +} diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6f60b63..9fb5ae4 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,4 +1,5 @@ -use crate::state::State; +use crate::network::ConnectionInfo; +use crate::state::{State, StatePhase}; use log::*; use futures::channel::oneshot; @@ -11,7 +12,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -25,16 +26,24 @@ type TcpReceiver = pub async fn handle( state: Arc>, - server_addr: SocketAddr, - server_host: String, - accept_invalid_cert: bool, + mut connection_info_receiver: watch::Receiver>, crypt_state_sender: oneshot::Sender, packet_receiver: mpsc::UnboundedReceiver>, ) { - let (mut sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, stream) = connect(connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert) + .await; // Handshake (omitting `Version` message for brevity) - authenticate(&mut sink, state.lock().unwrap().username().to_string()).await; + authenticate(&mut sink, state.lock().unwrap().username().unwrap().to_string()).await; info!("Logging in..."); @@ -158,10 +167,10 @@ async fn listen( let mut state = state.lock().unwrap(); let session = msg.get_session(); state.audio_mut().add_client(msg.get_session()); //TODO - if *state.initialized_receiver().borrow() { - state.server_mut().parse_user_state(msg); - } else { + if *state.phase_receiver().borrow() == StatePhase::Connecting { state.parse_initial_user_state(msg); + } else { + state.server_mut().parse_user_state(msg); } let server = state.server_mut(); let user = server.users().get(&session).unwrap(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 5f76501..cf0305b 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,3 +1,4 @@ +use crate::network::ConnectionInfo; use crate::state::State; use log::*; @@ -11,6 +12,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; +use tokio::sync::watch; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; @@ -38,20 +40,27 @@ pub async fn connect( pub async fn handle( state: Arc>, - server_addr: SocketAddr, + mut connection_info_receiver: watch::Receiver>, crypt_state: oneshot::Receiver, ) { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; let (mut sink, source) = connect(crypt_state).await; // Note: A normal application would also send periodic Ping packets, and its own audio // via UDP. We instead trick the server into accepting us by sending it one // dummy voice packet. - send_ping(&mut sink, server_addr).await; + send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); join!( listen(Arc::clone(&state), source), - send_voice(state, sink, server_addr) + send_voice(state, sink, connection_info.socket_addr), ); } -- cgit v1.2.1 From 7fb14d648aacd398f720f60236020dab6bf9fd35 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 16:54:27 +0200 Subject: add support for disconnect command --- mumd/src/network/tcp.rs | 291 +++++++++++++++++++++++++++++++++--------------- mumd/src/network/udp.rs | 144 +++++++++++++++++++----- 2 files changed, 318 insertions(+), 117 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 9fb5ae4..0aca19e 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,8 +2,8 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; +use tokio::sync::oneshot; +use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; @@ -16,6 +16,7 @@ use tokio::sync::{mpsc, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; +use futures_util::core_reexport::cell::RefCell; type TcpSender = SplitSink< Framed, ControlCodec>, @@ -43,15 +44,21 @@ pub async fn handle( .await; // Handshake (omitting `Version` message for brevity) - authenticate(&mut sink, state.lock().unwrap().username().unwrap().to_string()).await; + let mut state_lock = state.lock().unwrap(); + authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; + let phase_watcher = state_lock.phase_receiver(); + let packet_sender = state_lock.packet_sender(); + drop(state_lock); info!("Logging in..."); join!( - send_pings(state.lock().unwrap().packet_sender(), 10), - listen(state, stream, crypt_state_sender), - send_packets(sink, packet_receiver), + send_pings(packet_sender, 10, phase_watcher.clone()), + listen(state, stream, crypt_state_sender, phase_watcher.clone()), + send_packets(sink, packet_receiver, phase_watcher), ); + + debug!("Fully disconnected TCP stream"); } async fn connect( @@ -87,109 +94,209 @@ async fn authenticate(sink: &mut TcpSender, username: String) { sink.send(msg.into()).await.unwrap(); } -async fn send_pings(packet_sender: mpsc::UnboundedSender>, - delay_seconds: u64) { +async fn send_pings( + packet_sender: mpsc::UnboundedSender>, + delay_seconds: u64, + mut phase_watcher: watch::Receiver, +) { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + let mut interval = time::interval(Duration::from_secs(delay_seconds)); - loop { - interval.tick().await; - trace!("Sending ping"); - let msg = msgs::Ping::new(); - packet_sender.send(msg.into()).unwrap(); - } + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let interval_waiter = interval.tick().fuse(); + pin_mut!(interval_waiter); + let exitor = select! { + data = interval_waiter => Some(data), + _ = rx => None + }; + + match exitor { + Some(_) => { + trace!("Sending ping"); + let msg = msgs::Ping::new(); + packet_sender.send(msg.into()).unwrap(); + } + None => break, + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("Ping sender process killed"); } -async fn send_packets(mut sink: TcpSender, - mut packet_receiver: mpsc::UnboundedReceiver>) { +async fn send_packets( + mut sink: TcpSender, + mut packet_receiver: mpsc::UnboundedReceiver>, + mut phase_watcher: watch::Receiver, +) { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = packet_receiver.recv().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(packet)) => { + sink.send(packet).await.unwrap(); + } + } + } - while let Some(packet) = packet_receiver.recv().await { - sink.send(packet).await.unwrap(); - } + //clears queue of remaining packets + while let Ok(_) = packet_receiver.try_recv() {} + + sink.close().await.unwrap(); + }; + + join!(main_block, phase_transition_block); + + debug!("TCP packet sender killed"); } async fn listen( state: Arc>, mut stream: TcpReceiver, crypt_state_sender: oneshot::Sender, + mut phase_watcher: watch::Receiver, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); - while let Some(packet) = stream.next().await { - //TODO handle types separately - match packet.unwrap() { - ControlPacket::TextMessage(msg) => { - info!( - "Got message from user with session ID {}: {}", - msg.get_actor(), - msg.get_message() - ); - } - ControlPacket::CryptSetup(msg) => { - debug!("Crypt setup"); - // Wait until we're fully connected before initiating UDP voice - crypt_state = Some(ClientCryptState::new_from( - msg.get_key() - .try_into() - .expect("Server sent private key with incorrect size"), - msg.get_client_nonce() - .try_into() - .expect("Server sent client_nonce with incorrect size"), - msg.get_server_nonce() - .try_into() - .expect("Server sent server_nonce with incorrect size"), - )); - } - ControlPacket::ServerSync(msg) => { - info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ); - } - let mut state = state.lock().unwrap(); - let server = state.server_mut(); - server.parse_server_sync(msg); - match &server.welcome_text { - Some(s) => info!("Welcome: {}", s), - None => info!("No welcome received"), + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let listener_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = stream.next().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; } - for (_, channel) in server.channels() { - info!("Found channel {}", channel.name()); + Some(None) => { + warn!("Channel closed before disconnect command"); + break; } - state.initialized(); - } - ControlPacket::Reject(msg) => { - warn!("Login rejected: {:?}", msg); - } - ControlPacket::UserState(msg) => { - let mut state = state.lock().unwrap(); - let session = msg.get_session(); - state.audio_mut().add_client(msg.get_session()); //TODO - if *state.phase_receiver().borrow() == StatePhase::Connecting { - state.parse_initial_user_state(msg); - } else { - state.server_mut().parse_user_state(msg); + Some(Some(packet)) => { + //TODO handle types separately + match packet.unwrap() { + ControlPacket::TextMessage(msg) => { + info!( + "Got message from user with session ID {}: {}", + msg.get_actor(), + msg.get_message() + ); + } + ControlPacket::CryptSetup(msg) => { + debug!("Crypt setup"); + // Wait until we're fully connected before initiating UDP voice + crypt_state = Some(ClientCryptState::new_from( + msg.get_key() + .try_into() + .expect("Server sent private key with incorrect size"), + msg.get_client_nonce() + .try_into() + .expect("Server sent client_nonce with incorrect size"), + msg.get_server_nonce() + .try_into() + .expect("Server sent server_nonce with incorrect size"), + )); + } + ControlPacket::ServerSync(msg) => { + info!("Logged in"); + if let Some(sender) = crypt_state_sender.take() { + let _ = sender.send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ); + } + let mut state = state.lock().unwrap(); + let server = state.server_mut(); + server.parse_server_sync(msg); + match &server.welcome_text { + Some(s) => info!("Welcome: {}", s), + None => info!("No welcome received"), + } + for (_, channel) in server.channels() { + info!("Found channel {}", channel.name()); + } + state.initialized(); + } + ControlPacket::Reject(msg) => { + warn!("Login rejected: {:?}", msg); + } + ControlPacket::UserState(msg) => { + let mut state = state.lock().unwrap(); + let session = msg.get_session(); + state.audio_mut().add_client(msg.get_session()); //TODO + if *state.phase_receiver().borrow() == StatePhase::Connecting { + state.parse_initial_user_state(msg); + } else { + state.server_mut().parse_user_state(msg); + } + let server = state.server_mut(); + let user = server.users().get(&session).unwrap(); + info!("User {} connected to {}", + user.name(), + user.channel()); + } + ControlPacket::UserRemove(msg) => { + info!("User {} left", msg.get_session()); + state.lock().unwrap().audio_mut().remove_client(msg.get_session()); + } + ControlPacket::ChannelState(msg) => { + debug!("Channel state received"); + state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial + } + ControlPacket::ChannelRemove(msg) => { + state.lock().unwrap().server_mut().parse_channel_remove(msg); + } + _ => {} + } } - let server = state.server_mut(); - let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); - } - ControlPacket::UserRemove(msg) => { - info!("User {} left", msg.get_session()); - state.lock().unwrap().audio_mut().remove_client(msg.get_session()); - } - ControlPacket::ChannelState(msg) => { - debug!("Channel state received"); - state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial - } - ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().parse_channel_remove(msg); } - _ => {} } - } + + //TODO? clean up stream + }; + + join!(phase_transition_block, listener_block); + + debug!("Killing TCP listener block"); } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index cf0305b..ab4ca1d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,10 +1,9 @@ use crate::network::ConnectionInfo; -use crate::state::State; +use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; +use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -12,7 +11,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::watch; +use tokio::sync::{watch, oneshot}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; @@ -58,17 +57,80 @@ pub async fn handle( send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); join!( - listen(Arc::clone(&state), source), - send_voice(state, sink, connection_info.socket_addr), + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice(state, sink, connection_info.socket_addr, phase_watcher), ); + + debug!("Fully disconnected UPD stream"); } async fn listen( state: Arc>, mut source: UdpReceiver, + mut phase_watcher: watch::Receiver, ) { - while let Some(packet) = source.next().await { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = source.next().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(packet)) => { + let (packet, _src_addr) = match packet { + Ok(packet) => packet, + Err(err) => { + warn!("Got an invalid UDP packet: {}", err); + // To be expected, considering this is the internet, just ignore it + continue; + } + }; + match packet { + VoicePacket::Ping { .. } => { + // Note: A normal application would handle these and only use UDP for voice + // once it has received one. + continue; + } + VoicePacket::Audio { + session_id, + // seq_num, + payload, + // position_info, + .. + } => { + state.lock().unwrap().audio().decode_packet(session_id, payload); + } + } + } + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("UDP listener process killed"); + + /*while let Some(packet) = source.next().await { let (packet, _src_addr) = match packet { Ok(packet) => packet, Err(err) => { @@ -93,7 +155,7 @@ async fn listen( state.lock().unwrap().audio().decode_packet(session_id, payload); } } - } + }*/ } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -116,25 +178,57 @@ async fn send_voice( state: Arc>, sink: Arc>, server_addr: SocketAddr, + mut phase_watcher: watch::Receiver, ) { let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); - let mut count = 0; - while let Some(payload) = receiver.recv().await { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload, - position_info: None, - }; - count += 1; - sink.lock() - .unwrap() - .send((reply, server_addr)) - .await - .unwrap(); - } + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + let mut count = 0; + loop { + let packet_recv = receiver.recv().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(payload)) => { + let reply = VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: count, + payload, + position_info: None, + }; + count += 1; + sink.lock() + .unwrap() + .send((reply, server_addr)) + .await + .unwrap(); + } + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("UDP listener process killed"); } -- cgit v1.2.1 From e4406676a28f2dfb756f8f9e38a4242166f19c0e Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 17:02:49 +0200 Subject: resolve some compiler warnings --- mumd/src/network/tcp.rs | 9 ++++----- mumd/src/network/udp.rs | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 0aca19e..1e0feee 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -16,7 +16,6 @@ use tokio::sync::{mpsc, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; -use futures_util::core_reexport::cell::RefCell; type TcpSender = SplitSink< Framed, ControlCodec>, @@ -44,7 +43,7 @@ pub async fn handle( .await; // Handshake (omitting `Version` message for brevity) - let mut state_lock = state.lock().unwrap(); + let state_lock = state.lock().unwrap(); authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; let phase_watcher = state_lock.phase_receiver(); let packet_sender = state_lock.packet_sender(); @@ -102,7 +101,7 @@ async fn send_pings( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let mut interval = time::interval(Duration::from_secs(delay_seconds)); @@ -141,7 +140,7 @@ async fn send_packets( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let main_block = async { @@ -191,7 +190,7 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let listener_block = async { diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index ab4ca1d..a757a2b 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -75,7 +75,7 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let main_block = async { @@ -185,7 +185,7 @@ async fn send_voice( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let main_block = async { -- cgit v1.2.1 From ab0cdc240c65fdc6b764ed17f6611786d449acc3 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 17:45:04 +0200 Subject: add support for reconnecting to server --- mumd/src/network/tcp.rs | 65 +++++++++++++++--------------- mumd/src/network/udp.rs | 105 ++++++++++++++++++------------------------------ 2 files changed, 73 insertions(+), 97 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 1e0feee..d45b49d 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,7 +2,6 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use tokio::sync::oneshot; use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; @@ -12,7 +11,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, oneshot}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -27,37 +26,39 @@ type TcpReceiver = pub async fn handle( state: Arc>, mut connection_info_receiver: watch::Receiver>, - crypt_state_sender: oneshot::Sender, - packet_receiver: mpsc::UnboundedReceiver>, + crypt_state_sender: mpsc::Sender, + mut packet_receiver: mpsc::UnboundedReceiver>, ) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, stream) = connect(connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert) + .await; - // Handshake (omitting `Version` message for brevity) - let state_lock = state.lock().unwrap(); - authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; - let phase_watcher = state_lock.phase_receiver(); - let packet_sender = state_lock.packet_sender(); - drop(state_lock); + // Handshake (omitting `Version` message for brevity) + let state_lock = state.lock().unwrap(); + authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; + let phase_watcher = state_lock.phase_receiver(); + let packet_sender = state_lock.packet_sender(); + drop(state_lock); - info!("Logging in..."); + info!("Logging in..."); - join!( - send_pings(packet_sender, 10, phase_watcher.clone()), - listen(state, stream, crypt_state_sender, phase_watcher.clone()), - send_packets(sink, packet_receiver, phase_watcher), - ); + join!( + send_pings(packet_sender, 10, phase_watcher.clone()), + listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + send_packets(sink, &mut packet_receiver, phase_watcher), + ); - debug!("Fully disconnected TCP stream"); + debug!("Fully disconnected TCP stream, waiting for new connection info"); + } } async fn connect( @@ -134,7 +135,7 @@ async fn send_pings( async fn send_packets( mut sink: TcpSender, - mut packet_receiver: mpsc::UnboundedReceiver>, + packet_receiver: &mut mpsc::UnboundedReceiver>, mut phase_watcher: watch::Receiver, ) { let (tx, rx) = oneshot::channel(); @@ -181,7 +182,7 @@ async fn send_packets( async fn listen( state: Arc>, mut stream: TcpReceiver, - crypt_state_sender: oneshot::Sender, + crypt_state_sender: mpsc::Sender, mut phase_watcher: watch::Receiver, ) { let mut crypt_state = None; @@ -238,12 +239,12 @@ async fn listen( } ControlPacket::ServerSync(msg) => { info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { + if let Some(mut sender) = crypt_state_sender.take() { let _ = sender.send( crypt_state .take() .expect("Server didn't send us any CryptSetup packet!"), - ); + ).await; } let mut state = state.lock().unwrap(); let server = state.server_mut(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index a757a2b..45e6e80 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -11,14 +11,48 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::{watch, oneshot}; +use tokio::sync::{watch, oneshot, mpsc}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; type UdpReceiver = SplitStream>; +pub async fn handle( + state: Arc>, + mut connection_info_receiver: watch::Receiver>, + mut crypt_state: mpsc::Receiver, +) { + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, source) = connect(&mut crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, connection_info.socket_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); + join!( + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver), + ); + + debug!("Fully disconnected UDP stream, waiting for new connection info"); + } +} + pub async fn connect( - crypt_state: oneshot::Receiver, + crypt_state: &mut mpsc::Receiver, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) @@ -26,10 +60,10 @@ pub async fn connect( .expect("Failed to bind UDP socket"); // Wait for initial CryptState - let crypt_state = match crypt_state.await { - Ok(crypt_state) => crypt_state, + let crypt_state = match crypt_state.recv().await { + Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well - Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully + None => panic!("disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); @@ -37,36 +71,6 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } -pub async fn handle( - state: Arc>, - mut connection_info_receiver: watch::Receiver>, - crypt_state: oneshot::Receiver, -) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, connection_info.socket_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - - let phase_watcher = state.lock().unwrap().phase_receiver(); - join!( - listen(Arc::clone(&state), source, phase_watcher.clone()), - send_voice(state, sink, connection_info.socket_addr, phase_watcher), - ); - - debug!("Fully disconnected UPD stream"); -} - async fn listen( state: Arc>, mut source: UdpReceiver, @@ -129,33 +133,6 @@ async fn listen( join!(main_block, phase_transition_block); debug!("UDP listener process killed"); - - /*while let Some(packet) = source.next().await { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - warn!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue; - } - }; - match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue; - } - VoicePacket::Audio { - session_id, - // seq_num, - payload, - // position_info, - .. - } => { - state.lock().unwrap().audio().decode_packet(session_id, payload); - } - } - }*/ } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -175,13 +152,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { } async fn send_voice( - state: Arc>, sink: Arc>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver, + receiver: &mut mpsc::Receiver, ) { - let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); - let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} -- cgit v1.2.1 From 17b84132b72cb45785738270ba5982889d447222 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 19:04:11 +0200 Subject: update server so that we clear information once we leave a server --- mumd/src/network/tcp.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index d45b49d..0a53266 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -247,7 +247,7 @@ async fn listen( ).await; } let mut state = state.lock().unwrap(); - let server = state.server_mut(); + let server = state.server_mut().unwrap(); server.parse_server_sync(msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), @@ -268,9 +268,9 @@ async fn listen( if *state.phase_receiver().borrow() == StatePhase::Connecting { state.parse_initial_user_state(msg); } else { - state.server_mut().parse_user_state(msg); + state.server_mut().unwrap().parse_user_state(msg); } - let server = state.server_mut(); + let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", user.name(), @@ -282,10 +282,10 @@ async fn listen( } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial + state.lock().unwrap().server_mut().unwrap().parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().parse_channel_remove(msg); + state.lock().unwrap().server_mut().unwrap().parse_channel_remove(msg); } _ => {} } -- cgit v1.2.1 From af272afbcd9e0e283b88f37f2bf3d7b4da604321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:18:13 +0200 Subject: update debug message --- mumd/src/network/udp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 45e6e80..31e33e3 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -63,7 +63,7 @@ pub async fn connect( let crypt_state = match crypt_state.recv().await { Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well - None => panic!("disconnect before crypt packet received"), //TODO exit gracefully + None => panic!("Disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); @@ -204,6 +204,6 @@ async fn send_voice( join!(main_block, phase_transition_block); - debug!("UDP listener process killed"); + debug!("UDP sender process killed"); } -- cgit v1.2.1 From 8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:29:34 +0200 Subject: cargo fmt --- mumd/src/network/mod.rs | 6 +--- mumd/src/network/tcp.rs | 82 ++++++++++++++++++++++++++++++++++--------------- mumd/src/network/udp.rs | 36 ++++++++++++++++------ 3 files changed, 86 insertions(+), 38 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs index 777faad..1a31ee2 100644 --- a/mumd/src/network/mod.rs +++ b/mumd/src/network/mod.rs @@ -11,11 +11,7 @@ pub struct ConnectionInfo { } impl ConnectionInfo { - pub fn new( - socket_addr: SocketAddr, - hostname: String, - accept_invalid_cert: bool, - ) -> Self { + pub fn new(socket_addr: SocketAddr, hostname: String, accept_invalid_cert: bool) -> Self { Self { socket_addr, hostname, diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 0a53266..e096843 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,16 +2,16 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::{Clientbound, Serverbound}; use std::convert::{Into, TryInto}; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -32,15 +32,21 @@ pub async fn handle( loop { let connection_info = loop { match connection_info_receiver.recv().await { - None => { return; } + None => { + return; + } Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } + Some(Some(connection_info)) => { + break connection_info; + } } }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + let (mut sink, stream) = connect( + connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert, + ) + .await; // Handshake (omitting `Version` message for brevity) let state_lock = state.lock().unwrap(); @@ -53,7 +59,12 @@ pub async fn handle( join!( send_pings(packet_sender, 10, phase_watcher.clone()), - listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + listen( + Arc::clone(&state), + stream, + crypt_state_sender.clone(), + phase_watcher.clone() + ), send_packets(sink, &mut packet_receiver, phase_watcher), ); @@ -101,7 +112,10 @@ async fn send_pings( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -140,7 +154,10 @@ async fn send_packets( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -190,7 +207,10 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -240,11 +260,13 @@ async fn listen( ControlPacket::ServerSync(msg) => { info!("Logged in"); if let Some(mut sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ).await; + let _ = sender + .send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ) + .await; } let mut state = state.lock().unwrap(); let server = state.server_mut().unwrap(); @@ -272,20 +294,32 @@ async fn listen( } let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); + info!("User {} connected to {}", user.name(), user.channel()); } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - state.lock().unwrap().audio_mut().remove_client(msg.get_session()); + state + .lock() + .unwrap() + .audio_mut() + .remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().unwrap().parse_channel_state(msg); //TODO parse initial if initial + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().unwrap().parse_channel_remove(msg); + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_remove(msg); } _ => {} } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 31e33e3..4f96c4c 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -3,7 +3,7 @@ use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -11,7 +11,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::{watch, oneshot, mpsc}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; @@ -27,9 +27,13 @@ pub async fn handle( loop { let connection_info = loop { match connection_info_receiver.recv().await { - None => { return; } + None => { + return; + } Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } + Some(Some(connection_info)) => { + break connection_info; + } } }; let (mut sink, source) = connect(&mut crypt_state).await; @@ -44,7 +48,12 @@ pub async fn handle( let phase_watcher = state.lock().unwrap().phase_receiver(); join!( listen(Arc::clone(&state), source, phase_watcher.clone()), - send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver), + send_voice( + sink, + connection_info.socket_addr, + phase_watcher, + &mut receiver + ), ); debug!("Fully disconnected UDP stream, waiting for new connection info"); @@ -78,7 +87,10 @@ async fn listen( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -122,7 +134,11 @@ async fn listen( // position_info, .. } => { - state.lock().unwrap().audio().decode_packet(session_id, payload); + state + .lock() + .unwrap() + .audio() + .decode_packet(session_id, payload); } } } @@ -159,7 +175,10 @@ async fn send_voice( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -206,4 +225,3 @@ async fn send_voice( debug!("UDP sender process killed"); } - -- cgit v1.2.1 From afd537e085ddf2c92fb1f1879a72d290010fa570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:42:28 +0200 Subject: cargo clippy --- mumd/src/network/tcp.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index e096843..6a369e5 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -186,7 +186,7 @@ async fn send_packets( } //clears queue of remaining packets - while let Ok(_) = packet_receiver.try_recv() {} + while packet_receiver.try_recv().is_ok() {} sink.close().await.unwrap(); }; @@ -270,12 +270,12 @@ async fn listen( } let mut state = state.lock().unwrap(); let server = state.server_mut().unwrap(); - server.parse_server_sync(msg); + server.parse_server_sync(*msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), None => info!("No welcome received"), } - for (_, channel) in server.channels() { + for channel in server.channels().values() { info!("Found channel {}", channel.name()); } state.initialized(); @@ -288,9 +288,9 @@ async fn listen( let session = msg.get_session(); state.audio_mut().add_client(msg.get_session()); //TODO if *state.phase_receiver().borrow() == StatePhase::Connecting { - state.parse_initial_user_state(msg); + state.parse_initial_user_state(*msg); } else { - state.server_mut().unwrap().parse_user_state(msg); + state.server_mut().unwrap().parse_user_state(*msg); } let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); @@ -311,7 +311,7 @@ async fn listen( .unwrap() .server_mut() .unwrap() - .parse_channel_state(msg); //TODO parse initial if initial + .parse_channel_state(*msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { state @@ -319,7 +319,7 @@ async fn listen( .unwrap() .server_mut() .unwrap() - .parse_channel_remove(msg); + .parse_channel_remove(*msg); } _ => {} } -- cgit v1.2.1