From e431ecb6c5c8406bde6a54f40ee2f648cc0cec05 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Mon, 5 Apr 2021 13:39:55 -0300 Subject: Separate the input and output audio --- mumd/src/network/tcp.rs | 4 ++-- mumd/src/network/udp.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6402a89..d2f0b41 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -108,7 +108,7 @@ pub async fn handle( let password = state_lock.password().map(|x| x.to_string()); authenticate(&mut sink, username, password).await?; let phase_watcher = state_lock.phase_receiver(); - let input_receiver = state_lock.audio().input_receiver(); + let input_receiver = state_lock.audio_input().input_receiver(); drop(state_lock); let event_queue = TcpEventQueue::new(); @@ -358,7 +358,7 @@ async fn listen( state .lock() .await - .audio() + .audio_output() .decode_packet_payload( VoiceStreamType::TCP, session_id, diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 5996e43..a8e190d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,7 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { - let receiver = state.lock().await.audio().input_receiver(); + let receiver = state.lock().await.audio_input().input_receiver(); loop { let connection_info = 'data: loop { @@ -151,7 +151,7 @@ async fn listen( state .lock() //TODO change so that we only have to lock audio and not the whole state .await - .audio() + .audio_output() .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } -- cgit v1.2.1 From 38270c4a2374c2ccc04597a28fb191af9d86b814 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Mon, 5 Apr 2021 13:50:58 -0300 Subject: Rename audio functions and basic indentation --- mumd/src/network/tcp.rs | 2 +- mumd/src/network/udp.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index d2f0b41..1414318 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -108,7 +108,7 @@ pub async fn handle( let password = state_lock.password().map(|x| x.to_string()); authenticate(&mut sink, username, password).await?; let phase_watcher = state_lock.phase_receiver(); - let input_receiver = state_lock.audio_input().input_receiver(); + let input_receiver = state_lock.audio_input().receiver(); drop(state_lock); let event_queue = TcpEventQueue::new(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index a8e190d..8614358 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,7 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { - let receiver = state.lock().await.audio_input().input_receiver(); + let receiver = state.lock().await.audio_input().receiver(); loop { let connection_info = 'data: loop { -- cgit v1.2.1 From a39934e562fd2755fcb7b1ed271bcf3f31aaa0d5 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Mon, 5 Apr 2021 11:04:19 -0300 Subject: Replace State tokio::sync::Mutex by std::sync::RwLock --- mumd/src/network/tcp.rs | 28 ++++++++++++++-------------- mumd/src/network/udp.rs | 28 +++++++++++++++++++--------- 2 files changed, 33 insertions(+), 23 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 1414318..c5eded2 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -10,7 +10,7 @@ use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPa use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; use mumble_protocol::{Clientbound, Serverbound}; -use std::collections::HashMap; +use std::{collections::HashMap, sync::RwLock}; use std::convert::{Into, TryInto}; use std::net::SocketAddr; use std::sync::Arc; @@ -79,7 +79,7 @@ impl TcpEventQueue { } pub async fn handle( - state: Arc>, + state: Arc>, mut connection_info_receiver: watch::Receiver>, crypt_state_sender: mpsc::Sender, packet_sender: mpsc::UnboundedSender>, @@ -103,7 +103,7 @@ pub async fn handle( .await?; // Handshake (omitting `Version` message for brevity) - let state_lock = state.lock().await; + let state_lock = state.read().unwrap(); let username = state_lock.username().unwrap().to_string(); let password = state_lock.password().map(|x| x.to_string()); authenticate(&mut sink, username, password).await?; @@ -241,7 +241,7 @@ async fn send_voice( } async fn listen( - state: Arc>, + state: Arc>, mut stream: TcpReceiver, crypt_state_sender: mpsc::Sender, event_queue: TcpEventQueue, @@ -260,7 +260,7 @@ async fn listen( // We end up here if the login was rejected. We probably want // to exit before that. warn!("TCP stream gone"); - state.lock().await.broadcast_phase(StatePhase::Disconnected); + state.read().unwrap().broadcast_phase(StatePhase::Disconnected); break; } }; @@ -299,7 +299,7 @@ async fn listen( .await; } event_queue.resolve(TcpEventData::Connected(Ok(&msg))).await; - let mut state = state.lock().await; + let mut state = state.write().unwrap(); let server = state.server_mut().unwrap(); server.parse_server_sync(*msg); match &server.welcome_text { @@ -323,24 +323,24 @@ async fn listen( } } ControlPacket::UserState(msg) => { - state.lock().await.parse_user_state(*msg); + state.write().unwrap().parse_user_state(*msg); } ControlPacket::UserRemove(msg) => { - state.lock().await.remove_client(*msg); + state.write().unwrap().remove_client(*msg); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); state - .lock() - .await + .write() + .unwrap() .server_mut() .unwrap() .parse_channel_state(*msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { state - .lock() - .await + .write() + .unwrap() .server_mut() .unwrap() .parse_channel_remove(*msg); @@ -356,8 +356,8 @@ async fn listen( .. } => { state - .lock() - .await + .read() + .unwrap() .audio_output() .decode_packet_payload( VoiceStreamType::TCP, diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 8614358..f24d4b4 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -9,7 +9,7 @@ use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::VoicePacket; use mumble_protocol::Serverbound; -use std::collections::HashMap; +use std::{collections::HashMap, sync::RwLock}; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; @@ -29,11 +29,15 @@ type UdpSender = SplitSink, (VoicePacket>; pub async fn handle( - state: Arc>, + state: Arc>, mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { +<<<<<<< HEAD let receiver = state.lock().await.audio_input().receiver(); +======= + let receiver = state.read().unwrap().audio().input_receiver(); +>>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) loop { let connection_info = 'data: loop { @@ -49,7 +53,7 @@ pub async fn handle( let sink = Arc::new(Mutex::new(sink)); let source = Arc::new(Mutex::new(source)); - let phase_watcher = state.lock().await.phase_receiver(); + let phase_watcher = state.read().unwrap().phase_receiver(); let last_ping_recv = AtomicU64::new(0); run_until( @@ -119,7 +123,7 @@ async fn new_crypt_state( } async fn listen( - state: Arc>, + state: Arc>, source: Arc>, last_ping_recv: &AtomicU64, ) { @@ -136,8 +140,8 @@ async fn listen( match packet { VoicePacket::Ping { timestamp } => { state - .lock() //TODO clean up unnecessary lock by only updating phase if it should change - .await + .read() + .unwrap() .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); last_ping_recv.store(timestamp, Ordering::Relaxed); } @@ -149,9 +153,15 @@ async fn listen( .. } => { state +<<<<<<< HEAD .lock() //TODO change so that we only have to lock audio and not the whole state .await .audio_output() +======= + .read() + .unwrap() + .audio() +>>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } @@ -159,7 +169,7 @@ async fn listen( } async fn send_pings( - state: Arc>, + state: Arc>, sink: Arc>, server_addr: SocketAddr, last_ping_recv: &AtomicU64, @@ -173,8 +183,8 @@ async fn send_pings( if last_send.is_some() && last_send.unwrap() != last_recv { debug!("Sending TCP voice"); state - .lock() - .await + .read() + .unwrap() .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); } match sink -- cgit v1.2.1 From 14b0ce912735243e566a95838e9ed0c93a2e7f3e Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Fri, 9 Apr 2021 15:02:51 -0300 Subject: Resolved merge upstream conflicts --- mumd/src/network/udp.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index f24d4b4..11a3f27 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,11 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { -<<<<<<< HEAD - let receiver = state.lock().await.audio_input().receiver(); -======= - let receiver = state.read().unwrap().audio().input_receiver(); ->>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) + let receiver = state.read().unwrap().audio_input().receiver(); loop { let connection_info = 'data: loop { @@ -153,15 +149,9 @@ async fn listen( .. } => { state -<<<<<<< HEAD - .lock() //TODO change so that we only have to lock audio and not the whole state - .await - .audio_output() -======= .read() .unwrap() - .audio() ->>>>>>> 48f0d38 (Replace State tokio::sync::Mutex by std::sync::RwLock) + .audio_output() .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } -- cgit v1.2.1 From 7c5f60f210bfd05ea22d3a65f04e245989fdaade Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Tue, 6 Apr 2021 15:18:55 -0300 Subject: Resolved merge upstream conflicts --- mumd/src/network/tcp.rs | 20 ++++++++++++-------- mumd/src/network/udp.rs | 4 ++-- 2 files changed, 14 insertions(+), 10 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index c5eded2..7606987 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -10,10 +10,10 @@ use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPa use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; use mumble_protocol::{Clientbound, Serverbound}; -use std::{collections::HashMap, sync::RwLock}; +use std::collections::HashMap; use std::convert::{Into, TryInto}; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use tokio::net::TcpStream; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{self, Duration}; @@ -103,13 +103,17 @@ pub async fn handle( .await?; // Handshake (omitting `Version` message for brevity) - let state_lock = state.read().unwrap(); - let username = state_lock.username().unwrap().to_string(); - let password = state_lock.password().map(|x| x.to_string()); + let (username, password) = { + let state_lock = state.read().unwrap(); + (state_lock.username().unwrap().to_string(), + state_lock.password().map(|x| x.to_string())) + }; authenticate(&mut sink, username, password).await?; - let phase_watcher = state_lock.phase_receiver(); - let input_receiver = state_lock.audio_input().receiver(); - drop(state_lock); + let (phase_watcher, input_receiver) = { + let state_lock = state.read().unwrap(); + (state_lock.phase_receiver(), + state_lock.audio_input().receiver()) + }; let event_queue = TcpEventQueue::new(); info!("Logging in..."); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 11a3f27..3ca77af 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -9,12 +9,12 @@ use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::VoicePacket; use mumble_protocol::Serverbound; -use std::{collections::HashMap, sync::RwLock}; +use std::collections::HashMap; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use tokio::{join, net::UdpSocket}; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{interval, Duration}; -- cgit v1.2.1 From ab649ca21286baf182c18d11450bf58d25af0c84 Mon Sep 17 00:00:00 2001 From: Rubens Brandao Date: Sat, 10 Apr 2021 07:39:46 -0300 Subject: Ops, forgot to fix upstream conflicts --- mumd/src/network/udp.rs | 8 -------- 1 file changed, 8 deletions(-) (limited to 'mumd/src/network') diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 086e072..3ca77af 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -33,11 +33,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver>, mut crypt_state_receiver: mpsc::Receiver, ) -> Result<(), UdpError> { -<<<<<<< HEAD let receiver = state.read().unwrap().audio_input().receiver(); -======= - let receiver = state.read().unwrap().audio().input_receiver(); ->>>>>>> main loop { let connection_info = 'data: loop { @@ -155,11 +151,7 @@ async fn listen( state .read() .unwrap() -<<<<<<< HEAD .audio_output() -======= - .audio() ->>>>>>> main .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } -- cgit v1.2.1