aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2021-05-19 02:09:58 +0200
committerEskil Queseth <eskilq@kth.se>2021-05-19 02:09:58 +0200
commitf551de2bbc5e41c5cd76e36c2b0a6f10d9b4cddf (patch)
tree750cbad28427067b66fa690236a25ea3374bd9aa
parentcf81a1141cdc6a6db842d992d065eba74829e0c7 (diff)
downloadmum-f551de2bbc5e41c5cd76e36c2b0a6f10d9b4cddf.tar.gz
remove event_register_handler from tcp stack
-rw-r--r--mumctl/src/main.rs2
-rw-r--r--mumd/src/client.rs9
-rw-r--r--mumd/src/command.rs14
-rw-r--r--mumd/src/network/tcp.rs24
-rw-r--r--mumd/src/state.rs2
-rw-r--r--mumlib/src/command.rs4
6 files changed, 20 insertions, 35 deletions
diff --git a/mumctl/src/main.rs b/mumctl/src/main.rs
index 5d3f332..4e1249e 100644
--- a/mumctl/src/main.rs
+++ b/mumctl/src/main.rs
@@ -373,7 +373,7 @@ fn match_opt() -> Result<(), Error> {
send_command(MumCommand::DeafenSelf(Some(false)))??;
}
Command::Messages => {
- match send_command(MumCommand::PastMessages)?? {
+ match send_command(MumCommand::PastMessages { block: false })?? {
Some(CommandResponse::PastMessages { messages }) => {
for (msg, sender) in messages {
println!("{}: {}", sender, msg);
diff --git a/mumd/src/client.rs b/mumd/src/client.rs
index 9c2c2a0..3c491da 100644
--- a/mumd/src/client.rs
+++ b/mumd/src/client.rs
@@ -1,4 +1,4 @@
-use crate::command;
+use crate::{command, network::tcp::TcpEventQueue};
use crate::error::ClientError;
use crate::network::{tcp, udp, ConnectionInfo};
use crate::state::State;
@@ -24,8 +24,7 @@ pub async fn handle(
mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
let (ping_request_sender, ping_request_receiver) =
mpsc::unbounded_channel();
- let (response_sender, response_receiver) =
- mpsc::unbounded_channel();
+ let event_queue = TcpEventQueue::new();
let state = Arc::new(RwLock::new(state));
@@ -36,7 +35,7 @@ pub async fn handle(
crypt_state_sender,
packet_sender.clone(),
packet_receiver,
- response_receiver,
+ event_queue.clone(),
).fuse() => r.map_err(|e| ClientError::TcpError(e)),
_ = udp::handle(
Arc::clone(&state),
@@ -46,7 +45,7 @@ pub async fn handle(
_ = command::handle(
state,
command_receiver,
- response_sender,
+ event_queue,
ping_request_sender,
packet_sender,
connection_info_sender,
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 1337dce..a62ddbd 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,8 +1,4 @@
-use crate::network::{
- ConnectionInfo,
- tcp::{TcpEvent, TcpEventCallback},
- udp::PingRequest
-};
+use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest};
use crate::state::{ExecutionContext, State};
use log::*;
@@ -17,7 +13,7 @@ pub async fn handle(
Command,
oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
- tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>,
+ tcp_event_queue: TcpEventQueue,
ping_request_sender: mpsc::UnboundedSender<PingRequest>,
mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
@@ -33,14 +29,14 @@ pub async fn handle(
ExecutionContext::TcpEvent(event, generator) => {
let (tx, rx) = oneshot::channel();
//TODO handle this error
- let _ = tcp_event_register_sender.send((
- event,
+ tcp_event_queue.register_callback(
+ event,
Box::new(move |e| {
let response = generator(e);
response_sender.send(response).unwrap();
tx.send(()).unwrap();
}),
- ));
+ );
rx.await.unwrap();
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 8f34cd8..b6e939a 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -57,14 +57,14 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent {
}
#[derive(Clone)]
-struct TcpEventQueue {
+pub struct TcpEventQueue {
callbacks: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventCallback>>>>,
subscribers: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventSubscriber>>>>,
}
impl TcpEventQueue {
/// Creates a new `TcpEventQueue`.
- fn new() -> Self {
+ pub fn new() -> Self {
Self {
callbacks: Arc::new(RwLock::new(HashMap::new())),
subscribers: Arc::new(RwLock::new(HashMap::new())),
@@ -72,18 +72,18 @@ impl TcpEventQueue {
}
/// Registers a new callback to be triggered when an event is fired.
- fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) {
+ pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) {
self.callbacks.write().unwrap().entry(at).or_default().push(callback);
}
/// Registers a new callback to be triggered when an event is fired.
- fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) {
+ pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) {
self.subscribers.write().unwrap().entry(at).or_default().push(callback);
}
/// Fires all callbacks related to a specific TCP event and removes them from the event queue.
/// Also calls all event subscribers, but keeps them in the queue
- fn resolve<'a>(&self, data: TcpEventData<'a>) {
+ pub fn resolve<'a>(&self, data: TcpEventData<'a>) {
if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) {
let old = std::mem::take(vec);
for handler in old {
@@ -107,7 +107,7 @@ pub async fn handle(
crypt_state_sender: mpsc::Sender<ClientCryptState>,
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
- mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>,
+ event_queue: TcpEventQueue,
) -> Result<(), TcpError> {
loop {
let connection_info = 'data: loop {
@@ -137,7 +137,6 @@ pub async fn handle(
(state_lock.phase_receiver(),
state_lock.audio_input().receiver())
};
- let event_queue = TcpEventQueue::new();
info!("Logging in...");
@@ -160,7 +159,6 @@ pub async fn handle(
phase_watcher_inner,
).fuse() => r,
r = send_packets(sink, &mut packet_receiver).fuse() => r,
- _ = register_events(&mut tcp_event_register_receiver, event_queue.clone()).fuse() => Ok(()),
}
},
phase_watcher,
@@ -403,13 +401,3 @@ async fn listen(
}
Ok(())
}
-
-async fn register_events(
- tcp_event_register_receiver: &mut mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>,
- event_queue: TcpEventQueue,
-) {
- loop {
- let (event, handler) = tcp_event_register_receiver.recv().await.unwrap();
- event_queue.register_callback(event, handler);
- }
-}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index bffc082..91c6ee7 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -393,7 +393,7 @@ impl State {
self.audio_output.set_user_volume(user_id, volume);
now!(Ok(None))
}
- Command::PastMessages => {
+ Command::PastMessages { block } => {
let server = match self.server.as_ref() {
Some(s) => s,
None => return now!(Err(Error::Disconnected)),
diff --git a/mumlib/src/command.rs b/mumlib/src/command.rs
index 5155aaa..4e8775f 100644
--- a/mumlib/src/command.rs
+++ b/mumlib/src/command.rs
@@ -29,7 +29,9 @@ pub enum Command {
},
Status,
UserVolumeSet(String, f32),
- PastMessages,
+ PastMessages {
+ block: bool,
+ },
SendMessage {
message: String,
targets: Vec<MessageTarget>,