From 48ce14064d355ad0ed89e59b1d4b10256c85be6a Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 25 Dec 2020 15:24:19 +0100 Subject: initial tokio 1.0 commit --- mumd/src/audio.rs | 4 ++-- mumd/src/network/tcp.rs | 41 +++++++++++++++++++++------------ mumd/src/network/udp.rs | 60 ++++++++++++++++++++++++++++++++----------------- mumd/src/state.rs | 8 +++---- 4 files changed, 72 insertions(+), 41 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 812bb4c..9e8bd6e 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -305,11 +305,11 @@ impl Audio { } pub fn set_input_volume(&self, input_volume: f32) { - self.input_volume_sender.broadcast(input_volume).unwrap(); + self.input_volume_sender.send(input_volume).unwrap(); } pub fn set_output_volume(&self, output_volume: f32) { - self.output_volume_sender.broadcast(output_volume).unwrap(); + self.output_volume_sender.send(output_volume).unwrap(); } pub fn set_user_volume(&self, id: u32, volume: f32) { diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index b1743c8..9024ef3 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -17,8 +17,9 @@ use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; -use tokio_tls::{TlsConnector, TlsStream}; +use tokio_native_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; +// use tokio_util::codec::decoder::Decoder; type TcpSender = SplitSink< Framed, ControlCodec>, @@ -48,16 +49,22 @@ pub async fn handle( mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, ) { loop { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { - return; - } - Some(None) => {} - Some(Some(connection_info)) => { - break connection_info; + let connection_info = 'data: loop { + while let Ok(()) = connection_info_receiver.changed().await { + if let Some(data) = connection_info_receiver.borrow().clone() { + break 'data data; } } + return; + // match connection_info_receiver.changed().await { + // None => { + // return; + // } + // Some(None) => {} + // Some(Some(connection_info)) => { + // break connection_info; + // } + // } }; let (mut sink, stream) = connect( connection_info.socket_addr, @@ -165,7 +172,7 @@ async fn send_packets( }, || async { //clears queue of remaining packets - while packet_receiver.borrow_mut().try_recv().is_ok() {} + // while packet_receiver.borrow_mut().try_recv().is_ok() {} sink.borrow_mut().close().await.unwrap(); }, @@ -323,10 +330,16 @@ async fn run_until_disconnection( { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!( - phase_watcher.recv().await.unwrap(), - StatePhase::Disconnected - ) {} + loop { + phase_watcher.changed().await.unwrap(); + if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { + break; + } + } + // while !matches!( + // phase_watcher.recv().await.unwrap(), + // StatePhase::Disconnected + // ) {} tx.send(true).unwrap(); }; diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index b1c202a..55a47a3 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -29,16 +29,22 @@ pub async fn handle( let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); loop { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { - return; - } - Some(None) => {} - Some(Some(connection_info)) => { - break connection_info; + let connection_info = 'data: loop { + while let Ok(()) = connection_info_receiver.changed().await { + if let Some(data) = connection_info_receiver.borrow().clone() { + break 'data data; } } + return; + // match connection_info_receiver.recv().await { + // None => { + // return; + // } + // Some(None) => {} + // Some(Some(connection_info)) => { + // break connection_info; + // } + // } }; let (mut sink, source) = connect(&mut crypt_state_receiver).await; @@ -114,10 +120,16 @@ async fn listen( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!( - phase_watcher.recv().await.unwrap(), - StatePhase::Disconnected - ) {} + loop { + phase_watcher.changed().await.unwrap(); + if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { + break; + } + } + // while !matches!( + // phase_watcher.recv().await.unwrap(), + // StatePhase::Disconnected + // ) {} tx.send(true).unwrap(); }; @@ -203,10 +215,16 @@ async fn send_voice( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!( - phase_watcher.recv().await.unwrap(), - StatePhase::Disconnected - ) {} + loop { + phase_watcher.changed().await.unwrap(); + if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) { + break; + } + } + // while !matches!( + // phase_watcher.recv().await.unwrap(), + // StatePhase::Disconnected + // ) {} tx.send(true).unwrap(); }; @@ -261,11 +279,11 @@ pub async fn handle_pings( Box, )>, ) { - let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) + let udp_socket = Arc::new(UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await - .expect("Failed to bind UDP socket"); + .expect("Failed to bind UDP socket")); - let (mut receiver, mut sender) = udp_socket.split(); + // let (mut receiver, mut sender) = udp_socket.split(); let pending = Rc::new(Mutex::new(HashMap::new())); @@ -273,14 +291,14 @@ pub async fn handle_pings( while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await { let packet = PingPacket { id }; let packet: [u8; 12] = packet.into(); - sender.send_to(&packet, &socket_addr).await.unwrap(); + udp_socket.send_to(&packet, &socket_addr).await.unwrap(); pending.lock().unwrap().insert(id, handle); } }; let receiver_handle = async { let mut buf = vec![0; 24]; - while let Ok(read) = receiver.recv(&mut buf).await { + while let Ok(read) = udp_socket.recv(&mut buf).await { assert_eq!(read, 24); let packet = PongPacket::try_from(buf.as_slice()).unwrap(); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index aba0931..85e5449 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -321,7 +321,7 @@ impl State { self.server = Some(server); self.phase_watcher .0 - .broadcast(StatePhase::Connecting) + .send(StatePhase::Connecting) .unwrap(); let socket_addr = match (host.as_ref(), port) @@ -335,7 +335,7 @@ impl State { } }; self.connection_info_sender - .broadcast(Some(ConnectionInfo::new( + .send(Some(ConnectionInfo::new( socket_addr, host, accept_invalid_cert, @@ -366,7 +366,7 @@ impl State { self.phase_watcher .0 - .broadcast(StatePhase::Disconnected) + .send(StatePhase::Disconnected) .unwrap(); self.audio.play_effect(NotificationEvents::ServerDisconnect); now!(Ok(None)) @@ -581,7 +581,7 @@ impl State { pub fn initialized(&self) { self.phase_watcher .0 - .broadcast(StatePhase::Connected) + .send(StatePhase::Connected) .unwrap(); self.audio.play_effect(NotificationEvents::ServerConnect); } -- cgit v1.2.1 From 4758e817b1372d1e7764f84981b1651436aac1fd Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 25 Dec 2020 15:27:04 +0100 Subject: solve compiler warnings --- mumd/src/audio/input.rs | 2 +- mumd/src/network/tcp.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 7405fdb..fe0d21f 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -7,7 +7,7 @@ use tokio::sync::{mpsc, watch}; pub fn callback( mut opus_encoder: opus::Encoder, - mut input_sender: mpsc::Sender, + input_sender: mpsc::Sender, sample_rate: u32, input_volume_receiver: watch::Receiver, opus_frame_size_blocks: u32, // blocks of 2.5ms diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 9024ef3..f79655a 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -222,7 +222,7 @@ async fn listen( } ControlPacket::ServerSync(msg) => { info!("Logged in"); - if let Some(mut sender) = crypt_state_sender.borrow_mut().take() { + if let Some(sender) = crypt_state_sender.borrow_mut().take() { let _ = sender .send( crypt_state -- cgit v1.2.1 From 01e0cecb52ed6f33ca2003441646afb538304c89 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Fri, 25 Dec 2020 22:10:52 +0100 Subject: remove dead code --- mumd/src/network/tcp.rs | 17 ----------------- mumd/src/network/udp.rs | 23 ++--------------------- 2 files changed, 2 insertions(+), 38 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index f79655a..c1d7a54 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -19,7 +19,6 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_native_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; -// use tokio_util::codec::decoder::Decoder; type TcpSender = SplitSink< Framed, ControlCodec>, @@ -56,15 +55,6 @@ pub async fn handle( } } return; - // match connection_info_receiver.changed().await { - // None => { - // return; - // } - // Some(None) => {} - // Some(Some(connection_info)) => { - // break connection_info; - // } - // } }; let (mut sink, stream) = connect( connection_info.socket_addr, @@ -171,9 +161,6 @@ async fn send_packets( sink.borrow_mut().send(packet).await.unwrap(); }, || async { - //clears queue of remaining packets - // while packet_receiver.borrow_mut().try_recv().is_ok() {} - sink.borrow_mut().close().await.unwrap(); }, phase_watcher, @@ -336,10 +323,6 @@ async fn run_until_disconnection( break; } } - // while !matches!( - // phase_watcher.recv().await.unwrap(), - // StatePhase::Disconnected - // ) {} tx.send(true).unwrap(); }; diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 55a47a3..d3ca1c9 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -36,15 +36,6 @@ pub async fn handle( } } return; - // match connection_info_receiver.recv().await { - // None => { - // return; - // } - // Some(None) => {} - // Some(Some(connection_info)) => { - // break connection_info; - // } - // } }; let (mut sink, source) = connect(&mut crypt_state_receiver).await; @@ -126,10 +117,6 @@ async fn listen( break; } } - // while !matches!( - // phase_watcher.recv().await.unwrap(), - // StatePhase::Disconnected - // ) {} tx.send(true).unwrap(); }; @@ -221,10 +208,6 @@ async fn send_voice( break; } } - // while !matches!( - // phase_watcher.recv().await.unwrap(), - // StatePhase::Disconnected - // ) {} tx.send(true).unwrap(); }; @@ -279,11 +262,9 @@ pub async fn handle_pings( Box, )>, ) { - let udp_socket = Arc::new(UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await - .expect("Failed to bind UDP socket")); - - // let (mut receiver, mut sender) = udp_socket.split(); + .expect("Failed to bind UDP socket"); let pending = Rc::new(Mutex::new(HashMap::new())); -- cgit v1.2.1 From 8335bd3b6ca5b87f23aa191722e04fe1c78acc79 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 26 Dec 2020 07:46:37 +0100 Subject: minor code cleanup --- mumd/src/network/tcp.rs | 2 +- mumd/src/network/udp.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index c1d7a54..3c96ee1 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -49,7 +49,7 @@ pub async fn handle( ) { loop { let connection_info = 'data: loop { - while let Ok(()) = connection_info_receiver.changed().await { + while connection_info_receiver.changed().await.is_ok() { if let Some(data) = connection_info_receiver.borrow().clone() { break 'data data; } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index d3ca1c9..b592a60 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -30,7 +30,7 @@ pub async fn handle( loop { let connection_info = 'data: loop { - while let Ok(()) = connection_info_receiver.changed().await { + while connection_info_receiver.changed().await.is_ok() { if let Some(data) = connection_info_receiver.borrow().clone() { break 'data data; } -- cgit v1.2.1