diff options
| author | Eskil Queseth <eskilq@kth.se> | 2021-05-22 01:27:17 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2021-05-22 01:27:17 +0200 |
| commit | aa710a3420ef4d834ee1df4099b25f3c83b9c31d (patch) | |
| tree | 1ec20ad483eaf0602f55b1f50fb45826b888b05f | |
| parent | f72440096cefecbe62a37813ea3ee6f3cd3c7299 (diff) | |
| download | mum-aa710a3420ef4d834ee1df4099b25f3c83b9c31d.tar.gz | |
rework command response mechanism
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | mumctl/Cargo.toml | 1 | ||||
| -rw-r--r-- | mumctl/src/main.rs | 72 | ||||
| -rw-r--r-- | mumd/src/command.rs | 9 | ||||
| -rw-r--r-- | mumd/src/main.rs | 8 | ||||
| -rw-r--r-- | mumd/src/state.rs | 25 | ||||
| -rw-r--r-- | mumlib/src/command.rs | 3 |
7 files changed, 93 insertions, 26 deletions
@@ -860,6 +860,7 @@ dependencies = [ "colored", "log", "mumlib", + "serde", "structopt", ] diff --git a/mumctl/Cargo.toml b/mumctl/Cargo.toml index fff2a1c..3467ffc 100644 --- a/mumctl/Cargo.toml +++ b/mumctl/Cargo.toml @@ -18,5 +18,6 @@ bincode = "1" colored = "2" log = "0.4" structopt = "0.3" +serde = "1" #cursive = "0.15" diff --git a/mumctl/src/main.rs b/mumctl/src/main.rs index 4e1249e..318aa3c 100644 --- a/mumctl/src/main.rs +++ b/mumctl/src/main.rs @@ -4,11 +4,13 @@ use mumlib::command::{Command as MumCommand, CommandResponse, MessageTarget}; use mumlib::config::{self, Config, ServerConfig}; use mumlib::state::Channel as MumChannel; use std::fmt; +use std::marker::PhantomData; use std::io::{self, BufRead, Read, Write}; use std::iter; use std::os::unix::net::UnixStream; use std::thread; use structopt::{clap::Shell, StructOpt}; +use serde::de::DeserializeOwned; const INDENTATION: &str = " "; @@ -88,8 +90,11 @@ enum Command { Deafen, /// Undeafen yourself Undeafen, - /// Get messages - Messages, + /// Get messages sent to the server you're currently connected to + Messages { + #[structopt(short = "i", long = "interactive")] + interactive: bool, + }, /// Send a message to a channel or a user Message(Target), } @@ -372,14 +377,15 @@ fn match_opt() -> Result<(), Error> { Command::Undeafen => { send_command(MumCommand::DeafenSelf(Some(false)))??; } - Command::Messages => { - match send_command(MumCommand::PastMessages { block: false })?? { - Some(CommandResponse::PastMessages { messages }) => { - for (msg, sender) in messages { - println!("{}: {}", sender, msg); - } + Command::Messages { + interactive + } => { + for response in send_command_multi(MumCommand::PastMessages { block: interactive })? { + match response { + Ok(Some(CommandResponse::PastMessage { message })) => println!("{}: {}", message.1, message.0), + Ok(_) => unreachable!("Response should only be a Some(PastMessages)"), + Err(e) => error!("{}", e), } - _ => unreachable!("Response should only be a PastMessages"), } } Command::Message(target) => { @@ -660,6 +666,54 @@ fn send_command( bincode::deserialize_from(&mut connection).map_err(|_| CliError::ConnectionError) } +fn send_command_multi( + command: MumCommand, +) -> Result<impl Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>, CliError> { + let mut connection = + UnixStream::connect(mumlib::SOCKET_PATH).map_err(|_| CliError::ConnectionError)?; + + let serialized = bincode::serialize(&command).unwrap(); + + connection + .write(&(serialized.len() as u32).to_be_bytes()) + .map_err(|_| CliError::ConnectionError)?; + connection + .write(&serialized) + .map_err(|_| CliError::ConnectionError)?; + + connection.shutdown(std::net::Shutdown::Write) + .map_err(|_| CliError::ConnectionError)?; + + Ok(BincodeIter::new(connection)) +} + +struct BincodeIter<R, I> { + reader: R, + phantom: PhantomData<*const I>, +} + +impl<R, I> BincodeIter<R, I> { + fn new(reader: R) -> Self { + Self { + reader, + phantom: PhantomData, + } + } +} + +impl<R, I> Iterator for BincodeIter<R, I> + where R: Read, I: DeserializeOwned { + type Item = I; + + #[inline] + fn next(&mut self) -> Option<Self::Item> { + self.reader + .read_exact(&mut [0; 4]) + .ok()?; + bincode::deserialize_from(&mut self.reader).ok() + } +} + fn print_channel(channel: &MumChannel, depth: usize) { println!( "{}{}{}", diff --git a/mumd/src/command.rs b/mumd/src/command.rs index f02ad19..5255afa 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -29,7 +29,9 @@ pub async fn handle( event, Box::new(move |e| { let response = generator(e); - response_sender.send(response).unwrap(); + for response in response { + response_sender.send(response).unwrap(); + } }), ); } @@ -42,7 +44,10 @@ pub async fn handle( ) } ExecutionContext::Now(generator) => { - response_sender.send(generator()).unwrap(); + for response in generator() { + response_sender.send(response).unwrap(); + } + drop(response_sender); } ExecutionContext::Ping(generator, converter) => { let ret = generator(); diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 12a8802..0c175c2 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -8,13 +8,14 @@ mod state; use crate::state::State; +use bytes::{BufMut, BytesMut}; use futures_util::{select, FutureExt, SinkExt, StreamExt}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; +use std::io::ErrorKind; use tokio::{net::{UnixListener, UnixStream}, sync::mpsc}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; -use bytes::{BufMut, BytesMut}; #[tokio::main] async fn main() { @@ -114,7 +115,10 @@ async fn receive_commands( bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); if let Err(e) = writer.send(serialized.freeze()).await { - error!("Error sending response: {:?}", e); + if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error + //we just assume that they just don't want any more packets + error!("Error sending response: {:?}", e); + } break; } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 8072b8e..a224afc 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -18,7 +18,7 @@ use mumlib::command::{Command, CommandResponse, MessageTarget}; use mumlib::config::Config; use mumlib::Error; use crate::state::user::UserDiff; -use std::{net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; +use std::{iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; use tokio::sync::{mpsc, watch}; macro_rules! at { @@ -29,21 +29,23 @@ macro_rules! at { macro_rules! now { ($data:expr) => { - ExecutionContext::Now(Box::new(move || $data)) + ExecutionContext::Now(Box::new(move || Box::new(iter::once($data)))) }; } +type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>>; + //TODO give me a better name pub enum ExecutionContext { TcpEventCallback( TcpEvent, - Box<dyn FnOnce(TcpEventData) -> mumlib::error::Result<Option<CommandResponse>>>, + Box<dyn FnOnce(TcpEventData) -> Responses>, ), TcpEventSubscriber( TcpEvent, Box<dyn FnMut(TcpEventData, &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>) -> bool>, ), - Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>), + Now(Box<dyn FnOnce() -> Responses>), Ping( Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>, Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>, @@ -545,7 +547,7 @@ pub fn handle_command( at!(TcpEvent::Connected, |res| { //runs the closure when the client is connected if let TcpEventData::Connected(res) = res { - res.map(|msg| { + Box::new(iter::once(res.map(|msg| { Some(CommandResponse::ServerConnect { welcome_message: if msg.has_welcome_text() { Some(msg.get_welcome_text().to_string()) @@ -553,7 +555,7 @@ pub fn handle_command( None }, }) - }) + }))) } else { unreachable!("callback should be provided with a TcpEventData::Connected"); } @@ -584,12 +586,12 @@ pub fn handle_command( } }), Box::new(move |pong| { - Ok(pong.map(|pong| CommandResponse::ServerStatus { + Ok(pong.map(|pong| (CommandResponse::ServerStatus { version: pong.version, users: pong.users, max_users: pong.max_users, bandwidth: pong.bandwidth, - })) + }))) }), ), Command::Status => { @@ -643,11 +645,14 @@ pub fn handle_command( ) } else { let messages = std::mem::take(&mut state.message_buffer); - let messages = messages.into_iter() + let messages: Vec<_> = messages.into_iter() .map(|(msg, user)| (msg, state.get_user_name(user).unwrap())) + .map(|e| Ok(Some(CommandResponse::PastMessage { message: e }))) .collect(); - now!(Ok(Some(CommandResponse::PastMessages { messages }))) + ExecutionContext::Now(Box::new(move || { + Box::new(messages.into_iter()) + })) } } Command::SendMessage { message, targets } => { diff --git a/mumlib/src/command.rs b/mumlib/src/command.rs index c4fc913..847b7fd 100644 --- a/mumlib/src/command.rs +++ b/mumlib/src/command.rs @@ -62,9 +62,6 @@ pub enum CommandResponse { Status { server_state: Server, }, - PastMessages { - messages: Vec<(String, String)>, - }, PastMessage { message: (String, String), } |
