From c5af1b237027031be310951c36f23f0a0bc760b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Mon, 29 Mar 2021 18:10:41 +0200 Subject: tcp event queue --- mumd/src/network/tcp.rs | 76 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 8ce49cb..cd178f8 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -36,9 +36,52 @@ pub enum TcpEvent { Disconnected, //fires when the client has disconnected from a server } +#[derive(Clone)] pub enum TcpEventData<'a> { Connected(&'a msgs::ServerSync), - Disconnected, + _Disconnected, +} + +impl<'a> From<&TcpEventData<'a>> for TcpEvent { + fn from(t: &TcpEventData) -> Self { + match t { + TcpEventData::Connected(_) => TcpEvent::Connected, + TcpEventData::_Disconnected => TcpEvent::Disconnected, + } + } +} + +struct TcpEventQueue { + handlers: Arc>>>, +} + +impl TcpEventQueue { + fn new() -> Self { + Self { + handlers: Arc::new(Mutex::new(HashMap::new())), + } + } + + async fn register(&mut self, at: TcpEvent, callback: TcpEventCallback) { + self.handlers.lock().await.entry(at).or_default().push(callback); + } + + async fn send<'a>(&mut self, data: TcpEventData<'a>) { + if let Some(vec) = self.handlers.lock().await.get_mut(&TcpEvent::from(&data)) { + let old = std::mem::take(vec); + for handler in old { + handler(data.clone()); + } + } + } +} + +impl Clone for TcpEventQueue { + fn clone(&self) -> Self { + Self { + handlers: Arc::clone(&self.handlers), + } + } } pub async fn handle( @@ -73,7 +116,7 @@ pub async fn handle( let phase_watcher = state_lock.phase_receiver(); let input_receiver = state_lock.audio().input_receiver(); drop(state_lock); - let event_queue = Arc::new(Mutex::new(HashMap::new())); + let event_queue = TcpEventQueue::new(); info!("Logging in..."); @@ -85,7 +128,7 @@ pub async fn handle( Arc::clone(&state), stream, crypt_state_sender.clone(), - Arc::clone(&event_queue), + event_queue.clone(), ), send_voice( packet_sender.clone(), @@ -93,18 +136,11 @@ pub async fn handle( phase_watcher.clone(), ), send_packets(sink, &mut packet_receiver), - register_events(&mut tcp_event_register_receiver, Arc::clone(&event_queue)), + register_events(&mut tcp_event_register_receiver, event_queue.clone()), ).map(|_| ()), phase_watcher, ).await; - if let Some(vec) = event_queue.lock().await.get_mut(&TcpEvent::Disconnected) { - let old = std::mem::take(vec); - for handler in old { - handler(TcpEventData::Disconnected); - } - } - debug!("Fully disconnected TCP stream, waiting for new connection info"); } } @@ -209,7 +245,7 @@ async fn listen( state: Arc>, mut stream: TcpReceiver, crypt_state_sender: mpsc::Sender, - event_queue: Arc>>>, + mut event_queue: TcpEventQueue, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -250,12 +286,7 @@ async fn listen( ) .await; } - if let Some(vec) = event_queue.lock().await.get_mut(&TcpEvent::Connected) { - let old = std::mem::take(vec); - for handler in old { - handler(TcpEventData::Connected(&msg)); - } - } + event_queue.send(TcpEventData::Connected(&msg)).await; let mut state = state.lock().await; let server = state.server_mut().unwrap(); server.parse_server_sync(*msg); @@ -324,15 +355,10 @@ async fn listen( async fn register_events( tcp_event_register_receiver: &mut mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, - event_data: Arc>>>, + mut event_queue: TcpEventQueue, ) { loop { let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_data - .lock() - .await - .entry(event) - .or_default() - .push(handler); + event_queue.register(event, handler).await; } } -- cgit v1.2.1 From 65016caa8d565942086540edbee95b8af1e75c8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Mon, 29 Mar 2021 18:34:28 +0200 Subject: tcp event connected contains result --- mumd/src/network/tcp.rs | 4 ++-- mumd/src/state.rs | 20 +++++++++++--------- mumlib/src/error.rs | 4 ++-- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index cd178f8..1738b15 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -38,7 +38,7 @@ pub enum TcpEvent { #[derive(Clone)] pub enum TcpEventData<'a> { - Connected(&'a msgs::ServerSync), + Connected(Result<&'a msgs::ServerSync, mumlib::error::Error>), _Disconnected, } @@ -286,7 +286,7 @@ async fn listen( ) .await; } - event_queue.send(TcpEventData::Connected(&msg)).await; + event_queue.send(TcpEventData::Connected(Ok(&msg))).await; let mut state = state.lock().await; let server = state.server_mut().unwrap(); server.parse_server_sync(*msg); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index b279dfd..f725276 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -342,16 +342,18 @@ impl State { accept_invalid_cert, ))) .unwrap(); - at!(TcpEvent::Connected, |e| { + at!(TcpEvent::Connected, |res| { //runs the closure when the client is connected - if let TcpEventData::Connected(msg) = e { - Ok(Some(CommandResponse::ServerConnect { - welcome_message: if msg.has_welcome_text() { - Some(msg.get_welcome_text().to_string()) - } else { - None - }, - })) + if let TcpEventData::Connected(res) = res { + res.map(|msg| { + Some(CommandResponse::ServerConnect { + welcome_message: if msg.has_welcome_text() { + Some(msg.get_welcome_text().to_string()) + } else { + None + }, + }) + }) } else { unreachable!("callback should be provided with a TcpEventData::Connected"); } diff --git a/mumlib/src/error.rs b/mumlib/src/error.rs index 8c66068..820d5f3 100644 --- a/mumlib/src/error.rs +++ b/mumlib/src/error.rs @@ -3,7 +3,7 @@ use std::fmt; pub type Result = std::result::Result; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum Error { DisconnectedError, AlreadyConnectedError, @@ -26,7 +26,7 @@ impl fmt::Display for Error { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub enum ChannelIdentifierError { Invalid, Ambiguous, -- cgit v1.2.1 From 5af72d30bb1b34cbde1c3ba5e73b7c694461ae51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Mon, 29 Mar 2021 21:38:12 +0200 Subject: report invalid server password --- mumd/src/network/tcp.rs | 12 ++++++++++-- mumlib/src/error.rs | 2 ++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 1738b15..29749f1 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -8,6 +8,7 @@ use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPa use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; use mumble_protocol::{Clientbound, Serverbound}; +use mumlib::error::Error; use std::collections::HashMap; use std::convert::{Into, TryInto}; use std::net::SocketAddr; @@ -38,7 +39,7 @@ pub enum TcpEvent { #[derive(Clone)] pub enum TcpEventData<'a> { - Connected(Result<&'a msgs::ServerSync, mumlib::error::Error>), + Connected(Result<&'a msgs::ServerSync, Error>), _Disconnected, } @@ -300,7 +301,14 @@ async fn listen( state.initialized(); } ControlPacket::Reject(msg) => { - warn!("Login rejected: {:?}", msg); + match msg.get_field_type() { + msgs::Reject_RejectType::WrongServerPW => { + event_queue.send(TcpEventData::Connected(Err(Error::InvalidServerPassword))).await; + } + _ => { + warn!("Login rejected: {:?}", msg); + } + } } ControlPacket::UserState(msg) => { state.lock().await.parse_user_state(*msg); diff --git a/mumlib/src/error.rs b/mumlib/src/error.rs index 820d5f3..0259c28 100644 --- a/mumlib/src/error.rs +++ b/mumlib/src/error.rs @@ -10,6 +10,7 @@ pub enum Error { ChannelIdentifierError(String, ChannelIdentifierError), InvalidServerAddrError(String, u16), InvalidUsernameError(String), + InvalidServerPassword, } impl fmt::Display for Error { @@ -22,6 +23,7 @@ impl fmt::Display for Error { write!(f, "Invalid server address: {}: {}", addr, port) } Error::InvalidUsernameError(username) => write!(f, "Invalid username: {}", username), + Error::InvalidServerPassword => write!(f, "Invalid server password") } } } -- cgit v1.2.1 From 6e71b1fee95e5f320bbc27e4148ff48e0d390073 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Mon, 29 Mar 2021 21:41:38 +0200 Subject: remove error suffix on error variants --- mumd/src/state.rs | 26 +++++++++++++------------- mumlib/src/error.rs | 16 ++++++++-------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/mumd/src/state.rs b/mumd/src/state.rs index f725276..e666bcf 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -88,7 +88,7 @@ impl State { match command { Command::ChannelJoin { channel_identifier } => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } let channels = self.server().unwrap().channels(); @@ -138,7 +138,7 @@ impl State { } Command::ChannelList => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } let list = channel::into_channel( self.server.as_ref().unwrap().channels(), @@ -152,7 +152,7 @@ impl State { } Command::DeafenSelf(toggle) => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } let server = self.server().unwrap(); @@ -210,7 +210,7 @@ impl State { } Command::MuteOther(string, toggle) => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } let id = self @@ -222,7 +222,7 @@ impl State { let (id, user) = match id { Some(id) => (*id.0, id.1), - None => return now!(Err(Error::InvalidUsernameError(string))), + None => return now!(Err(Error::InvalidUsername(string))), }; let action = match toggle { @@ -245,7 +245,7 @@ impl State { } Command::MuteSelf(toggle) => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } let server = self.server().unwrap(); @@ -313,7 +313,7 @@ impl State { accept_invalid_cert, } => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { - return now!(Err(Error::AlreadyConnectedError)); + return now!(Err(Error::AlreadyConnected)); } let mut server = Server::new(); *server.username_mut() = Some(username); @@ -332,7 +332,7 @@ impl State { Ok(Some(v)) => v, _ => { warn!("Error parsing server addr"); - return now!(Err(Error::InvalidServerAddrError(host, port))); + return now!(Err(Error::InvalidServerAddr(host, port))); } }; connection_info_sender @@ -361,7 +361,7 @@ impl State { } Command::ServerDisconnect => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } self.server = None; @@ -381,7 +381,7 @@ impl State { .map(|mut e| e.next()) { Ok(Some(v)) => Ok(v), - _ => Err(mumlib::error::Error::InvalidServerAddrError(host, port)), + _ => Err(mumlib::error::Error::InvalidServerAddr(host, port)), } }), Box::new(move |pong| { @@ -395,7 +395,7 @@ impl State { ), Command::Status => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } let state = self.server.as_ref().unwrap().into(); now!(Ok(Some(CommandResponse::Status { @@ -404,7 +404,7 @@ impl State { } Command::UserVolumeSet(string, volume) => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::DisconnectedError)); + return now!(Err(Error::Disconnected)); } let user_id = match self .server() @@ -414,7 +414,7 @@ impl State { .find(|e| e.1.name() == string) .map(|e| *e.0) { - None => return now!(Err(Error::InvalidUsernameError(string))), + None => return now!(Err(Error::InvalidUsername(string))), Some(v) => v, }; diff --git a/mumlib/src/error.rs b/mumlib/src/error.rs index 0259c28..f6a02a7 100644 --- a/mumlib/src/error.rs +++ b/mumlib/src/error.rs @@ -5,24 +5,24 @@ pub type Result = std::result::Result; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Error { - DisconnectedError, - AlreadyConnectedError, + Disconnected, + AlreadyConnected, ChannelIdentifierError(String, ChannelIdentifierError), - InvalidServerAddrError(String, u16), - InvalidUsernameError(String), + InvalidServerAddr(String, u16), + InvalidUsername(String), InvalidServerPassword, } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Error::DisconnectedError => write!(f, "Not connected to a server"), - Error::AlreadyConnectedError => write!(f, "Already connected to a server"), + Error::Disconnected=> write!(f, "Not connected to a server"), + Error::AlreadyConnected=> write!(f, "Already connected to a server"), Error::ChannelIdentifierError(id, kind) => write!(f, "{}: {}", kind, id), - Error::InvalidServerAddrError(addr, port) => { + Error::InvalidServerAddr(addr, port) => { write!(f, "Invalid server address: {}: {}", addr, port) } - Error::InvalidUsernameError(username) => write!(f, "Invalid username: {}", username), + Error::InvalidUsername(username) => write!(f, "Invalid username: {}", username), Error::InvalidServerPassword => write!(f, "Invalid server password") } } -- cgit v1.2.1 From 1fe3a3aad1dc346697b8815d2dba15f9e9e3265e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 30 Mar 2021 10:08:34 +0200 Subject: handle faulty tcp stream --- mumd/src/network/tcp.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 29749f1..a1b4597 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -252,8 +252,21 @@ async fn listen( let mut crypt_state_sender = Some(crypt_state_sender); loop { - let packet = stream.next().await.unwrap(); - match packet.unwrap() { + let packet = match stream.next().await { + Some(Ok(packet)) => packet, + Some(Err(e)) => { + error!("TCP error: {:?}", e); + continue; //TODO Break here? Maybe look at the error and handle it + } + None => { + // We end up here if the login was rejected. We probably want + // to exit before that. + warn!("TCP stream gone"); + state.lock().await.broadcast_phase(StatePhase::Disconnected); + break; + } + }; + match packet { ControlPacket::TextMessage(msg) => { info!( "Got message from user with session ID {}: {}", -- cgit v1.2.1 From 7fa6078759b095a0ffb5ede05a933249833c9e6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 30 Mar 2021 10:09:14 +0200 Subject: tcp reject: more descriptive logging --- mumd/src/network/tcp.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index a1b4597..fb5869d 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -314,12 +314,13 @@ async fn listen( state.initialized(); } ControlPacket::Reject(msg) => { + debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { event_queue.send(TcpEventData::Connected(Err(Error::InvalidServerPassword))).await; } - _ => { - warn!("Login rejected: {:?}", msg); + ty => { + warn!("Unhandled reject type: {:?}", ty); } } } -- cgit v1.2.1 From f81e8a403d6317803a23ba0d43c862e0cfea52ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 30 Mar 2021 10:16:15 +0200 Subject: mumlib: re-export error --- mumd/src/main.rs | 2 +- mumd/src/network/tcp.rs | 5 ++--- mumd/src/state.rs | 5 +++-- mumlib/src/lib.rs | 2 ++ 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 26e8d49..276e2ce 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -34,7 +34,7 @@ async fn main() { bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap(); if let Ok(()) = writer.send(command.freeze()).await { if let Some(Ok(buf)) = reader.next().await { - if let Ok(Ok::, mumlib::error::Error>(Some(CommandResponse::Pong))) = bincode::deserialize(&buf) { + if let Ok(Ok::, mumlib::Error>(Some(CommandResponse::Pong))) = bincode::deserialize(&buf) { error!("Another instance of mumd is already running"); return; } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index fb5869d..bf16110 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -8,7 +8,6 @@ use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPa use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; use mumble_protocol::{Clientbound, Serverbound}; -use mumlib::error::Error; use std::collections::HashMap; use std::convert::{Into, TryInto}; use std::net::SocketAddr; @@ -39,7 +38,7 @@ pub enum TcpEvent { #[derive(Clone)] pub enum TcpEventData<'a> { - Connected(Result<&'a msgs::ServerSync, Error>), + Connected(Result<&'a msgs::ServerSync, mumlib::Error>), _Disconnected, } @@ -317,7 +316,7 @@ async fn listen( debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { - event_queue.send(TcpEventData::Connected(Err(Error::InvalidServerPassword))).await; + event_queue.send(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))).await; } ty => { warn!("Unhandled reject type: {:?}", ty); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index e666bcf..20fe660 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -15,7 +15,8 @@ use mumble_protocol::ping::PongPacket; use mumble_protocol::voice::Serverbound; use mumlib::command::{Command, CommandResponse}; use mumlib::config::Config; -use mumlib::error::{ChannelIdentifierError, Error}; +use mumlib::error::ChannelIdentifierError; +use mumlib::Error; use crate::state::user::UserDiff; use std::net::{SocketAddr, ToSocketAddrs}; use tokio::sync::{mpsc, watch}; @@ -381,7 +382,7 @@ impl State { .map(|mut e| e.next()) { Ok(Some(v)) => Ok(v), - _ => Err(mumlib::error::Error::InvalidServerAddr(host, port)), + _ => Err(Error::InvalidServerAddr(host, port)), } }), Box::new(move |pong| { diff --git a/mumlib/src/lib.rs b/mumlib/src/lib.rs index bccf073..36edc10 100644 --- a/mumlib/src/lib.rs +++ b/mumlib/src/lib.rs @@ -3,6 +3,8 @@ pub mod config; pub mod error; pub mod state; +pub use error::Error; + use colored::*; use log::*; -- cgit v1.2.1 From a08f0948083cc31e08178819ada6e70a537bcfea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 30 Mar 2021 10:43:48 +0200 Subject: fix indentation --- mumd/src/network/tcp.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index bf16110..3e6e658 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -190,12 +190,12 @@ async fn send_pings( delay_seconds: u64, ) { let mut interval = time::interval(Duration::from_secs(delay_seconds)); - loop { - interval.tick().await; - trace!("Sending TCP ping"); - let msg = msgs::Ping::new(); - packet_sender.send(msg.into()).unwrap(); - } + loop { + interval.tick().await; + trace!("Sending TCP ping"); + let msg = msgs::Ping::new(); + packet_sender.send(msg.into()).unwrap(); + } } async fn send_packets( -- cgit v1.2.1 From b52068eade50758673e29c79e7cb8be3f1b4151f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 30 Mar 2021 13:44:55 +0200 Subject: code review --- mumd/src/network/tcp.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 3e6e658..47b1c20 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -39,18 +39,19 @@ pub enum TcpEvent { #[derive(Clone)] pub enum TcpEventData<'a> { Connected(Result<&'a msgs::ServerSync, mumlib::Error>), - _Disconnected, + Disconnected, } impl<'a> From<&TcpEventData<'a>> for TcpEvent { fn from(t: &TcpEventData) -> Self { match t { TcpEventData::Connected(_) => TcpEvent::Connected, - TcpEventData::_Disconnected => TcpEvent::Disconnected, + TcpEventData::Disconnected => TcpEvent::Disconnected, } } } +#[derive(Clone)] struct TcpEventQueue { handlers: Arc>>>, } @@ -62,11 +63,11 @@ impl TcpEventQueue { } } - async fn register(&mut self, at: TcpEvent, callback: TcpEventCallback) { + async fn register(&self, at: TcpEvent, callback: TcpEventCallback) { self.handlers.lock().await.entry(at).or_default().push(callback); } - async fn send<'a>(&mut self, data: TcpEventData<'a>) { + async fn resolve<'a>(&self, data: TcpEventData<'a>) { if let Some(vec) = self.handlers.lock().await.get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { @@ -76,14 +77,6 @@ impl TcpEventQueue { } } -impl Clone for TcpEventQueue { - fn clone(&self) -> Self { - Self { - handlers: Arc::clone(&self.handlers), - } - } -} - pub async fn handle( state: Arc>, mut connection_info_receiver: watch::Receiver>, @@ -141,6 +134,8 @@ pub async fn handle( phase_watcher, ).await; + event_queue.resolve(TcpEventData::Disconnected).await; + debug!("Fully disconnected TCP stream, waiting for new connection info"); } } @@ -245,7 +240,7 @@ async fn listen( state: Arc>, mut stream: TcpReceiver, crypt_state_sender: mpsc::Sender, - mut event_queue: TcpEventQueue, + event_queue: TcpEventQueue, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -299,7 +294,7 @@ async fn listen( ) .await; } - event_queue.send(TcpEventData::Connected(Ok(&msg))).await; + event_queue.resolve(TcpEventData::Connected(Ok(&msg))).await; let mut state = state.lock().await; let server = state.server_mut().unwrap(); server.parse_server_sync(*msg); @@ -316,7 +311,7 @@ async fn listen( debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { - event_queue.send(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))).await; + event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))).await; } ty => { warn!("Unhandled reject type: {:?}", ty); @@ -376,7 +371,7 @@ async fn listen( async fn register_events( tcp_event_register_receiver: &mut mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, - mut event_queue: TcpEventQueue, + event_queue: TcpEventQueue, ) { loop { let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); -- cgit v1.2.1