diff options
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/network/tcp.rs | 39 |
1 files changed, 31 insertions, 8 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 18a053b..8f34cd8 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -30,17 +30,20 @@ type TcpReceiver = SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>; pub(crate) type TcpEventCallback = Box<dyn FnOnce(TcpEventData)>; +pub(crate) type TcpEventSubscriber = Box<dyn FnMut(TcpEventData) -> bool>; //the bool indicates if it should be kept or not #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum TcpEvent { Connected, //fires when the client has connected to a server Disconnected, //fires when the client has disconnected from a server + TextMessage, //fires when a text message comes in } #[derive(Clone)] pub enum TcpEventData<'a> { Connected(Result<&'a msgs::ServerSync, mumlib::Error>), Disconnected, + TextMessage(&'a msgs::TextMessage), } impl<'a> From<&TcpEventData<'a>> for TcpEvent { @@ -48,33 +51,53 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { match t { TcpEventData::Connected(_) => TcpEvent::Connected, TcpEventData::Disconnected => TcpEvent::Disconnected, + TcpEventData::TextMessage(_) => TcpEvent::TextMessage, } } } #[derive(Clone)] struct TcpEventQueue { - handlers: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventCallback>>>>, + callbacks: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventCallback>>>>, + subscribers: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventSubscriber>>>>, } impl TcpEventQueue { + /// Creates a new `TcpEventQueue`. fn new() -> Self { Self { - handlers: Arc::new(RwLock::new(HashMap::new())), + callbacks: Arc::new(RwLock::new(HashMap::new())), + subscribers: Arc::new(RwLock::new(HashMap::new())), } } - fn register(&self, at: TcpEvent, callback: TcpEventCallback) { - self.handlers.write().unwrap().entry(at).or_default().push(callback); + /// Registers a new callback to be triggered when an event is fired. + fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { + self.callbacks.write().unwrap().entry(at).or_default().push(callback); } + /// Registers a new callback to be triggered when an event is fired. + fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { + self.subscribers.write().unwrap().entry(at).or_default().push(callback); + } + + /// Fires all callbacks related to a specific TCP event and removes them from the event queue. + /// Also calls all event subscribers, but keeps them in the queue fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.handlers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { handler(data.clone()); } } + if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + let old = std::mem::take(vec); + for mut e in old { + if e(data.clone()) { + vec.push(e) + } + } + } } } @@ -269,7 +292,7 @@ async fn listen( } }; match packet { - ControlPacket::TextMessage(mut msg) => { + ControlPacket::TextMessage(msg) => { let mut state = state.write().unwrap(); let user = state.server() .and_then(|server| server.users().get(&msg.get_actor())) @@ -277,7 +300,7 @@ async fn listen( if let Some(user) = user { notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this } - state.register_message((msg.take_message(), msg.get_actor())); + state.register_message((msg.get_message().to_owned(), msg.get_actor())); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); @@ -387,6 +410,6 @@ async fn register_events( ) { loop { let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register(event, handler); + event_queue.register_callback(event, handler); } } |
