aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-04-06 20:48:15 +0200
committerGitHub <noreply@github.com>2021-04-06 20:48:15 +0200
commit9c116d48765ae9c567a9588e64995c404c9c26ab (patch)
treea65da0e2a48b5f1f02e963c87edf8515851255ac
parent06a765afc73ec1f8f2af27f4ea2730ddaaf05852 (diff)
parente01383af1c417666d42a802e44a1d1e98bbcf14e (diff)
downloadmum-9c116d48765ae9c567a9588e64995c404c9c26ab.tar.gz
Merge pull request #82 from rbran/main
-rw-r--r--mumd/src/client.rs6
-rw-r--r--mumd/src/command.rs8
-rw-r--r--mumd/src/network/tcp.rs42
-rw-r--r--mumd/src/network/udp.rs24
4 files changed, 42 insertions, 38 deletions
diff --git a/mumd/src/client.rs b/mumd/src/client.rs
index c1a0152..9c2c2a0 100644
--- a/mumd/src/client.rs
+++ b/mumd/src/client.rs
@@ -6,8 +6,8 @@ use crate::state::State;
use futures_util::{select, FutureExt};
use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState};
use mumlib::command::{Command, CommandResponse};
-use std::sync::Arc;
-use tokio::sync::{Mutex, mpsc, oneshot, watch};
+use std::sync::{Arc, RwLock};
+use tokio::sync::{mpsc, oneshot, watch};
pub async fn handle(
state: State,
@@ -27,7 +27,7 @@ pub async fn handle(
let (response_sender, response_receiver) =
mpsc::unbounded_channel();
- let state = Arc::new(Mutex::new(state));
+ let state = Arc::new(RwLock::new(state));
select! {
r = tcp::handle(
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 3e462b1..7eec388 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -8,11 +8,11 @@ use crate::state::{ExecutionContext, State};
use log::*;
use mumble_protocol::{Serverbound, control::ControlPacket};
use mumlib::command::{Command, CommandResponse};
-use std::sync::Arc;
-use tokio::sync::{mpsc, oneshot, watch, Mutex};
+use std::sync::{Arc, RwLock};
+use tokio::sync::{mpsc, oneshot, watch};
pub async fn handle(
- state: Arc<Mutex<State>>,
+ state: Arc<RwLock<State>>,
mut command_receiver: mpsc::UnboundedReceiver<(
Command,
oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>,
@@ -25,7 +25,7 @@ pub async fn handle(
debug!("Begin listening for commands");
while let Some((command, response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
- let mut state = state.lock().await;
+ let mut state = state.write().unwrap();
let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender);
drop(state);
match event {
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 6402a89..02477dc 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -13,7 +13,7 @@ use mumble_protocol::{Clientbound, Serverbound};
use std::collections::HashMap;
use std::convert::{Into, TryInto};
use std::net::SocketAddr;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, watch, Mutex};
use tokio::time::{self, Duration};
@@ -79,7 +79,7 @@ impl TcpEventQueue {
}
pub async fn handle(
- state: Arc<Mutex<State>>,
+ state: Arc<RwLock<State>>,
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
crypt_state_sender: mpsc::Sender<ClientCryptState>,
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
@@ -103,13 +103,17 @@ pub async fn handle(
.await?;
// Handshake (omitting `Version` message for brevity)
- let state_lock = state.lock().await;
- let username = state_lock.username().unwrap().to_string();
- let password = state_lock.password().map(|x| x.to_string());
+ let (username, password) = {
+ let state_lock = state.read().unwrap();
+ (state_lock.username().unwrap().to_string(),
+ state_lock.password().map(|x| x.to_string()))
+ };
authenticate(&mut sink, username, password).await?;
- let phase_watcher = state_lock.phase_receiver();
- let input_receiver = state_lock.audio().input_receiver();
- drop(state_lock);
+ let (phase_watcher, input_receiver) = {
+ let state_lock = state.read().unwrap();
+ (state_lock.phase_receiver(),
+ state_lock.audio().input_receiver())
+ };
let event_queue = TcpEventQueue::new();
info!("Logging in...");
@@ -241,7 +245,7 @@ async fn send_voice(
}
async fn listen(
- state: Arc<Mutex<State>>,
+ state: Arc<RwLock<State>>,
mut stream: TcpReceiver,
crypt_state_sender: mpsc::Sender<ClientCryptState>,
event_queue: TcpEventQueue,
@@ -260,7 +264,7 @@ async fn listen(
// We end up here if the login was rejected. We probably want
// to exit before that.
warn!("TCP stream gone");
- state.lock().await.broadcast_phase(StatePhase::Disconnected);
+ state.read().unwrap().broadcast_phase(StatePhase::Disconnected);
break;
}
};
@@ -299,7 +303,7 @@ async fn listen(
.await;
}
event_queue.resolve(TcpEventData::Connected(Ok(&msg))).await;
- let mut state = state.lock().await;
+ let mut state = state.write().unwrap();
let server = state.server_mut().unwrap();
server.parse_server_sync(*msg);
match &server.welcome_text {
@@ -323,24 +327,24 @@ async fn listen(
}
}
ControlPacket::UserState(msg) => {
- state.lock().await.parse_user_state(*msg);
+ state.write().unwrap().parse_user_state(*msg);
}
ControlPacket::UserRemove(msg) => {
- state.lock().await.remove_client(*msg);
+ state.write().unwrap().remove_client(*msg);
}
ControlPacket::ChannelState(msg) => {
debug!("Channel state received");
state
- .lock()
- .await
+ .write()
+ .unwrap()
.server_mut()
.unwrap()
.parse_channel_state(*msg); //TODO parse initial if initial
}
ControlPacket::ChannelRemove(msg) => {
state
- .lock()
- .await
+ .write()
+ .unwrap()
.server_mut()
.unwrap()
.parse_channel_remove(*msg);
@@ -356,8 +360,8 @@ async fn listen(
..
} => {
state
- .lock()
- .await
+ .read()
+ .unwrap()
.audio()
.decode_packet_payload(
VoiceStreamType::TCP,
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 5996e43..cc085b5 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -14,7 +14,7 @@ use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
use tokio::{join, net::UdpSocket};
use tokio::sync::{mpsc, watch, Mutex};
use tokio::time::{interval, Duration};
@@ -29,11 +29,11 @@ type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound
type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>;
pub async fn handle(
- state: Arc<Mutex<State>>,
+ state: Arc<RwLock<State>>,
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) -> Result<(), UdpError> {
- let receiver = state.lock().await.audio().input_receiver();
+ let receiver = state.read().unwrap().audio().input_receiver();
loop {
let connection_info = 'data: loop {
@@ -49,7 +49,7 @@ pub async fn handle(
let sink = Arc::new(Mutex::new(sink));
let source = Arc::new(Mutex::new(source));
- let phase_watcher = state.lock().await.phase_receiver();
+ let phase_watcher = state.read().unwrap().phase_receiver();
let last_ping_recv = AtomicU64::new(0);
run_until(
@@ -119,7 +119,7 @@ async fn new_crypt_state(
}
async fn listen(
- state: Arc<Mutex<State>>,
+ state: Arc<RwLock<State>>,
source: Arc<Mutex<UdpReceiver>>,
last_ping_recv: &AtomicU64,
) {
@@ -136,8 +136,8 @@ async fn listen(
match packet {
VoicePacket::Ping { timestamp } => {
state
- .lock() //TODO clean up unnecessary lock by only updating phase if it should change
- .await
+ .read()
+ .unwrap()
.broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP));
last_ping_recv.store(timestamp, Ordering::Relaxed);
}
@@ -149,8 +149,8 @@ async fn listen(
..
} => {
state
- .lock() //TODO change so that we only have to lock audio and not the whole state
- .await
+ .read()
+ .unwrap()
.audio()
.decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
}
@@ -159,7 +159,7 @@ async fn listen(
}
async fn send_pings(
- state: Arc<Mutex<State>>,
+ state: Arc<RwLock<State>>,
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
last_ping_recv: &AtomicU64,
@@ -173,8 +173,8 @@ async fn send_pings(
if last_send.is_some() && last_send.unwrap() != last_recv {
debug!("Sending TCP voice");
state
- .lock()
- .await
+ .read()
+ .unwrap()
.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
}
match sink