From f551de2bbc5e41c5cd76e36c2b0a6f10d9b4cddf Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 02:09:58 +0200 Subject: remove event_register_handler from tcp stack --- mumd/src/command.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) (limited to 'mumd/src/command.rs') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 1337dce..a62ddbd 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,8 +1,4 @@ -use crate::network::{ - ConnectionInfo, - tcp::{TcpEvent, TcpEventCallback}, - udp::PingRequest -}; +use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest}; use crate::state::{ExecutionContext, State}; use log::*; @@ -17,7 +13,7 @@ pub async fn handle( Command, oneshot::Sender>>, )>, - tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, + tcp_event_queue: TcpEventQueue, ping_request_sender: mpsc::UnboundedSender, mut packet_sender: mpsc::UnboundedSender>, mut connection_info_sender: watch::Sender>, @@ -33,14 +29,14 @@ pub async fn handle( ExecutionContext::TcpEvent(event, generator) => { let (tx, rx) = oneshot::channel(); //TODO handle this error - let _ = tcp_event_register_sender.send(( - event, + tcp_event_queue.register_callback( + event, Box::new(move |e| { let response = generator(e); response_sender.send(response).unwrap(); tx.send(()).unwrap(); }), - )); + ); rx.await.unwrap(); } -- cgit v1.2.1 From 0b2efad3e9aa569c27d339a5eca17c96155b4f9d Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 02:11:53 +0200 Subject: remove await to parallellize better --- mumd/src/command.rs | 5 ----- 1 file changed, 5 deletions(-) (limited to 'mumd/src/command.rs') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index a62ddbd..d101104 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -27,18 +27,13 @@ pub async fn handle( drop(state); match event { ExecutionContext::TcpEvent(event, generator) => { - let (tx, rx) = oneshot::channel(); - //TODO handle this error tcp_event_queue.register_callback( event, Box::new(move |e| { let response = generator(e); response_sender.send(response).unwrap(); - tx.send(()).unwrap(); }), ); - - rx.await.unwrap(); } ExecutionContext::Now(generator) => { response_sender.send(generator()).unwrap(); -- cgit v1.2.1 From 5d05d292ddb7f8b28b71abd46930028b6e66dfde Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 02:27:27 +0200 Subject: add support for sending multiple responses --- mumd/src/command.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'mumd/src/command.rs') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index d101104..a1e8b21 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -5,13 +5,13 @@ use log::*; use mumble_protocol::{Serverbound, control::ControlPacket}; use mumlib::command::{Command, CommandResponse}; use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, watch}; pub async fn handle( state: Arc>, mut command_receiver: mpsc::UnboundedReceiver<( Command, - oneshot::Sender>>, + mpsc::UnboundedSender>>, )>, tcp_event_queue: TcpEventQueue, ping_request_sender: mpsc::UnboundedSender, @@ -20,13 +20,13 @@ pub async fn handle( ) { debug!("Begin listening for commands"); let ping_count = AtomicU64::new(0); - while let Some((command, response_sender)) = command_receiver.recv().await { + while let Some((command, mut response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); let mut state = state.write().unwrap(); let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); drop(state); match event { - ExecutionContext::TcpEvent(event, generator) => { + ExecutionContext::TcpEventCallback(event, generator) => { tcp_event_queue.register_callback( event, Box::new(move |e| { @@ -35,6 +35,14 @@ pub async fn handle( }), ); } + ExecutionContext::TcpEventSubscriber(event, mut handler) => { + tcp_event_queue.register_subscriber( + event, + Box::new(move |event| { + handler(event, &mut response_sender) + }), + ) + } ExecutionContext::Now(generator) => { response_sender.send(generator()).unwrap(); } -- cgit v1.2.1 From d6779ca065a896d329a7634d69a5f4270d829b73 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 14:53:58 +0200 Subject: rework event system to allow multiple triggers --- mumd/src/command.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'mumd/src/command.rs') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index a1e8b21..f02ad19 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -22,9 +22,7 @@ pub async fn handle( let ping_count = AtomicU64::new(0); while let Some((command, mut response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); - let mut state = state.write().unwrap(); - let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); - drop(state); + let event = crate::state::handle_command(Arc::clone(&state), command, &mut packet_sender, &mut connection_info_sender); match event { ExecutionContext::TcpEventCallback(event, generator) => { tcp_event_queue.register_callback( -- cgit v1.2.1 From aa710a3420ef4d834ee1df4099b25f3c83b9c31d Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sat, 22 May 2021 01:27:17 +0200 Subject: rework command response mechanism --- mumd/src/command.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'mumd/src/command.rs') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index f02ad19..5255afa 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -29,7 +29,9 @@ pub async fn handle( event, Box::new(move |e| { let response = generator(e); - response_sender.send(response).unwrap(); + for response in response { + response_sender.send(response).unwrap(); + } }), ); } @@ -42,7 +44,10 @@ pub async fn handle( ) } ExecutionContext::Now(generator) => { - response_sender.send(generator()).unwrap(); + for response in generator() { + response_sender.send(response).unwrap(); + } + drop(response_sender); } ExecutionContext::Ping(generator, converter) => { let ret = generator(); -- cgit v1.2.1