aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-14 19:29:34 +0200
committerGustav Sörnäs <gustav@sornas.net>2020-10-14 19:29:34 +0200
commit8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e (patch)
tree18b85d859f34964cd3cd20572a45a43d0afe8e62 /mumd/src/network
parentaf272afbcd9e0e283b88f37f2bf3d7b4da604321 (diff)
downloadmum-8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e.tar.gz
cargo fmt
Diffstat (limited to 'mumd/src/network')
-rw-r--r--mumd/src/network/mod.rs6
-rw-r--r--mumd/src/network/tcp.rs82
-rw-r--r--mumd/src/network/udp.rs36
3 files changed, 86 insertions, 38 deletions
diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs
index 777faad..1a31ee2 100644
--- a/mumd/src/network/mod.rs
+++ b/mumd/src/network/mod.rs
@@ -11,11 +11,7 @@ pub struct ConnectionInfo {
}
impl ConnectionInfo {
- pub fn new(
- socket_addr: SocketAddr,
- hostname: String,
- accept_invalid_cert: bool,
- ) -> Self {
+ pub fn new(socket_addr: SocketAddr, hostname: String, accept_invalid_cert: bool) -> Self {
Self {
socket_addr,
hostname,
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 0a53266..e096843 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -2,16 +2,16 @@ use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
use log::*;
-use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt};
+use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::{Clientbound, Serverbound};
use std::convert::{Into, TryInto};
-use std::net::{SocketAddr};
+use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
-use tokio::sync::{mpsc, watch, oneshot};
+use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
@@ -32,15 +32,21 @@ pub async fn handle(
loop {
let connection_info = loop {
match connection_info_receiver.recv().await {
- None => { return; }
+ None => {
+ return;
+ }
Some(None) => {}
- Some(Some(connection_info)) => { break connection_info; }
+ Some(Some(connection_info)) => {
+ break connection_info;
+ }
}
};
- let (mut sink, stream) = connect(connection_info.socket_addr,
- connection_info.hostname,
- connection_info.accept_invalid_cert)
- .await;
+ let (mut sink, stream) = connect(
+ connection_info.socket_addr,
+ connection_info.hostname,
+ connection_info.accept_invalid_cert,
+ )
+ .await;
// Handshake (omitting `Version` message for brevity)
let state_lock = state.lock().unwrap();
@@ -53,7 +59,12 @@ pub async fn handle(
join!(
send_pings(packet_sender, 10, phase_watcher.clone()),
- listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()),
+ listen(
+ Arc::clone(&state),
+ stream,
+ crypt_state_sender.clone(),
+ phase_watcher.clone()
+ ),
send_packets(sink, &mut packet_receiver, phase_watcher),
);
@@ -101,7 +112,10 @@ async fn send_pings(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {}
+ while !matches!(
+ phase_watcher.recv().await.unwrap(),
+ StatePhase::Disconnected
+ ) {}
tx.send(true).unwrap();
};
@@ -140,7 +154,10 @@ async fn send_packets(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {}
+ while !matches!(
+ phase_watcher.recv().await.unwrap(),
+ StatePhase::Disconnected
+ ) {}
tx.send(true).unwrap();
};
@@ -190,7 +207,10 @@ async fn listen(
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {}
+ while !matches!(
+ phase_watcher.recv().await.unwrap(),
+ StatePhase::Disconnected
+ ) {}
tx.send(true).unwrap();
};
@@ -240,11 +260,13 @@ async fn listen(
ControlPacket::ServerSync(msg) => {
info!("Logged in");
if let Some(mut sender) = crypt_state_sender.take() {
- let _ = sender.send(
- crypt_state
- .take()
- .expect("Server didn't send us any CryptSetup packet!"),
- ).await;
+ let _ = sender
+ .send(
+ crypt_state
+ .take()
+ .expect("Server didn't send us any CryptSetup packet!"),
+ )
+ .await;
}
let mut state = state.lock().unwrap();
let server = state.server_mut().unwrap();
@@ -272,20 +294,32 @@ async fn listen(
}
let server = state.server_mut().unwrap();
let user = server.users().get(&session).unwrap();
- info!("User {} connected to {}",
- user.name(),
- user.channel());
+ info!("User {} connected to {}", user.name(), user.channel());
}
ControlPacket::UserRemove(msg) => {
info!("User {} left", msg.get_session());
- state.lock().unwrap().audio_mut().remove_client(msg.get_session());
+ state
+ .lock()
+ .unwrap()
+ .audio_mut()
+ .remove_client(msg.get_session());
}
ControlPacket::ChannelState(msg) => {
debug!("Channel state received");
- state.lock().unwrap().server_mut().unwrap().parse_channel_state(msg); //TODO parse initial if initial
+ state
+ .lock()
+ .unwrap()
+ .server_mut()
+ .unwrap()
+ .parse_channel_state(msg); //TODO parse initial if initial
}
ControlPacket::ChannelRemove(msg) => {
- state.lock().unwrap().server_mut().unwrap().parse_channel_remove(msg);
+ state
+ .lock()
+ .unwrap()
+ .server_mut()
+ .unwrap()
+ .parse_channel_remove(msg);
}
_ => {}
}
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 31e33e3..4f96c4c 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -3,7 +3,7 @@ use crate::state::{State, StatePhase};
use log::*;
use bytes::Bytes;
-use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt};
+use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::{VoicePacket, VoicePacketPayload};
@@ -11,7 +11,7 @@ use mumble_protocol::Serverbound;
use std::net::{Ipv6Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
-use tokio::sync::{watch, oneshot, mpsc};
+use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::udp::UdpFramed;
type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
@@ -27,9 +27,13 @@ pub async fn handle(
loop {
let connection_info = loop {
match connection_info_receiver.recv().await {
- None => { return; }
+ None => {
+ return;
+ }
Some(None) => {}
- Some(Some(connection_info)) => { break connection_info; }
+ Some(Some(connection_info)) => {
+ break connection_info;
+ }
}
};
let (mut sink, source) = connect(&mut crypt_state).await;
@@ -44,7 +48,12 @@ pub async fn handle(
let phase_watcher = state.lock().unwrap().phase_receiver();
join!(
listen(Arc::clone(&state), source, phase_watcher.clone()),
- send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver),
+ send_voice(
+ sink,
+ connection_info.socket_addr,
+ phase_watcher,
+ &mut receiver
+ ),
);
debug!("Fully disconnected UDP stream, waiting for new connection info");
@@ -78,7 +87,10 @@ async fn listen(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {}
+ while !matches!(
+ phase_watcher.recv().await.unwrap(),
+ StatePhase::Disconnected
+ ) {}
tx.send(true).unwrap();
};
@@ -122,7 +134,11 @@ async fn listen(
// position_info,
..
} => {
- state.lock().unwrap().audio().decode_packet(session_id, payload);
+ state
+ .lock()
+ .unwrap()
+ .audio()
+ .decode_packet(session_id, payload);
}
}
}
@@ -159,7 +175,10 @@ async fn send_voice(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {}
+ while !matches!(
+ phase_watcher.recv().await.unwrap(),
+ StatePhase::Disconnected
+ ) {}
tx.send(true).unwrap();
};
@@ -206,4 +225,3 @@ async fn send_voice(
debug!("UDP sender process killed");
}
-