diff options
| author | Eskil Queseth <eskilq@kth.se> | 2021-05-22 01:27:17 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2021-05-22 01:27:17 +0200 |
| commit | aa710a3420ef4d834ee1df4099b25f3c83b9c31d (patch) | |
| tree | 1ec20ad483eaf0602f55b1f50fb45826b888b05f /mumd | |
| parent | f72440096cefecbe62a37813ea3ee6f3cd3c7299 (diff) | |
| download | mum-aa710a3420ef4d834ee1df4099b25f3c83b9c31d.tar.gz | |
rework command response mechanism
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/src/command.rs | 9 | ||||
| -rw-r--r-- | mumd/src/main.rs | 8 | ||||
| -rw-r--r-- | mumd/src/state.rs | 25 |
3 files changed, 28 insertions, 14 deletions
diff --git a/mumd/src/command.rs b/mumd/src/command.rs index f02ad19..5255afa 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -29,7 +29,9 @@ pub async fn handle( event, Box::new(move |e| { let response = generator(e); - response_sender.send(response).unwrap(); + for response in response { + response_sender.send(response).unwrap(); + } }), ); } @@ -42,7 +44,10 @@ pub async fn handle( ) } ExecutionContext::Now(generator) => { - response_sender.send(generator()).unwrap(); + for response in generator() { + response_sender.send(response).unwrap(); + } + drop(response_sender); } ExecutionContext::Ping(generator, converter) => { let ret = generator(); diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 12a8802..0c175c2 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -8,13 +8,14 @@ mod state; use crate::state::State; +use bytes::{BufMut, BytesMut}; use futures_util::{select, FutureExt, SinkExt, StreamExt}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; +use std::io::ErrorKind; use tokio::{net::{UnixListener, UnixStream}, sync::mpsc}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; -use bytes::{BufMut, BytesMut}; #[tokio::main] async fn main() { @@ -114,7 +115,10 @@ async fn receive_commands( bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); if let Err(e) = writer.send(serialized.freeze()).await { - error!("Error sending response: {:?}", e); + if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error + //we just assume that they just don't want any more packets + error!("Error sending response: {:?}", e); + } break; } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 8072b8e..a224afc 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -18,7 +18,7 @@ use mumlib::command::{Command, CommandResponse, MessageTarget}; use mumlib::config::Config; use mumlib::Error; use crate::state::user::UserDiff; -use std::{net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; +use std::{iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; use tokio::sync::{mpsc, watch}; macro_rules! at { @@ -29,21 +29,23 @@ macro_rules! at { macro_rules! now { ($data:expr) => { - ExecutionContext::Now(Box::new(move || $data)) + ExecutionContext::Now(Box::new(move || Box::new(iter::once($data)))) }; } +type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>>; + //TODO give me a better name pub enum ExecutionContext { TcpEventCallback( TcpEvent, - Box<dyn FnOnce(TcpEventData) -> mumlib::error::Result<Option<CommandResponse>>>, + Box<dyn FnOnce(TcpEventData) -> Responses>, ), TcpEventSubscriber( TcpEvent, Box<dyn FnMut(TcpEventData, &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>) -> bool>, ), - Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>), + Now(Box<dyn FnOnce() -> Responses>), Ping( Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>, Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>, @@ -545,7 +547,7 @@ pub fn handle_command( at!(TcpEvent::Connected, |res| { //runs the closure when the client is connected if let TcpEventData::Connected(res) = res { - res.map(|msg| { + Box::new(iter::once(res.map(|msg| { Some(CommandResponse::ServerConnect { welcome_message: if msg.has_welcome_text() { Some(msg.get_welcome_text().to_string()) @@ -553,7 +555,7 @@ pub fn handle_command( None }, }) - }) + }))) } else { unreachable!("callback should be provided with a TcpEventData::Connected"); } @@ -584,12 +586,12 @@ pub fn handle_command( } }), Box::new(move |pong| { - Ok(pong.map(|pong| CommandResponse::ServerStatus { + Ok(pong.map(|pong| (CommandResponse::ServerStatus { version: pong.version, users: pong.users, max_users: pong.max_users, bandwidth: pong.bandwidth, - })) + }))) }), ), Command::Status => { @@ -643,11 +645,14 @@ pub fn handle_command( ) } else { let messages = std::mem::take(&mut state.message_buffer); - let messages = messages.into_iter() + let messages: Vec<_> = messages.into_iter() .map(|(msg, user)| (msg, state.get_user_name(user).unwrap())) + .map(|e| Ok(Some(CommandResponse::PastMessage { message: e }))) .collect(); - now!(Ok(Some(CommandResponse::PastMessages { messages }))) + ExecutionContext::Now(Box::new(move || { + Box::new(messages.into_iter()) + })) } } Command::SendMessage { message, targets } => { |
