diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2020-10-14 19:29:34 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2020-10-14 19:29:34 +0200 |
| commit | 8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e (patch) | |
| tree | 18b85d859f34964cd3cd20572a45a43d0afe8e62 /mumd/src/network/tcp.rs | |
| parent | af272afbcd9e0e283b88f37f2bf3d7b4da604321 (diff) | |
| download | mum-8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e.tar.gz | |
cargo fmt
Diffstat (limited to 'mumd/src/network/tcp.rs')
| -rw-r--r-- | mumd/src/network/tcp.rs | 82 |
1 files changed, 58 insertions, 24 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 0a53266..e096843 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,16 +2,16 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::{Clientbound, Serverbound}; use std::convert::{Into, TryInto}; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -32,15 +32,21 @@ pub async fn handle( loop { let connection_info = loop { match connection_info_receiver.recv().await { - None => { return; } + None => { + return; + } Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } + Some(Some(connection_info)) => { + break connection_info; + } } }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + let (mut sink, stream) = connect( + connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert, + ) + .await; // Handshake (omitting `Version` message for brevity) let state_lock = state.lock().unwrap(); @@ -53,7 +59,12 @@ pub async fn handle( join!( send_pings(packet_sender, 10, phase_watcher.clone()), - listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + listen( + Arc::clone(&state), + stream, + crypt_state_sender.clone(), + phase_watcher.clone() + ), send_packets(sink, &mut packet_receiver, phase_watcher), ); @@ -101,7 +112,10 @@ async fn send_pings( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -140,7 +154,10 @@ async fn send_packets( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -190,7 +207,10 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -240,11 +260,13 @@ async fn listen( ControlPacket::ServerSync(msg) => { info!("Logged in"); if let Some(mut sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ).await; + let _ = sender + .send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ) + .await; } let mut state = state.lock().unwrap(); let server = state.server_mut().unwrap(); @@ -272,20 +294,32 @@ async fn listen( } let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); + info!("User {} connected to {}", user.name(), user.channel()); } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - state.lock().unwrap().audio_mut().remove_client(msg.get_session()); + state + .lock() + .unwrap() + .audio_mut() + .remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().unwrap().parse_channel_state(msg); //TODO parse initial if initial + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().unwrap().parse_channel_remove(msg); + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_remove(msg); } _ => {} } |
