aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-06-06 23:26:24 +0200
committerGustav Sörnäs <gustav@sornas.net>2021-06-06 23:26:24 +0200
commitbe76c2aa51733a0cf495e92659fbcbe527f41149 (patch)
tree617fb1caa999c076a45233b4bedea6a78192db25 /mumd/src/network
parent7fc5a1a36404ee4cbc09d20c955e6edd3d2ac523 (diff)
downloadmum-be76c2aa51733a0cf495e92659fbcbe527f41149.tar.gz
cargo fmt
Diffstat (limited to 'mumd/src/network')
-rw-r--r--mumd/src/network/tcp.rs93
-rw-r--r--mumd/src/network/udp.rs83
2 files changed, 113 insertions, 63 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index b513797..5cc2bf7 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -1,11 +1,14 @@
-use crate::{error::{ServerSendError, TcpError}, notifications};
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
+use crate::{
+ error::{ServerSendError, TcpError},
+ notifications,
+};
use log::*;
-use futures_util::{FutureExt, SinkExt, StreamExt};
use futures_util::select;
use futures_util::stream::{SplitSink, SplitStream, Stream};
+use futures_util::{FutureExt, SinkExt, StreamExt};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::VoicePacket;
@@ -73,24 +76,44 @@ impl TcpEventQueue {
/// Registers a new callback to be triggered when an event is fired.
pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) {
- self.callbacks.write().unwrap().entry(at).or_default().push(callback);
+ self.callbacks
+ .write()
+ .unwrap()
+ .entry(at)
+ .or_default()
+ .push(callback);
}
/// Registers a new callback to be triggered when an event is fired.
pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) {
- self.subscribers.write().unwrap().entry(at).or_default().push(callback);
+ self.subscribers
+ .write()
+ .unwrap()
+ .entry(at)
+ .or_default()
+ .push(callback);
}
/// Fires all callbacks related to a specific TCP event and removes them from the event queue.
/// Also calls all event subscribers, but keeps them in the queue
pub fn resolve<'a>(&self, data: TcpEventData<'a>) {
- if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) {
+ if let Some(vec) = self
+ .callbacks
+ .write()
+ .unwrap()
+ .get_mut(&TcpEvent::from(&data))
+ {
let old = std::mem::take(vec);
for handler in old {
handler(data.clone());
}
}
- if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) {
+ if let Some(vec) = self
+ .subscribers
+ .write()
+ .unwrap()
+ .get_mut(&TcpEvent::from(&data))
+ {
let old = std::mem::take(vec);
for mut e in old {
if e(data.clone()) {
@@ -128,14 +151,18 @@ pub async fn handle(
// Handshake (omitting `Version` message for brevity)
let (username, password) = {
let state_lock = state.read().unwrap();
- (state_lock.username().unwrap().to_string(),
- state_lock.password().map(|x| x.to_string()))
+ (
+ state_lock.username().unwrap().to_string(),
+ state_lock.password().map(|x| x.to_string()),
+ )
};
authenticate(&mut sink, username, password).await?;
let (phase_watcher, input_receiver) = {
let state_lock = state.read().unwrap();
- (state_lock.phase_receiver(),
- state_lock.audio_input().receiver())
+ (
+ state_lock.phase_receiver(),
+ state_lock.audio_input().receiver(),
+ )
};
info!("Logging in...");
@@ -162,7 +189,9 @@ pub async fn handle(
}
},
phase_watcher,
- ).await.unwrap_or(Ok(()))?;
+ )
+ .await
+ .unwrap_or(Ok(()))?;
event_queue.resolve(TcpEventData::Disconnected);
@@ -197,7 +226,7 @@ async fn connect(
async fn authenticate(
sink: &mut TcpSender,
username: String,
- password: Option<String>
+ password: Option<String>,
) -> Result<(), TcpError> {
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
@@ -242,7 +271,10 @@ async fn send_voice(
let mut inner_phase_watcher = phase_watcher.clone();
loop {
inner_phase_watcher.changed().await.unwrap();
- if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::TCP)) {
+ if matches!(
+ *inner_phase_watcher.borrow(),
+ StatePhase::Connected(VoiceStreamType::TCP)
+ ) {
break;
}
}
@@ -257,11 +289,14 @@ async fn send_voice(
.next()
.await
.expect("No audio stream")
- .into())?;
+ .into(),
+ )?;
}
},
inner_phase_watcher.clone(),
- ).await.unwrap_or(Ok::<(), ServerSendError>(()))?;
+ )
+ .await
+ .unwrap_or(Ok::<(), ServerSendError>(()))?;
}
}
@@ -285,18 +320,23 @@ async fn listen(
// We end up here if the login was rejected. We probably want
// to exit before that.
warn!("TCP stream gone");
- state.read().unwrap().broadcast_phase(StatePhase::Disconnected);
+ state
+ .read()
+ .unwrap()
+ .broadcast_phase(StatePhase::Disconnected);
break;
}
};
match packet {
ControlPacket::TextMessage(msg) => {
let mut state = state.write().unwrap();
- let user = state.server()
+ let user = state
+ .server()
.and_then(|server| server.users().get(&msg.get_actor()))
.map(|user| user.name());
if let Some(user) = user {
- notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this
+ notifications::send(format!("{}: {}", user, msg.get_message()));
+ //TODO: probably want a config flag for this
}
state.register_message((msg.get_message().to_owned(), msg.get_actor()));
drop(state);
@@ -345,7 +385,9 @@ async fn listen(
debug!("Login rejected: {:?}", msg);
match msg.get_field_type() {
msgs::Reject_RejectType::WrongServerPW => {
- event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword)));
+ event_queue.resolve(TcpEventData::Connected(Err(
+ mumlib::Error::InvalidServerPassword,
+ )));
}
ty => {
warn!("Unhandled reject type: {:?}", ty);
@@ -385,14 +427,11 @@ async fn listen(
// position_info,
..
} => {
- state
- .read()
- .unwrap()
- .audio_output()
- .decode_packet_payload(
- VoiceStreamType::TCP,
- session_id,
- payload);
+ state.read().unwrap().audio_output().decode_packet_payload(
+ VoiceStreamType::TCP,
+ session_id,
+ payload,
+ );
}
}
}
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 0958912..95dcf33 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -2,9 +2,9 @@ use crate::error::UdpError;
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
-use futures_util::{FutureExt, SinkExt, StreamExt};
use futures_util::future::join4;
use futures_util::stream::{SplitSink, SplitStream, Stream};
+use futures_util::{FutureExt, SinkExt, StreamExt};
use log::*;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::ping::{PingPacket, PongPacket};
@@ -13,10 +13,13 @@ use mumble_protocol::Serverbound;
use std::collections::{hash_map::Entry, HashMap};
use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
-use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
-use tokio::{join, net::UdpSocket};
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc, RwLock,
+};
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use tokio::time::{interval, timeout, Duration};
+use tokio::{join, net::UdpSocket};
use tokio_util::udp::UdpFramed;
use super::{run_until, VoiceStreamType};
@@ -53,11 +56,7 @@ pub async fn handle(
run_until(
|phase| matches!(phase, StatePhase::Disconnected),
join4(
- listen(
- Arc::clone(&state),
- Arc::clone(&source),
- &last_ping_recv,
- ),
+ listen(Arc::clone(&state), Arc::clone(&source), &last_ping_recv),
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
@@ -71,9 +70,11 @@ pub async fn handle(
&last_ping_recv,
),
new_crypt_state(&mut crypt_state_receiver, sink, source),
- ).map(|_| ()),
+ )
+ .map(|_| ()),
phase_watcher,
- ).await;
+ )
+ .await;
debug!("Fully disconnected UDP stream, waiting for new connection info");
}
@@ -83,8 +84,7 @@ async fn connect(
crypt_state: &mut mpsc::Receiver<ClientCryptState>,
) -> Result<(UdpSender, UdpReceiver), UdpError> {
// Bind UDP socket
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
- .await?;
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)).await?;
// Wait for initial CryptState
let crypt_state = match crypt_state.recv().await {
@@ -146,11 +146,11 @@ async fn listen(
// position_info,
..
} => {
- state
- .read()
- .unwrap()
- .audio_output()
- .decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
+ state.read().unwrap().audio_output().decode_packet_payload(
+ VoiceStreamType::UDP,
+ session_id,
+ payload,
+ );
}
}
}
@@ -178,12 +178,17 @@ async fn send_pings(
match sink
.lock()
.await
- .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr))
+ .send((
+ VoicePacket::Ping {
+ timestamp: last_recv + 1,
+ },
+ server_addr,
+ ))
.await
{
Ok(_) => {
last_send = Some(last_recv + 1);
- },
+ }
Err(e) => {
debug!("Error sending UDP ping: {}", e);
}
@@ -201,7 +206,10 @@ async fn send_voice(
let mut inner_phase_watcher = phase_watcher.clone();
loop {
inner_phase_watcher.changed().await.unwrap();
- if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) {
+ if matches!(
+ *inner_phase_watcher.borrow(),
+ StatePhase::Connected(VoiceStreamType::UDP)
+ ) {
break;
}
}
@@ -215,13 +223,12 @@ async fn send_voice(
}
},
phase_watcher.clone(),
- ).await;
+ )
+ .await;
}
}
-pub async fn handle_pings(
- mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>,
-) {
+pub async fn handle_pings(mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>) {
let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
.await
.expect("Failed to bind UDP socket");
@@ -246,19 +253,23 @@ pub async fn handle_pings(
}
tokio::spawn(async move {
- handle(
- match timeout(Duration::from_secs(1), rx).await {
- Ok(Ok(r)) => Some(r),
- Ok(Err(_)) => {
- warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id);
- None
- }
- Err(_) => {
- debug!("Server {} timed out when sending ping id {}", socket_addr, id);
- None
- }
+ handle(match timeout(Duration::from_secs(1), rx).await {
+ Ok(Ok(r)) => Some(r),
+ Ok(Err(_)) => {
+ warn!(
+ "Ping response sender for server {}, ping id {} dropped",
+ socket_addr, id
+ );
+ None
}
- );
+ Err(_) => {
+ debug!(
+ "Server {} timed out when sending ping id {}",
+ socket_addr, id
+ );
+ None
+ }
+ });
});
}
};