diff options
| author | Eskil Q <eskilq@kth.se> | 2021-01-01 14:31:08 +0100 |
|---|---|---|
| committer | Eskil Q <eskilq@kth.se> | 2021-01-01 14:31:08 +0100 |
| commit | 53d1c82eb81b0acb696d2d60ecb8db3fab488105 (patch) | |
| tree | c62d48474596a3ab34bef400c1a2291bfe73b000 /mumd | |
| parent | 9777219aa26dc64c0a3dc3d9d2023b8a1c4295fb (diff) | |
| parent | 4613c6b269d7842645d050bb3482cf7efcfa1946 (diff) | |
| download | mum-53d1c82eb81b0acb696d2d60ecb8db3fab488105.tar.gz | |
Merge branch 'main' into noise-gate
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/Cargo.toml | 10 | ||||
| -rw-r--r-- | mumd/src/audio.rs | 4 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 2 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 30 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 39 | ||||
| -rw-r--r-- | mumd/src/state.rs | 8 |
6 files changed, 44 insertions, 49 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index e548559..caf1a9b 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -19,8 +19,8 @@ notifications = ["libnotify"] mumlib = { version = "0.3", path = "../mumlib" } argparse = "0.2" -bytes = "0.5" cpal = "0.13" +bytes = "1.0" dasp_interpolate = { version = "0.11", features = ["linear"] } dasp_signal = "0.11" dasp_frame = "0.11" @@ -31,14 +31,14 @@ futures-util = "0.3" hound = "3.4" ipc-channel = "0.14" log = "0.4" -mumble-protocol = "0.3.1" +mumble-protocol = "0.4.0" native-tls = "0.2" openssl = { version = "0.10" } opus = "0.2" serde = { version = "1.0", features = ["derive"] } -tokio = { version = "0.2", features = ["blocking", "macros", "rt-core", "sync", "tcp", "time"] } -tokio-tls = "0.3" -tokio-util = { version = "0.3", features = ["codec", "udp"] } +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "net", "time"] } +tokio-native-tls = "0.3" +tokio-util = { version = "0.6", features = ["codec", "net"] } libnotify = { version = "1.0", optional = true } diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index ee5516a..0998f06 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -322,11 +322,11 @@ impl Audio { } pub fn set_input_volume(&self, input_volume: f32) { - self.input_volume_sender.broadcast(input_volume).unwrap(); + self.input_volume_sender.send(input_volume).unwrap(); } pub fn set_output_volume(&self, output_volume: f32) { - self.output_volume_sender.broadcast(output_volume).unwrap(); + self.output_volume_sender.send(output_volume).unwrap(); } pub fn set_user_volume(&self, id: u32, volume: f32) { diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index d04c728..914891b 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -8,7 +8,7 @@ use log::*; pub fn callback<T: Sample>( mut opus_encoder: opus::Encoder, - mut input_sender: mpsc::Sender<VoicePacketPayload>, + input_sender: mpsc::Sender<VoicePacketPayload>, sample_rate: u32, input_volume_receiver: watch::Receiver<f32>, opus_frame_size_blocks: u32, // blocks of 2.5ms diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index b1743c8..3c96ee1 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; -use tokio_tls::{TlsConnector, TlsStream}; +use tokio_native_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; type TcpSender = SplitSink< @@ -48,16 +48,13 @@ pub async fn handle( mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, ) { loop { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { - return; - } - Some(None) => {} - Some(Some(connection_info)) => { - break connection_info; + let connection_info = 'data: loop { + while connection_info_receiver.changed().await.is_ok() { + if let Some(data) = connection_info_receiver.borrow().clone() { + break 'data data; } } + return; }; let (mut sink, stream) = connect( connection_info.socket_addr, @@ -164,9 +161,6 @@ async fn send_packets( sink.borrow_mut().send(packet).await.unwrap(); }, || async { - //clears queue of remaining packets - while packet_receiver.borrow_mut().try_recv().is_ok() {} - sink.borrow_mut().close().await.unwrap(); }, phase_watcher, @@ -215,7 +209,7 @@ async fn listen( } ControlPacket::ServerSync(msg) => { info!("Logged in"); - if let Some(mut sender) = crypt_state_sender.borrow_mut().take() { + if let Some(sender) = crypt_state_sender.borrow_mut().take() { let _ = sender .send( crypt_state @@ -323,10 +317,12 @@ async fn run_until_disconnection<T, F, G, H>( { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!( - phase_watcher.recv().await.unwrap(), - StatePhase::Disconnected - ) {} + loop { + phase_watcher.changed().await.unwrap(); + if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { + break; + } + } tx.send(true).unwrap(); }; diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index b1c202a..b592a60 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -29,16 +29,13 @@ pub async fn handle( let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); loop { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { - return; - } - Some(None) => {} - Some(Some(connection_info)) => { - break connection_info; + let connection_info = 'data: loop { + while connection_info_receiver.changed().await.is_ok() { + if let Some(data) = connection_info_receiver.borrow().clone() { + break 'data data; } } + return; }; let (mut sink, source) = connect(&mut crypt_state_receiver).await; @@ -114,10 +111,12 @@ async fn listen( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!( - phase_watcher.recv().await.unwrap(), - StatePhase::Disconnected - ) {} + loop { + phase_watcher.changed().await.unwrap(); + if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { + break; + } + } tx.send(true).unwrap(); }; @@ -203,10 +202,12 @@ async fn send_voice( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!( - phase_watcher.recv().await.unwrap(), - StatePhase::Disconnected - ) {} + loop { + phase_watcher.changed().await.unwrap(); + if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { + break; + } + } tx.send(true).unwrap(); }; @@ -265,22 +266,20 @@ pub async fn handle_pings( .await .expect("Failed to bind UDP socket"); - let (mut receiver, mut sender) = udp_socket.split(); - let pending = Rc::new(Mutex::new(HashMap::new())); let sender_handle = async { while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await { let packet = PingPacket { id }; let packet: [u8; 12] = packet.into(); - sender.send_to(&packet, &socket_addr).await.unwrap(); + udp_socket.send_to(&packet, &socket_addr).await.unwrap(); pending.lock().unwrap().insert(id, handle); } }; let receiver_handle = async { let mut buf = vec![0; 24]; - while let Ok(read) = receiver.recv(&mut buf).await { + while let Ok(read) = udp_socket.recv(&mut buf).await { assert_eq!(read, 24); let packet = PongPacket::try_from(buf.as_slice()).unwrap(); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index aba0931..85e5449 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -321,7 +321,7 @@ impl State { self.server = Some(server); self.phase_watcher .0 - .broadcast(StatePhase::Connecting) + .send(StatePhase::Connecting) .unwrap(); let socket_addr = match (host.as_ref(), port) @@ -335,7 +335,7 @@ impl State { } }; self.connection_info_sender - .broadcast(Some(ConnectionInfo::new( + .send(Some(ConnectionInfo::new( socket_addr, host, accept_invalid_cert, @@ -366,7 +366,7 @@ impl State { self.phase_watcher .0 - .broadcast(StatePhase::Disconnected) + .send(StatePhase::Disconnected) .unwrap(); self.audio.play_effect(NotificationEvents::ServerDisconnect); now!(Ok(None)) @@ -581,7 +581,7 @@ impl State { pub fn initialized(&self) { self.phase_watcher .0 - .broadcast(StatePhase::Connected) + .send(StatePhase::Connected) .unwrap(); self.audio.play_effect(NotificationEvents::ServerConnect); } |
