diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2020-10-14 19:29:34 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2020-10-14 19:29:34 +0200 |
| commit | 8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e (patch) | |
| tree | 18b85d859f34964cd3cd20572a45a43d0afe8e62 /mumd/src/network | |
| parent | af272afbcd9e0e283b88f37f2bf3d7b4da604321 (diff) | |
| download | mum-8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e.tar.gz | |
cargo fmt
Diffstat (limited to 'mumd/src/network')
| -rw-r--r-- | mumd/src/network/mod.rs | 6 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 82 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 36 |
3 files changed, 86 insertions, 38 deletions
diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs index 777faad..1a31ee2 100644 --- a/mumd/src/network/mod.rs +++ b/mumd/src/network/mod.rs @@ -11,11 +11,7 @@ pub struct ConnectionInfo { } impl ConnectionInfo { - pub fn new( - socket_addr: SocketAddr, - hostname: String, - accept_invalid_cert: bool, - ) -> Self { + pub fn new(socket_addr: SocketAddr, hostname: String, accept_invalid_cert: bool) -> Self { Self { socket_addr, hostname, diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 0a53266..e096843 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,16 +2,16 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::{Clientbound, Serverbound}; use std::convert::{Into, TryInto}; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -32,15 +32,21 @@ pub async fn handle( loop { let connection_info = loop { match connection_info_receiver.recv().await { - None => { return; } + None => { + return; + } Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } + Some(Some(connection_info)) => { + break connection_info; + } } }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + let (mut sink, stream) = connect( + connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert, + ) + .await; // Handshake (omitting `Version` message for brevity) let state_lock = state.lock().unwrap(); @@ -53,7 +59,12 @@ pub async fn handle( join!( send_pings(packet_sender, 10, phase_watcher.clone()), - listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + listen( + Arc::clone(&state), + stream, + crypt_state_sender.clone(), + phase_watcher.clone() + ), send_packets(sink, &mut packet_receiver, phase_watcher), ); @@ -101,7 +112,10 @@ async fn send_pings( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -140,7 +154,10 @@ async fn send_packets( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -190,7 +207,10 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -240,11 +260,13 @@ async fn listen( ControlPacket::ServerSync(msg) => { info!("Logged in"); if let Some(mut sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ).await; + let _ = sender + .send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ) + .await; } let mut state = state.lock().unwrap(); let server = state.server_mut().unwrap(); @@ -272,20 +294,32 @@ async fn listen( } let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); + info!("User {} connected to {}", user.name(), user.channel()); } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - state.lock().unwrap().audio_mut().remove_client(msg.get_session()); + state + .lock() + .unwrap() + .audio_mut() + .remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().unwrap().parse_channel_state(msg); //TODO parse initial if initial + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().unwrap().parse_channel_remove(msg); + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_remove(msg); } _ => {} } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 31e33e3..4f96c4c 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -3,7 +3,7 @@ use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -11,7 +11,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::{watch, oneshot, mpsc}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; @@ -27,9 +27,13 @@ pub async fn handle( loop { let connection_info = loop { match connection_info_receiver.recv().await { - None => { return; } + None => { + return; + } Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } + Some(Some(connection_info)) => { + break connection_info; + } } }; let (mut sink, source) = connect(&mut crypt_state).await; @@ -44,7 +48,12 @@ pub async fn handle( let phase_watcher = state.lock().unwrap().phase_receiver(); join!( listen(Arc::clone(&state), source, phase_watcher.clone()), - send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver), + send_voice( + sink, + connection_info.socket_addr, + phase_watcher, + &mut receiver + ), ); debug!("Fully disconnected UDP stream, waiting for new connection info"); @@ -78,7 +87,10 @@ async fn listen( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -122,7 +134,11 @@ async fn listen( // position_info, .. } => { - state.lock().unwrap().audio().decode_packet(session_id, payload); + state + .lock() + .unwrap() + .audio() + .decode_packet(session_id, payload); } } } @@ -159,7 +175,10 @@ async fn send_voice( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -206,4 +225,3 @@ async fn send_voice( debug!("UDP sender process killed"); } - |
