aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2021-05-22 01:27:17 +0200
committerEskil Queseth <eskilq@kth.se>2021-05-22 01:27:17 +0200
commitaa710a3420ef4d834ee1df4099b25f3c83b9c31d (patch)
tree1ec20ad483eaf0602f55b1f50fb45826b888b05f /mumd/src
parentf72440096cefecbe62a37813ea3ee6f3cd3c7299 (diff)
downloadmum-aa710a3420ef4d834ee1df4099b25f3c83b9c31d.tar.gz
rework command response mechanism
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/command.rs9
-rw-r--r--mumd/src/main.rs8
-rw-r--r--mumd/src/state.rs25
3 files changed, 28 insertions, 14 deletions
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index f02ad19..5255afa 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -29,7 +29,9 @@ pub async fn handle(
event,
Box::new(move |e| {
let response = generator(e);
- response_sender.send(response).unwrap();
+ for response in response {
+ response_sender.send(response).unwrap();
+ }
}),
);
}
@@ -42,7 +44,10 @@ pub async fn handle(
)
}
ExecutionContext::Now(generator) => {
- response_sender.send(generator()).unwrap();
+ for response in generator() {
+ response_sender.send(response).unwrap();
+ }
+ drop(response_sender);
}
ExecutionContext::Ping(generator, converter) => {
let ret = generator();
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 12a8802..0c175c2 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -8,13 +8,14 @@ mod state;
use crate::state::State;
+use bytes::{BufMut, BytesMut};
use futures_util::{select, FutureExt, SinkExt, StreamExt};
use log::*;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
+use std::io::ErrorKind;
use tokio::{net::{UnixListener, UnixStream}, sync::mpsc};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
-use bytes::{BufMut, BytesMut};
#[tokio::main]
async fn main() {
@@ -114,7 +115,10 @@ async fn receive_commands(
bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
if let Err(e) = writer.send(serialized.freeze()).await {
- error!("Error sending response: {:?}", e);
+ if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error
+ //we just assume that they just don't want any more packets
+ error!("Error sending response: {:?}", e);
+ }
break;
}
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 8072b8e..a224afc 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -18,7 +18,7 @@ use mumlib::command::{Command, CommandResponse, MessageTarget};
use mumlib::config::Config;
use mumlib::Error;
use crate::state::user::UserDiff;
-use std::{net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}};
+use std::{iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}};
use tokio::sync::{mpsc, watch};
macro_rules! at {
@@ -29,21 +29,23 @@ macro_rules! at {
macro_rules! now {
($data:expr) => {
- ExecutionContext::Now(Box::new(move || $data))
+ ExecutionContext::Now(Box::new(move || Box::new(iter::once($data))))
};
}
+type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>>;
+
//TODO give me a better name
pub enum ExecutionContext {
TcpEventCallback(
TcpEvent,
- Box<dyn FnOnce(TcpEventData) -> mumlib::error::Result<Option<CommandResponse>>>,
+ Box<dyn FnOnce(TcpEventData) -> Responses>,
),
TcpEventSubscriber(
TcpEvent,
Box<dyn FnMut(TcpEventData, &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>) -> bool>,
),
- Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>),
+ Now(Box<dyn FnOnce() -> Responses>),
Ping(
Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>,
Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>,
@@ -545,7 +547,7 @@ pub fn handle_command(
at!(TcpEvent::Connected, |res| {
//runs the closure when the client is connected
if let TcpEventData::Connected(res) = res {
- res.map(|msg| {
+ Box::new(iter::once(res.map(|msg| {
Some(CommandResponse::ServerConnect {
welcome_message: if msg.has_welcome_text() {
Some(msg.get_welcome_text().to_string())
@@ -553,7 +555,7 @@ pub fn handle_command(
None
},
})
- })
+ })))
} else {
unreachable!("callback should be provided with a TcpEventData::Connected");
}
@@ -584,12 +586,12 @@ pub fn handle_command(
}
}),
Box::new(move |pong| {
- Ok(pong.map(|pong| CommandResponse::ServerStatus {
+ Ok(pong.map(|pong| (CommandResponse::ServerStatus {
version: pong.version,
users: pong.users,
max_users: pong.max_users,
bandwidth: pong.bandwidth,
- }))
+ })))
}),
),
Command::Status => {
@@ -643,11 +645,14 @@ pub fn handle_command(
)
} else {
let messages = std::mem::take(&mut state.message_buffer);
- let messages = messages.into_iter()
+ let messages: Vec<_> = messages.into_iter()
.map(|(msg, user)| (msg, state.get_user_name(user).unwrap()))
+ .map(|e| Ok(Some(CommandResponse::PastMessage { message: e })))
.collect();
- now!(Ok(Some(CommandResponse::PastMessages { messages })))
+ ExecutionContext::Now(Box::new(move || {
+ Box::new(messages.into_iter())
+ }))
}
}
Command::SendMessage { message, targets } => {