aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
authorEskil Q <eskilq@kth.se>2021-01-01 14:31:08 +0100
committerEskil Q <eskilq@kth.se>2021-01-01 14:31:08 +0100
commit53d1c82eb81b0acb696d2d60ecb8db3fab488105 (patch)
treec62d48474596a3ab34bef400c1a2291bfe73b000 /mumd
parent9777219aa26dc64c0a3dc3d9d2023b8a1c4295fb (diff)
parent4613c6b269d7842645d050bb3482cf7efcfa1946 (diff)
downloadmum-53d1c82eb81b0acb696d2d60ecb8db3fab488105.tar.gz
Merge branch 'main' into noise-gate
Diffstat (limited to 'mumd')
-rw-r--r--mumd/Cargo.toml10
-rw-r--r--mumd/src/audio.rs4
-rw-r--r--mumd/src/audio/input.rs2
-rw-r--r--mumd/src/network/tcp.rs30
-rw-r--r--mumd/src/network/udp.rs39
-rw-r--r--mumd/src/state.rs8
6 files changed, 44 insertions, 49 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml
index e548559..caf1a9b 100644
--- a/mumd/Cargo.toml
+++ b/mumd/Cargo.toml
@@ -19,8 +19,8 @@ notifications = ["libnotify"]
mumlib = { version = "0.3", path = "../mumlib" }
argparse = "0.2"
-bytes = "0.5"
cpal = "0.13"
+bytes = "1.0"
dasp_interpolate = { version = "0.11", features = ["linear"] }
dasp_signal = "0.11"
dasp_frame = "0.11"
@@ -31,14 +31,14 @@ futures-util = "0.3"
hound = "3.4"
ipc-channel = "0.14"
log = "0.4"
-mumble-protocol = "0.3.1"
+mumble-protocol = "0.4.0"
native-tls = "0.2"
openssl = { version = "0.10" }
opus = "0.2"
serde = { version = "1.0", features = ["derive"] }
-tokio = { version = "0.2", features = ["blocking", "macros", "rt-core", "sync", "tcp", "time"] }
-tokio-tls = "0.3"
-tokio-util = { version = "0.3", features = ["codec", "udp"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "net", "time"] }
+tokio-native-tls = "0.3"
+tokio-util = { version = "0.6", features = ["codec", "net"] }
libnotify = { version = "1.0", optional = true }
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index ee5516a..0998f06 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -322,11 +322,11 @@ impl Audio {
}
pub fn set_input_volume(&self, input_volume: f32) {
- self.input_volume_sender.broadcast(input_volume).unwrap();
+ self.input_volume_sender.send(input_volume).unwrap();
}
pub fn set_output_volume(&self, output_volume: f32) {
- self.output_volume_sender.broadcast(output_volume).unwrap();
+ self.output_volume_sender.send(output_volume).unwrap();
}
pub fn set_user_volume(&self, id: u32, volume: f32) {
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index d04c728..914891b 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -8,7 +8,7 @@ use log::*;
pub fn callback<T: Sample>(
mut opus_encoder: opus::Encoder,
- mut input_sender: mpsc::Sender<VoicePacketPayload>,
+ input_sender: mpsc::Sender<VoicePacketPayload>,
sample_rate: u32,
input_volume_receiver: watch::Receiver<f32>,
opus_frame_size_blocks: u32, // blocks of 2.5ms
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index b1743c8..3c96ee1 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{self, Duration};
-use tokio_tls::{TlsConnector, TlsStream};
+use tokio_native_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
type TcpSender = SplitSink<
@@ -48,16 +48,13 @@ pub async fn handle(
mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>,
) {
loop {
- let connection_info = loop {
- match connection_info_receiver.recv().await {
- None => {
- return;
- }
- Some(None) => {}
- Some(Some(connection_info)) => {
- break connection_info;
+ let connection_info = 'data: loop {
+ while connection_info_receiver.changed().await.is_ok() {
+ if let Some(data) = connection_info_receiver.borrow().clone() {
+ break 'data data;
}
}
+ return;
};
let (mut sink, stream) = connect(
connection_info.socket_addr,
@@ -164,9 +161,6 @@ async fn send_packets(
sink.borrow_mut().send(packet).await.unwrap();
},
|| async {
- //clears queue of remaining packets
- while packet_receiver.borrow_mut().try_recv().is_ok() {}
-
sink.borrow_mut().close().await.unwrap();
},
phase_watcher,
@@ -215,7 +209,7 @@ async fn listen(
}
ControlPacket::ServerSync(msg) => {
info!("Logged in");
- if let Some(mut sender) = crypt_state_sender.borrow_mut().take() {
+ if let Some(sender) = crypt_state_sender.borrow_mut().take() {
let _ = sender
.send(
crypt_state
@@ -323,10 +317,12 @@ async fn run_until_disconnection<T, F, G, H>(
{
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(
- phase_watcher.recv().await.unwrap(),
- StatePhase::Disconnected
- ) {}
+ loop {
+ phase_watcher.changed().await.unwrap();
+ if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
+ break;
+ }
+ }
tx.send(true).unwrap();
};
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index b1c202a..b592a60 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -29,16 +29,13 @@ pub async fn handle(
let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap();
loop {
- let connection_info = loop {
- match connection_info_receiver.recv().await {
- None => {
- return;
- }
- Some(None) => {}
- Some(Some(connection_info)) => {
- break connection_info;
+ let connection_info = 'data: loop {
+ while connection_info_receiver.changed().await.is_ok() {
+ if let Some(data) = connection_info_receiver.borrow().clone() {
+ break 'data data;
}
}
+ return;
};
let (mut sink, source) = connect(&mut crypt_state_receiver).await;
@@ -114,10 +111,12 @@ async fn listen(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(
- phase_watcher.recv().await.unwrap(),
- StatePhase::Disconnected
- ) {}
+ loop {
+ phase_watcher.changed().await.unwrap();
+ if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
+ break;
+ }
+ }
tx.send(true).unwrap();
};
@@ -203,10 +202,12 @@ async fn send_voice(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(
- phase_watcher.recv().await.unwrap(),
- StatePhase::Disconnected
- ) {}
+ loop {
+ phase_watcher.changed().await.unwrap();
+ if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
+ break;
+ }
+ }
tx.send(true).unwrap();
};
@@ -265,22 +266,20 @@ pub async fn handle_pings(
.await
.expect("Failed to bind UDP socket");
- let (mut receiver, mut sender) = udp_socket.split();
-
let pending = Rc::new(Mutex::new(HashMap::new()));
let sender_handle = async {
while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await {
let packet = PingPacket { id };
let packet: [u8; 12] = packet.into();
- sender.send_to(&packet, &socket_addr).await.unwrap();
+ udp_socket.send_to(&packet, &socket_addr).await.unwrap();
pending.lock().unwrap().insert(id, handle);
}
};
let receiver_handle = async {
let mut buf = vec![0; 24];
- while let Ok(read) = receiver.recv(&mut buf).await {
+ while let Ok(read) = udp_socket.recv(&mut buf).await {
assert_eq!(read, 24);
let packet = PongPacket::try_from(buf.as_slice()).unwrap();
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index aba0931..85e5449 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -321,7 +321,7 @@ impl State {
self.server = Some(server);
self.phase_watcher
.0
- .broadcast(StatePhase::Connecting)
+ .send(StatePhase::Connecting)
.unwrap();
let socket_addr = match (host.as_ref(), port)
@@ -335,7 +335,7 @@ impl State {
}
};
self.connection_info_sender
- .broadcast(Some(ConnectionInfo::new(
+ .send(Some(ConnectionInfo::new(
socket_addr,
host,
accept_invalid_cert,
@@ -366,7 +366,7 @@ impl State {
self.phase_watcher
.0
- .broadcast(StatePhase::Disconnected)
+ .send(StatePhase::Disconnected)
.unwrap();
self.audio.play_effect(NotificationEvents::ServerDisconnect);
now!(Ok(None))
@@ -581,7 +581,7 @@ impl State {
pub fn initialized(&self) {
self.phase_watcher
.0
- .broadcast(StatePhase::Connected)
+ .send(StatePhase::Connected)
.unwrap();
self.audio.play_effect(NotificationEvents::ServerConnect);
}