aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src/network')
-rw-r--r--mumd/src/network/tcp.rs58
1 files changed, 31 insertions, 27 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 630f46a..cd11690 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -7,18 +7,18 @@ use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::{Clientbound, Serverbound};
+use std::cell::RefCell;
+use std::collections::HashMap;
use std::convert::{Into, TryInto};
+use std::future::Future;
use std::net::SocketAddr;
+use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
-use std::collections::HashMap;
-use std::future::Future;
-use std::rc::Rc;
-use std::cell::RefCell;
type TcpSender = SplitSink<
Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
@@ -31,7 +31,7 @@ pub(crate) type TcpEventCallback = Box<dyn FnOnce(&TcpEventData)>;
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum TcpEvent {
- Connected, //fires when the client has connected to a server
+ Connected, //fires when the client has connected to a server
Disconnected, //fires when the client has disconnected from a server
}
@@ -131,13 +131,13 @@ async fn send_pings(
delay_seconds: u64,
phase_watcher: watch::Receiver<StatePhase>,
) {
- let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs(delay_seconds))));
+ let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs(
+ delay_seconds,
+ ))));
let packet_sender = Rc::new(RefCell::new(packet_sender));
run_until_disconnection(
- || async {
- Some(interval.borrow_mut().tick().await)
- },
+ || async { Some(interval.borrow_mut().tick().await) },
|_| async {
trace!("Sending ping");
let msg = msgs::Ping::new();
@@ -145,7 +145,8 @@ async fn send_pings(
},
|| async {},
phase_watcher,
- ).await;
+ )
+ .await;
debug!("Ping sender process killed");
}
@@ -158,9 +159,7 @@ async fn send_packets(
let sink = Rc::new(RefCell::new(sink));
let packet_receiver = Rc::new(RefCell::new(packet_receiver));
run_until_disconnection(
- || async {
- packet_receiver.borrow_mut().recv().await
- },
+ || async { packet_receiver.borrow_mut().recv().await },
|packet| async {
sink.borrow_mut().send(packet).await.unwrap();
},
@@ -171,7 +170,8 @@ async fn send_packets(
sink.borrow_mut().close().await.unwrap();
},
phase_watcher,
- ).await;
+ )
+ .await;
debug!("TCP packet sender killed");
}
@@ -188,9 +188,7 @@ async fn listen(
let stream = Rc::new(RefCell::new(stream));
run_until_disconnection(
- || async {
- stream.borrow_mut().next().await
- },
+ || async { stream.borrow_mut().next().await },
|packet| async {
match packet.unwrap() {
ControlPacket::TextMessage(msg) => {
@@ -289,7 +287,8 @@ async fn listen(
}
},
phase_watcher,
- ).await;
+ )
+ .await;
debug!("Killing TCP listener block");
}
@@ -301,13 +300,19 @@ async fn register_events(
) {
let tcp_event_register_receiver = Rc::new(RefCell::new(tcp_event_register_receiver));
run_until_disconnection(
- || async {
- tcp_event_register_receiver.borrow_mut().recv().await
+ || async { tcp_event_register_receiver.borrow_mut().recv().await },
+ |(event, handler)| async {
+ event_data
+ .lock()
+ .unwrap()
+ .entry(event)
+ .or_default()
+ .push(handler);
},
- |(event, handler)| async { event_data.lock().unwrap().entry(event).or_default().push(handler); },
|| async {},
phase_watcher,
- ).await;
+ )
+ .await;
}
async fn run_until_disconnection<T, F, G, H>(
@@ -315,11 +320,10 @@ async fn run_until_disconnection<T, F, G, H>(
mut handler: impl FnMut(T) -> G,
mut shutdown: impl FnMut() -> H,
mut phase_watcher: watch::Receiver<StatePhase>,
-)
- where
- F: Future<Output = Option<T>>,
- G: Future<Output = ()>,
- H: Future<Output = ()>,
+) where
+ F: Future<Output = Option<T>>,
+ G: Future<Output = ()>,
+ H: Future<Output = ()>,
{
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {