diff options
| -rw-r--r-- | mumctl/src/main.rs | 2 | ||||
| -rw-r--r-- | mumd/src/client.rs | 9 | ||||
| -rw-r--r-- | mumd/src/command.rs | 14 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 24 | ||||
| -rw-r--r-- | mumd/src/state.rs | 2 | ||||
| -rw-r--r-- | mumlib/src/command.rs | 4 |
6 files changed, 20 insertions, 35 deletions
diff --git a/mumctl/src/main.rs b/mumctl/src/main.rs index 5d3f332..4e1249e 100644 --- a/mumctl/src/main.rs +++ b/mumctl/src/main.rs @@ -373,7 +373,7 @@ fn match_opt() -> Result<(), Error> { send_command(MumCommand::DeafenSelf(Some(false)))??; } Command::Messages => { - match send_command(MumCommand::PastMessages)?? { + match send_command(MumCommand::PastMessages { block: false })?? { Some(CommandResponse::PastMessages { messages }) => { for (msg, sender) in messages { println!("{}: {}", sender, msg); diff --git a/mumd/src/client.rs b/mumd/src/client.rs index 9c2c2a0..3c491da 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -1,4 +1,4 @@ -use crate::command; +use crate::{command, network::tcp::TcpEventQueue}; use crate::error::ClientError; use crate::network::{tcp, udp, ConnectionInfo}; use crate::state::State; @@ -24,8 +24,7 @@ pub async fn handle( mpsc::unbounded_channel::<ControlPacket<Serverbound>>(); let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); - let (response_sender, response_receiver) = - mpsc::unbounded_channel(); + let event_queue = TcpEventQueue::new(); let state = Arc::new(RwLock::new(state)); @@ -36,7 +35,7 @@ pub async fn handle( crypt_state_sender, packet_sender.clone(), packet_receiver, - response_receiver, + event_queue.clone(), ).fuse() => r.map_err(|e| ClientError::TcpError(e)), _ = udp::handle( Arc::clone(&state), @@ -46,7 +45,7 @@ pub async fn handle( _ = command::handle( state, command_receiver, - response_sender, + event_queue, ping_request_sender, packet_sender, connection_info_sender, diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 1337dce..a62ddbd 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,8 +1,4 @@ -use crate::network::{ - ConnectionInfo, - tcp::{TcpEvent, TcpEventCallback}, - udp::PingRequest -}; +use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest}; use crate::state::{ExecutionContext, State}; use log::*; @@ -17,7 +13,7 @@ pub async fn handle( Command, oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>, )>, - tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, + tcp_event_queue: TcpEventQueue, ping_request_sender: mpsc::UnboundedSender<PingRequest>, mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>, @@ -33,14 +29,14 @@ pub async fn handle( ExecutionContext::TcpEvent(event, generator) => { let (tx, rx) = oneshot::channel(); //TODO handle this error - let _ = tcp_event_register_sender.send(( - event, + tcp_event_queue.register_callback( + event, Box::new(move |e| { let response = generator(e); response_sender.send(response).unwrap(); tx.send(()).unwrap(); }), - )); + ); rx.await.unwrap(); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 8f34cd8..b6e939a 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -57,14 +57,14 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { } #[derive(Clone)] -struct TcpEventQueue { +pub struct TcpEventQueue { callbacks: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventCallback>>>>, subscribers: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventSubscriber>>>>, } impl TcpEventQueue { /// Creates a new `TcpEventQueue`. - fn new() -> Self { + pub fn new() -> Self { Self { callbacks: Arc::new(RwLock::new(HashMap::new())), subscribers: Arc::new(RwLock::new(HashMap::new())), @@ -72,18 +72,18 @@ impl TcpEventQueue { } /// Registers a new callback to be triggered when an event is fired. - fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { + pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { self.callbacks.write().unwrap().entry(at).or_default().push(callback); } /// Registers a new callback to be triggered when an event is fired. - fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { + pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { self.subscribers.write().unwrap().entry(at).or_default().push(callback); } /// Fires all callbacks related to a specific TCP event and removes them from the event queue. /// Also calls all event subscribers, but keeps them in the queue - fn resolve<'a>(&self, data: TcpEventData<'a>) { + pub fn resolve<'a>(&self, data: TcpEventData<'a>) { if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { @@ -107,7 +107,7 @@ pub async fn handle( crypt_state_sender: mpsc::Sender<ClientCryptState>, packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, - mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, + event_queue: TcpEventQueue, ) -> Result<(), TcpError> { loop { let connection_info = 'data: loop { @@ -137,7 +137,6 @@ pub async fn handle( (state_lock.phase_receiver(), state_lock.audio_input().receiver()) }; - let event_queue = TcpEventQueue::new(); info!("Logging in..."); @@ -160,7 +159,6 @@ pub async fn handle( phase_watcher_inner, ).fuse() => r, r = send_packets(sink, &mut packet_receiver).fuse() => r, - _ = register_events(&mut tcp_event_register_receiver, event_queue.clone()).fuse() => Ok(()), } }, phase_watcher, @@ -403,13 +401,3 @@ async fn listen( } Ok(()) } - -async fn register_events( - tcp_event_register_receiver: &mut mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, - event_queue: TcpEventQueue, -) { - loop { - let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register_callback(event, handler); - } -} diff --git a/mumd/src/state.rs b/mumd/src/state.rs index bffc082..91c6ee7 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -393,7 +393,7 @@ impl State { self.audio_output.set_user_volume(user_id, volume); now!(Ok(None)) } - Command::PastMessages => { + Command::PastMessages { block } => { let server = match self.server.as_ref() { Some(s) => s, None => return now!(Err(Error::Disconnected)), diff --git a/mumlib/src/command.rs b/mumlib/src/command.rs index 5155aaa..4e8775f 100644 --- a/mumlib/src/command.rs +++ b/mumlib/src/command.rs @@ -29,7 +29,9 @@ pub enum Command { }, Status, UserVolumeSet(String, f32), - PastMessages, + PastMessages { + block: bool, + }, SendMessage { message: String, targets: Vec<MessageTarget>, |
