diff options
Diffstat (limited to 'mumd/src/network/tcp.rs')
| -rw-r--r-- | mumd/src/network/tcp.rs | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 630f46a..cd11690 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -7,18 +7,18 @@ use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::{Clientbound, Serverbound}; +use std::cell::RefCell; +use std::collections::HashMap; use std::convert::{Into, TryInto}; +use std::future::Future; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; -use std::collections::HashMap; -use std::future::Future; -use std::rc::Rc; -use std::cell::RefCell; type TcpSender = SplitSink< Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>, @@ -31,7 +31,7 @@ pub(crate) type TcpEventCallback = Box<dyn FnOnce(&TcpEventData)>; #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum TcpEvent { - Connected, //fires when the client has connected to a server + Connected, //fires when the client has connected to a server Disconnected, //fires when the client has disconnected from a server } @@ -131,13 +131,13 @@ async fn send_pings( delay_seconds: u64, phase_watcher: watch::Receiver<StatePhase>, ) { - let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs(delay_seconds)))); + let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs( + delay_seconds, + )))); let packet_sender = Rc::new(RefCell::new(packet_sender)); run_until_disconnection( - || async { - Some(interval.borrow_mut().tick().await) - }, + || async { Some(interval.borrow_mut().tick().await) }, |_| async { trace!("Sending ping"); let msg = msgs::Ping::new(); @@ -145,7 +145,8 @@ async fn send_pings( }, || async {}, phase_watcher, - ).await; + ) + .await; debug!("Ping sender process killed"); } @@ -158,9 +159,7 @@ async fn send_packets( let sink = Rc::new(RefCell::new(sink)); let packet_receiver = Rc::new(RefCell::new(packet_receiver)); run_until_disconnection( - || async { - packet_receiver.borrow_mut().recv().await - }, + || async { packet_receiver.borrow_mut().recv().await }, |packet| async { sink.borrow_mut().send(packet).await.unwrap(); }, @@ -171,7 +170,8 @@ async fn send_packets( sink.borrow_mut().close().await.unwrap(); }, phase_watcher, - ).await; + ) + .await; debug!("TCP packet sender killed"); } @@ -188,9 +188,7 @@ async fn listen( let stream = Rc::new(RefCell::new(stream)); run_until_disconnection( - || async { - stream.borrow_mut().next().await - }, + || async { stream.borrow_mut().next().await }, |packet| async { match packet.unwrap() { ControlPacket::TextMessage(msg) => { @@ -289,7 +287,8 @@ async fn listen( } }, phase_watcher, - ).await; + ) + .await; debug!("Killing TCP listener block"); } @@ -301,13 +300,19 @@ async fn register_events( ) { let tcp_event_register_receiver = Rc::new(RefCell::new(tcp_event_register_receiver)); run_until_disconnection( - || async { - tcp_event_register_receiver.borrow_mut().recv().await + || async { tcp_event_register_receiver.borrow_mut().recv().await }, + |(event, handler)| async { + event_data + .lock() + .unwrap() + .entry(event) + .or_default() + .push(handler); }, - |(event, handler)| async { event_data.lock().unwrap().entry(event).or_default().push(handler); }, || async {}, phase_watcher, - ).await; + ) + .await; } async fn run_until_disconnection<T, F, G, H>( @@ -315,11 +320,10 @@ async fn run_until_disconnection<T, F, G, H>( mut handler: impl FnMut(T) -> G, mut shutdown: impl FnMut() -> H, mut phase_watcher: watch::Receiver<StatePhase>, -) - where - F: Future<Output = Option<T>>, - G: Future<Output = ()>, - H: Future<Output = ()>, +) where + F: Future<Output = Option<T>>, + G: Future<Output = ()>, + H: Future<Output = ()>, { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { |
