From 19267cb7ac28ce51674baa9516ebb36074709d4f Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Tue, 18 May 2021 02:36:47 +0200 Subject: add ability for backend to keep track of messages --- mumd/src/network/tcp.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 7606987..2a97b4a 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -269,12 +269,9 @@ async fn listen( } }; match packet { - ControlPacket::TextMessage(msg) => { - info!( - "Got message from user with session ID {}: {}", - msg.get_actor(), - msg.get_message() - ); + ControlPacket::TextMessage(mut msg) => { + let mut state = state.write().unwrap(); + state.register_message((msg.take_message(), msg.get_actor())); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); -- cgit v1.2.1 From 6a03656f963bb59c6a6a56e3933f05f9da850ca8 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Tue, 18 May 2021 02:47:16 +0200 Subject: add notification on message --- mumd/src/network/tcp.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 2a97b4a..3696c58 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,4 +1,4 @@ -use crate::error::{ServerSendError, TcpError}; +use crate::{error::{ServerSendError, TcpError}, notifications}; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; @@ -271,6 +271,12 @@ async fn listen( match packet { ControlPacket::TextMessage(mut msg) => { let mut state = state.write().unwrap(); + let user = state.server() + .and_then(|server| server.users().get(&msg.get_actor())) + .map(|user| user.name()); + if let Some(user) = user { + notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this + } state.register_message((msg.take_message(), msg.get_actor())); } ControlPacket::CryptSetup(msg) => { -- cgit v1.2.1 From e73a442171b364291ed5399bf4f86e8f9a3091ee Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 01:23:28 +0200 Subject: change Mutex to RwLock and de-async --- mumd/src/network/tcp.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 3696c58..18a053b 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -54,22 +54,22 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { #[derive(Clone)] struct TcpEventQueue { - handlers: Arc>>>, + handlers: Arc>>>, } impl TcpEventQueue { fn new() -> Self { Self { - handlers: Arc::new(Mutex::new(HashMap::new())), + handlers: Arc::new(RwLock::new(HashMap::new())), } } - async fn register(&self, at: TcpEvent, callback: TcpEventCallback) { - self.handlers.lock().await.entry(at).or_default().push(callback); + fn register(&self, at: TcpEvent, callback: TcpEventCallback) { + self.handlers.write().unwrap().entry(at).or_default().push(callback); } - async fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.handlers.lock().await.get_mut(&TcpEvent::from(&data)) { + fn resolve<'a>(&self, data: TcpEventData<'a>) { + if let Some(vec) = self.handlers.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { handler(data.clone()); @@ -143,7 +143,7 @@ pub async fn handle( phase_watcher, ).await.unwrap_or(Ok(()))?; - event_queue.resolve(TcpEventData::Disconnected).await; + event_queue.resolve(TcpEventData::Disconnected); debug!("Fully disconnected TCP stream, waiting for new connection info"); } @@ -305,7 +305,7 @@ async fn listen( ) .await; } - event_queue.resolve(TcpEventData::Connected(Ok(&msg))).await; + event_queue.resolve(TcpEventData::Connected(Ok(&msg))); let mut state = state.write().unwrap(); let server = state.server_mut().unwrap(); server.parse_server_sync(*msg); @@ -322,7 +322,7 @@ async fn listen( debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { - event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))).await; + event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))); } ty => { warn!("Unhandled reject type: {:?}", ty); @@ -387,6 +387,6 @@ async fn register_events( ) { loop { let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register(event, handler).await; + event_queue.register(event, handler); } } -- cgit v1.2.1 From cf81a1141cdc6a6db842d992d065eba74829e0c7 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 01:45:07 +0200 Subject: add re-runnable callbacks to the event system --- mumd/src/network/tcp.rs | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 18a053b..8f34cd8 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -30,17 +30,20 @@ type TcpReceiver = SplitStream, ControlCodec>>; pub(crate) type TcpEventCallback = Box; +pub(crate) type TcpEventSubscriber = Box bool>; //the bool indicates if it should be kept or not #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum TcpEvent { Connected, //fires when the client has connected to a server Disconnected, //fires when the client has disconnected from a server + TextMessage, //fires when a text message comes in } #[derive(Clone)] pub enum TcpEventData<'a> { Connected(Result<&'a msgs::ServerSync, mumlib::Error>), Disconnected, + TextMessage(&'a msgs::TextMessage), } impl<'a> From<&TcpEventData<'a>> for TcpEvent { @@ -48,33 +51,53 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { match t { TcpEventData::Connected(_) => TcpEvent::Connected, TcpEventData::Disconnected => TcpEvent::Disconnected, + TcpEventData::TextMessage(_) => TcpEvent::TextMessage, } } } #[derive(Clone)] struct TcpEventQueue { - handlers: Arc>>>, + callbacks: Arc>>>, + subscribers: Arc>>>, } impl TcpEventQueue { + /// Creates a new `TcpEventQueue`. fn new() -> Self { Self { - handlers: Arc::new(RwLock::new(HashMap::new())), + callbacks: Arc::new(RwLock::new(HashMap::new())), + subscribers: Arc::new(RwLock::new(HashMap::new())), } } - fn register(&self, at: TcpEvent, callback: TcpEventCallback) { - self.handlers.write().unwrap().entry(at).or_default().push(callback); + /// Registers a new callback to be triggered when an event is fired. + fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { + self.callbacks.write().unwrap().entry(at).or_default().push(callback); } + /// Registers a new callback to be triggered when an event is fired. + fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { + self.subscribers.write().unwrap().entry(at).or_default().push(callback); + } + + /// Fires all callbacks related to a specific TCP event and removes them from the event queue. + /// Also calls all event subscribers, but keeps them in the queue fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.handlers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { handler(data.clone()); } } + if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + let old = std::mem::take(vec); + for mut e in old { + if e(data.clone()) { + vec.push(e) + } + } + } } } @@ -269,7 +292,7 @@ async fn listen( } }; match packet { - ControlPacket::TextMessage(mut msg) => { + ControlPacket::TextMessage(msg) => { let mut state = state.write().unwrap(); let user = state.server() .and_then(|server| server.users().get(&msg.get_actor())) @@ -277,7 +300,7 @@ async fn listen( if let Some(user) = user { notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this } - state.register_message((msg.take_message(), msg.get_actor())); + state.register_message((msg.get_message().to_owned(), msg.get_actor())); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); @@ -387,6 +410,6 @@ async fn register_events( ) { loop { let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register(event, handler); + event_queue.register_callback(event, handler); } } -- cgit v1.2.1 From f551de2bbc5e41c5cd76e36c2b0a6f10d9b4cddf Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 02:09:58 +0200 Subject: remove event_register_handler from tcp stack --- mumd/src/network/tcp.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 8f34cd8..b6e939a 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -57,14 +57,14 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { } #[derive(Clone)] -struct TcpEventQueue { +pub struct TcpEventQueue { callbacks: Arc>>>, subscribers: Arc>>>, } impl TcpEventQueue { /// Creates a new `TcpEventQueue`. - fn new() -> Self { + pub fn new() -> Self { Self { callbacks: Arc::new(RwLock::new(HashMap::new())), subscribers: Arc::new(RwLock::new(HashMap::new())), @@ -72,18 +72,18 @@ impl TcpEventQueue { } /// Registers a new callback to be triggered when an event is fired. - fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { + pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { self.callbacks.write().unwrap().entry(at).or_default().push(callback); } /// Registers a new callback to be triggered when an event is fired. - fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { + pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { self.subscribers.write().unwrap().entry(at).or_default().push(callback); } /// Fires all callbacks related to a specific TCP event and removes them from the event queue. /// Also calls all event subscribers, but keeps them in the queue - fn resolve<'a>(&self, data: TcpEventData<'a>) { + pub fn resolve<'a>(&self, data: TcpEventData<'a>) { if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { @@ -107,7 +107,7 @@ pub async fn handle( crypt_state_sender: mpsc::Sender, packet_sender: mpsc::UnboundedSender>, mut packet_receiver: mpsc::UnboundedReceiver>, - mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, + event_queue: TcpEventQueue, ) -> Result<(), TcpError> { loop { let connection_info = 'data: loop { @@ -137,7 +137,6 @@ pub async fn handle( (state_lock.phase_receiver(), state_lock.audio_input().receiver()) }; - let event_queue = TcpEventQueue::new(); info!("Logging in..."); @@ -160,7 +159,6 @@ pub async fn handle( phase_watcher_inner, ).fuse() => r, r = send_packets(sink, &mut packet_receiver).fuse() => r, - _ = register_events(&mut tcp_event_register_receiver, event_queue.clone()).fuse() => Ok(()), } }, phase_watcher, @@ -403,13 +401,3 @@ async fn listen( } Ok(()) } - -async fn register_events( - tcp_event_register_receiver: &mut mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, - event_queue: TcpEventQueue, -) { - loop { - let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register_callback(event, handler); - } -} -- cgit v1.2.1 From 6519ad9c82549817d797a5d9d463a418eb35273f Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 15:21:57 +0200 Subject: fix deadlock and change message registering properly --- mumd/src/network/tcp.rs | 2 ++ 1 file changed, 2 insertions(+) (limited to 'mumd/src/network/tcp.rs') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index b6e939a..b513797 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -299,6 +299,8 @@ async fn listen( notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this } state.register_message((msg.get_message().to_owned(), msg.get_actor())); + drop(state); + event_queue.resolve(TcpEventData::TextMessage(&*msg)); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); -- cgit v1.2.1