diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2021-06-06 23:26:24 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2021-06-06 23:26:24 +0200 |
| commit | be76c2aa51733a0cf495e92659fbcbe527f41149 (patch) | |
| tree | 617fb1caa999c076a45233b4bedea6a78192db25 /mumd/src/network | |
| parent | 7fc5a1a36404ee4cbc09d20c955e6edd3d2ac523 (diff) | |
| download | mum-be76c2aa51733a0cf495e92659fbcbe527f41149.tar.gz | |
cargo fmt
Diffstat (limited to 'mumd/src/network')
| -rw-r--r-- | mumd/src/network/tcp.rs | 93 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 83 |
2 files changed, 113 insertions, 63 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index b513797..5cc2bf7 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,11 +1,14 @@ -use crate::{error::{ServerSendError, TcpError}, notifications}; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; +use crate::{ + error::{ServerSendError, TcpError}, + notifications, +}; use log::*; -use futures_util::{FutureExt, SinkExt, StreamExt}; use futures_util::select; use futures_util::stream::{SplitSink, SplitStream, Stream}; +use futures_util::{FutureExt, SinkExt, StreamExt}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; @@ -73,24 +76,44 @@ impl TcpEventQueue { /// Registers a new callback to be triggered when an event is fired. pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { - self.callbacks.write().unwrap().entry(at).or_default().push(callback); + self.callbacks + .write() + .unwrap() + .entry(at) + .or_default() + .push(callback); } /// Registers a new callback to be triggered when an event is fired. pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { - self.subscribers.write().unwrap().entry(at).or_default().push(callback); + self.subscribers + .write() + .unwrap() + .entry(at) + .or_default() + .push(callback); } /// Fires all callbacks related to a specific TCP event and removes them from the event queue. /// Also calls all event subscribers, but keeps them in the queue pub fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self + .callbacks + .write() + .unwrap() + .get_mut(&TcpEvent::from(&data)) + { let old = std::mem::take(vec); for handler in old { handler(data.clone()); } } - if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self + .subscribers + .write() + .unwrap() + .get_mut(&TcpEvent::from(&data)) + { let old = std::mem::take(vec); for mut e in old { if e(data.clone()) { @@ -128,14 +151,18 @@ pub async fn handle( // Handshake (omitting `Version` message for brevity) let (username, password) = { let state_lock = state.read().unwrap(); - (state_lock.username().unwrap().to_string(), - state_lock.password().map(|x| x.to_string())) + ( + state_lock.username().unwrap().to_string(), + state_lock.password().map(|x| x.to_string()), + ) }; authenticate(&mut sink, username, password).await?; let (phase_watcher, input_receiver) = { let state_lock = state.read().unwrap(); - (state_lock.phase_receiver(), - state_lock.audio_input().receiver()) + ( + state_lock.phase_receiver(), + state_lock.audio_input().receiver(), + ) }; info!("Logging in..."); @@ -162,7 +189,9 @@ pub async fn handle( } }, phase_watcher, - ).await.unwrap_or(Ok(()))?; + ) + .await + .unwrap_or(Ok(()))?; event_queue.resolve(TcpEventData::Disconnected); @@ -197,7 +226,7 @@ async fn connect( async fn authenticate( sink: &mut TcpSender, username: String, - password: Option<String> + password: Option<String>, ) -> Result<(), TcpError> { let mut msg = msgs::Authenticate::new(); msg.set_username(username); @@ -242,7 +271,10 @@ async fn send_voice( let mut inner_phase_watcher = phase_watcher.clone(); loop { inner_phase_watcher.changed().await.unwrap(); - if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::TCP)) { + if matches!( + *inner_phase_watcher.borrow(), + StatePhase::Connected(VoiceStreamType::TCP) + ) { break; } } @@ -257,11 +289,14 @@ async fn send_voice( .next() .await .expect("No audio stream") - .into())?; + .into(), + )?; } }, inner_phase_watcher.clone(), - ).await.unwrap_or(Ok::<(), ServerSendError>(()))?; + ) + .await + .unwrap_or(Ok::<(), ServerSendError>(()))?; } } @@ -285,18 +320,23 @@ async fn listen( // We end up here if the login was rejected. We probably want // to exit before that. warn!("TCP stream gone"); - state.read().unwrap().broadcast_phase(StatePhase::Disconnected); + state + .read() + .unwrap() + .broadcast_phase(StatePhase::Disconnected); break; } }; match packet { ControlPacket::TextMessage(msg) => { let mut state = state.write().unwrap(); - let user = state.server() + let user = state + .server() .and_then(|server| server.users().get(&msg.get_actor())) .map(|user| user.name()); if let Some(user) = user { - notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this + notifications::send(format!("{}: {}", user, msg.get_message())); + //TODO: probably want a config flag for this } state.register_message((msg.get_message().to_owned(), msg.get_actor())); drop(state); @@ -345,7 +385,9 @@ async fn listen( debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { - event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))); + event_queue.resolve(TcpEventData::Connected(Err( + mumlib::Error::InvalidServerPassword, + ))); } ty => { warn!("Unhandled reject type: {:?}", ty); @@ -385,14 +427,11 @@ async fn listen( // position_info, .. } => { - state - .read() - .unwrap() - .audio_output() - .decode_packet_payload( - VoiceStreamType::TCP, - session_id, - payload); + state.read().unwrap().audio_output().decode_packet_payload( + VoiceStreamType::TCP, + session_id, + payload, + ); } } } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 0958912..95dcf33 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -2,9 +2,9 @@ use crate::error::UdpError; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; -use futures_util::{FutureExt, SinkExt, StreamExt}; use futures_util::future::join4; use futures_util::stream::{SplitSink, SplitStream, Stream}; +use futures_util::{FutureExt, SinkExt, StreamExt}; use log::*; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; @@ -13,10 +13,13 @@ use mumble_protocol::Serverbound; use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; -use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; -use tokio::{join, net::UdpSocket}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, +}; use tokio::sync::{mpsc, oneshot, watch, Mutex}; use tokio::time::{interval, timeout, Duration}; +use tokio::{join, net::UdpSocket}; use tokio_util::udp::UdpFramed; use super::{run_until, VoiceStreamType}; @@ -53,11 +56,7 @@ pub async fn handle( run_until( |phase| matches!(phase, StatePhase::Disconnected), join4( - listen( - Arc::clone(&state), - Arc::clone(&source), - &last_ping_recv, - ), + listen(Arc::clone(&state), Arc::clone(&source), &last_ping_recv), send_voice( Arc::clone(&sink), connection_info.socket_addr, @@ -71,9 +70,11 @@ pub async fn handle( &last_ping_recv, ), new_crypt_state(&mut crypt_state_receiver, sink, source), - ).map(|_| ()), + ) + .map(|_| ()), phase_watcher, - ).await; + ) + .await; debug!("Fully disconnected UDP stream, waiting for new connection info"); } @@ -83,8 +84,7 @@ async fn connect( crypt_state: &mut mpsc::Receiver<ClientCryptState>, ) -> Result<(UdpSender, UdpReceiver), UdpError> { // Bind UDP socket - let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) - .await?; + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)).await?; // Wait for initial CryptState let crypt_state = match crypt_state.recv().await { @@ -146,11 +146,11 @@ async fn listen( // position_info, .. } => { - state - .read() - .unwrap() - .audio_output() - .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); + state.read().unwrap().audio_output().decode_packet_payload( + VoiceStreamType::UDP, + session_id, + payload, + ); } } } @@ -178,12 +178,17 @@ async fn send_pings( match sink .lock() .await - .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr)) + .send(( + VoicePacket::Ping { + timestamp: last_recv + 1, + }, + server_addr, + )) .await { Ok(_) => { last_send = Some(last_recv + 1); - }, + } Err(e) => { debug!("Error sending UDP ping: {}", e); } @@ -201,7 +206,10 @@ async fn send_voice( let mut inner_phase_watcher = phase_watcher.clone(); loop { inner_phase_watcher.changed().await.unwrap(); - if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) { + if matches!( + *inner_phase_watcher.borrow(), + StatePhase::Connected(VoiceStreamType::UDP) + ) { break; } } @@ -215,13 +223,12 @@ async fn send_voice( } }, phase_watcher.clone(), - ).await; + ) + .await; } } -pub async fn handle_pings( - mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>, -) { +pub async fn handle_pings(mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>) { let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); @@ -246,19 +253,23 @@ pub async fn handle_pings( } tokio::spawn(async move { - handle( - match timeout(Duration::from_secs(1), rx).await { - Ok(Ok(r)) => Some(r), - Ok(Err(_)) => { - warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id); - None - } - Err(_) => { - debug!("Server {} timed out when sending ping id {}", socket_addr, id); - None - } + handle(match timeout(Duration::from_secs(1), rx).await { + Ok(Ok(r)) => Some(r), + Ok(Err(_)) => { + warn!( + "Ping response sender for server {}, ping id {} dropped", + socket_addr, id + ); + None } - ); + Err(_) => { + debug!( + "Server {} timed out when sending ping id {}", + socket_addr, id + ); + None + } + }); }); } }; |
