aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2021-05-22 01:27:17 +0200
committerEskil Queseth <eskilq@kth.se>2021-05-22 01:27:17 +0200
commitaa710a3420ef4d834ee1df4099b25f3c83b9c31d (patch)
tree1ec20ad483eaf0602f55b1f50fb45826b888b05f
parentf72440096cefecbe62a37813ea3ee6f3cd3c7299 (diff)
downloadmum-aa710a3420ef4d834ee1df4099b25f3c83b9c31d.tar.gz
rework command response mechanism
-rw-r--r--Cargo.lock1
-rw-r--r--mumctl/Cargo.toml1
-rw-r--r--mumctl/src/main.rs72
-rw-r--r--mumd/src/command.rs9
-rw-r--r--mumd/src/main.rs8
-rw-r--r--mumd/src/state.rs25
-rw-r--r--mumlib/src/command.rs3
7 files changed, 93 insertions, 26 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ccb8871..19bb4cb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -860,6 +860,7 @@ dependencies = [
"colored",
"log",
"mumlib",
+ "serde",
"structopt",
]
diff --git a/mumctl/Cargo.toml b/mumctl/Cargo.toml
index fff2a1c..3467ffc 100644
--- a/mumctl/Cargo.toml
+++ b/mumctl/Cargo.toml
@@ -18,5 +18,6 @@ bincode = "1"
colored = "2"
log = "0.4"
structopt = "0.3"
+serde = "1"
#cursive = "0.15"
diff --git a/mumctl/src/main.rs b/mumctl/src/main.rs
index 4e1249e..318aa3c 100644
--- a/mumctl/src/main.rs
+++ b/mumctl/src/main.rs
@@ -4,11 +4,13 @@ use mumlib::command::{Command as MumCommand, CommandResponse, MessageTarget};
use mumlib::config::{self, Config, ServerConfig};
use mumlib::state::Channel as MumChannel;
use std::fmt;
+use std::marker::PhantomData;
use std::io::{self, BufRead, Read, Write};
use std::iter;
use std::os::unix::net::UnixStream;
use std::thread;
use structopt::{clap::Shell, StructOpt};
+use serde::de::DeserializeOwned;
const INDENTATION: &str = " ";
@@ -88,8 +90,11 @@ enum Command {
Deafen,
/// Undeafen yourself
Undeafen,
- /// Get messages
- Messages,
+ /// Get messages sent to the server you're currently connected to
+ Messages {
+ #[structopt(short = "i", long = "interactive")]
+ interactive: bool,
+ },
/// Send a message to a channel or a user
Message(Target),
}
@@ -372,14 +377,15 @@ fn match_opt() -> Result<(), Error> {
Command::Undeafen => {
send_command(MumCommand::DeafenSelf(Some(false)))??;
}
- Command::Messages => {
- match send_command(MumCommand::PastMessages { block: false })?? {
- Some(CommandResponse::PastMessages { messages }) => {
- for (msg, sender) in messages {
- println!("{}: {}", sender, msg);
- }
+ Command::Messages {
+ interactive
+ } => {
+ for response in send_command_multi(MumCommand::PastMessages { block: interactive })? {
+ match response {
+ Ok(Some(CommandResponse::PastMessage { message })) => println!("{}: {}", message.1, message.0),
+ Ok(_) => unreachable!("Response should only be a Some(PastMessages)"),
+ Err(e) => error!("{}", e),
}
- _ => unreachable!("Response should only be a PastMessages"),
}
}
Command::Message(target) => {
@@ -660,6 +666,54 @@ fn send_command(
bincode::deserialize_from(&mut connection).map_err(|_| CliError::ConnectionError)
}
+fn send_command_multi(
+ command: MumCommand,
+) -> Result<impl Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>, CliError> {
+ let mut connection =
+ UnixStream::connect(mumlib::SOCKET_PATH).map_err(|_| CliError::ConnectionError)?;
+
+ let serialized = bincode::serialize(&command).unwrap();
+
+ connection
+ .write(&(serialized.len() as u32).to_be_bytes())
+ .map_err(|_| CliError::ConnectionError)?;
+ connection
+ .write(&serialized)
+ .map_err(|_| CliError::ConnectionError)?;
+
+ connection.shutdown(std::net::Shutdown::Write)
+ .map_err(|_| CliError::ConnectionError)?;
+
+ Ok(BincodeIter::new(connection))
+}
+
+struct BincodeIter<R, I> {
+ reader: R,
+ phantom: PhantomData<*const I>,
+}
+
+impl<R, I> BincodeIter<R, I> {
+ fn new(reader: R) -> Self {
+ Self {
+ reader,
+ phantom: PhantomData,
+ }
+ }
+}
+
+impl<R, I> Iterator for BincodeIter<R, I>
+ where R: Read, I: DeserializeOwned {
+ type Item = I;
+
+ #[inline]
+ fn next(&mut self) -> Option<Self::Item> {
+ self.reader
+ .read_exact(&mut [0; 4])
+ .ok()?;
+ bincode::deserialize_from(&mut self.reader).ok()
+ }
+}
+
fn print_channel(channel: &MumChannel, depth: usize) {
println!(
"{}{}{}",
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index f02ad19..5255afa 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -29,7 +29,9 @@ pub async fn handle(
event,
Box::new(move |e| {
let response = generator(e);
- response_sender.send(response).unwrap();
+ for response in response {
+ response_sender.send(response).unwrap();
+ }
}),
);
}
@@ -42,7 +44,10 @@ pub async fn handle(
)
}
ExecutionContext::Now(generator) => {
- response_sender.send(generator()).unwrap();
+ for response in generator() {
+ response_sender.send(response).unwrap();
+ }
+ drop(response_sender);
}
ExecutionContext::Ping(generator, converter) => {
let ret = generator();
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 12a8802..0c175c2 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -8,13 +8,14 @@ mod state;
use crate::state::State;
+use bytes::{BufMut, BytesMut};
use futures_util::{select, FutureExt, SinkExt, StreamExt};
use log::*;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
+use std::io::ErrorKind;
use tokio::{net::{UnixListener, UnixStream}, sync::mpsc};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
-use bytes::{BufMut, BytesMut};
#[tokio::main]
async fn main() {
@@ -114,7 +115,10 @@ async fn receive_commands(
bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
if let Err(e) = writer.send(serialized.freeze()).await {
- error!("Error sending response: {:?}", e);
+ if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error
+ //we just assume that they just don't want any more packets
+ error!("Error sending response: {:?}", e);
+ }
break;
}
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 8072b8e..a224afc 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -18,7 +18,7 @@ use mumlib::command::{Command, CommandResponse, MessageTarget};
use mumlib::config::Config;
use mumlib::Error;
use crate::state::user::UserDiff;
-use std::{net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}};
+use std::{iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}};
use tokio::sync::{mpsc, watch};
macro_rules! at {
@@ -29,21 +29,23 @@ macro_rules! at {
macro_rules! now {
($data:expr) => {
- ExecutionContext::Now(Box::new(move || $data))
+ ExecutionContext::Now(Box::new(move || Box::new(iter::once($data))))
};
}
+type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>>;
+
//TODO give me a better name
pub enum ExecutionContext {
TcpEventCallback(
TcpEvent,
- Box<dyn FnOnce(TcpEventData) -> mumlib::error::Result<Option<CommandResponse>>>,
+ Box<dyn FnOnce(TcpEventData) -> Responses>,
),
TcpEventSubscriber(
TcpEvent,
Box<dyn FnMut(TcpEventData, &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>) -> bool>,
),
- Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>),
+ Now(Box<dyn FnOnce() -> Responses>),
Ping(
Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>,
Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>,
@@ -545,7 +547,7 @@ pub fn handle_command(
at!(TcpEvent::Connected, |res| {
//runs the closure when the client is connected
if let TcpEventData::Connected(res) = res {
- res.map(|msg| {
+ Box::new(iter::once(res.map(|msg| {
Some(CommandResponse::ServerConnect {
welcome_message: if msg.has_welcome_text() {
Some(msg.get_welcome_text().to_string())
@@ -553,7 +555,7 @@ pub fn handle_command(
None
},
})
- })
+ })))
} else {
unreachable!("callback should be provided with a TcpEventData::Connected");
}
@@ -584,12 +586,12 @@ pub fn handle_command(
}
}),
Box::new(move |pong| {
- Ok(pong.map(|pong| CommandResponse::ServerStatus {
+ Ok(pong.map(|pong| (CommandResponse::ServerStatus {
version: pong.version,
users: pong.users,
max_users: pong.max_users,
bandwidth: pong.bandwidth,
- }))
+ })))
}),
),
Command::Status => {
@@ -643,11 +645,14 @@ pub fn handle_command(
)
} else {
let messages = std::mem::take(&mut state.message_buffer);
- let messages = messages.into_iter()
+ let messages: Vec<_> = messages.into_iter()
.map(|(msg, user)| (msg, state.get_user_name(user).unwrap()))
+ .map(|e| Ok(Some(CommandResponse::PastMessage { message: e })))
.collect();
- now!(Ok(Some(CommandResponse::PastMessages { messages })))
+ ExecutionContext::Now(Box::new(move || {
+ Box::new(messages.into_iter())
+ }))
}
}
Command::SendMessage { message, targets } => {
diff --git a/mumlib/src/command.rs b/mumlib/src/command.rs
index c4fc913..847b7fd 100644
--- a/mumlib/src/command.rs
+++ b/mumlib/src/command.rs
@@ -62,9 +62,6 @@ pub enum CommandResponse {
Status {
server_state: Server,
},
- PastMessages {
- messages: Vec<(String, String)>,
- },
PastMessage {
message: (String, String),
}