aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2021-05-19 02:27:27 +0200
committerEskil Queseth <eskilq@kth.se>2021-05-19 02:27:27 +0200
commit5d05d292ddb7f8b28b71abd46930028b6e66dfde (patch)
treeabd10727e7c7e5ec004ec14ced7189d2c1c0687c /mumd/src
parent0b2efad3e9aa569c27d339a5eca17c96155b4f9d (diff)
downloadmum-5d05d292ddb7f8b28b71abd46930028b6e66dfde.tar.gz
add support for sending multiple responses
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/client.rs4
-rw-r--r--mumd/src/command.rs16
-rw-r--r--mumd/src/main.rs23
-rw-r--r--mumd/src/state.rs8
4 files changed, 30 insertions, 21 deletions
diff --git a/mumd/src/client.rs b/mumd/src/client.rs
index 3c491da..ba9cad4 100644
--- a/mumd/src/client.rs
+++ b/mumd/src/client.rs
@@ -7,13 +7,13 @@ use futures_util::{select, FutureExt};
use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState};
use mumlib::command::{Command, CommandResponse};
use std::sync::{Arc, RwLock};
-use tokio::sync::{mpsc, oneshot, watch};
+use tokio::sync::{mpsc, watch};
pub async fn handle(
state: State,
command_receiver: mpsc::UnboundedReceiver<(
Command,
- oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>,
+ mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
) -> Result<(), ClientError> {
let (connection_info_sender, connection_info_receiver) =
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index d101104..a1e8b21 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -5,13 +5,13 @@ use log::*;
use mumble_protocol::{Serverbound, control::ControlPacket};
use mumlib::command::{Command, CommandResponse};
use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
-use tokio::sync::{mpsc, oneshot, watch};
+use tokio::sync::{mpsc, watch};
pub async fn handle(
state: Arc<RwLock<State>>,
mut command_receiver: mpsc::UnboundedReceiver<(
Command,
- oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>,
+ mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
tcp_event_queue: TcpEventQueue,
ping_request_sender: mpsc::UnboundedSender<PingRequest>,
@@ -20,13 +20,13 @@ pub async fn handle(
) {
debug!("Begin listening for commands");
let ping_count = AtomicU64::new(0);
- while let Some((command, response_sender)) = command_receiver.recv().await {
+ while let Some((command, mut response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
let mut state = state.write().unwrap();
let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender);
drop(state);
match event {
- ExecutionContext::TcpEvent(event, generator) => {
+ ExecutionContext::TcpEventCallback(event, generator) => {
tcp_event_queue.register_callback(
event,
Box::new(move |e| {
@@ -35,6 +35,14 @@ pub async fn handle(
}),
);
}
+ ExecutionContext::TcpEventSubscriber(event, mut handler) => {
+ tcp_event_queue.register_subscriber(
+ event,
+ Box::new(move |event| {
+ handler(event, &mut response_sender)
+ }),
+ )
+ }
ExecutionContext::Now(generator) => {
response_sender.send(generator()).unwrap();
}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index f298070..c34deab 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -12,7 +12,7 @@ use futures_util::{select, FutureExt, SinkExt, StreamExt};
use log::*;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
-use tokio::{net::{UnixListener, UnixStream}, sync::{mpsc, oneshot}};
+use tokio::{net::{UnixListener, UnixStream}, sync::mpsc};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use bytes::{BufMut, BytesMut};
@@ -81,7 +81,7 @@ async fn main() {
async fn receive_commands(
command_sender: mpsc::UnboundedSender<(
Command,
- oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>,
+ mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
) {
let socket = UnixListener::bind(mumlib::SOCKET_PATH).unwrap();
@@ -105,21 +105,18 @@ async fn receive_commands(
Err(_) => continue,
};
- let (tx, rx) = oneshot::channel();
+ let (tx, mut rx) = mpsc::unbounded_channel();
sender.send((command, tx)).unwrap();
- let response = match rx.await {
- Ok(r) => r,
- Err(_) => {
- error!("Internal command response sender dropped");
- Ok(None)
- }
- };
- let mut serialized = BytesMut::new();
- bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
+ while let Some(response) = rx.recv().await {
+ let mut serialized = BytesMut::new();
+ bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
- let _ = writer.send(serialized.freeze()).await;
+ if let Err(e) = writer.send(serialized.freeze()).await {
+ error!("Error sending response: {:?}", e);
+ }
+ }
}
});
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 91c6ee7..423ce76 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -23,7 +23,7 @@ use tokio::sync::{mpsc, watch};
macro_rules! at {
($event:expr, $generator:expr) => {
- ExecutionContext::TcpEvent($event, Box::new($generator))
+ ExecutionContext::TcpEventCallback($event, Box::new($generator))
};
}
@@ -35,10 +35,14 @@ macro_rules! now {
//TODO give me a better name
pub enum ExecutionContext {
- TcpEvent(
+ TcpEventCallback(
TcpEvent,
Box<dyn FnOnce(TcpEventData) -> mumlib::error::Result<Option<CommandResponse>>>,
),
+ TcpEventSubscriber(
+ TcpEvent,
+ Box<dyn FnMut(TcpEventData, &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>) -> bool>,
+ ),
Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>),
Ping(
Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>,