aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/command.rs
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src/command.rs')
-rw-r--r--mumd/src/command.rs48
1 files changed, 25 insertions, 23 deletions
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 1337dce..5255afa 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,51 +1,53 @@
-use crate::network::{
- ConnectionInfo,
- tcp::{TcpEvent, TcpEventCallback},
- udp::PingRequest
-};
+use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest};
use crate::state::{ExecutionContext, State};
use log::*;
use mumble_protocol::{Serverbound, control::ControlPacket};
use mumlib::command::{Command, CommandResponse};
use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
-use tokio::sync::{mpsc, oneshot, watch};
+use tokio::sync::{mpsc, watch};
pub async fn handle(
state: Arc<RwLock<State>>,
mut command_receiver: mpsc::UnboundedReceiver<(
Command,
- oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>,
+ mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
- tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>,
+ tcp_event_queue: TcpEventQueue,
ping_request_sender: mpsc::UnboundedSender<PingRequest>,
mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
) {
debug!("Begin listening for commands");
let ping_count = AtomicU64::new(0);
- while let Some((command, response_sender)) = command_receiver.recv().await {
+ while let Some((command, mut response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
- let mut state = state.write().unwrap();
- let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender);
- drop(state);
+ let event = crate::state::handle_command(Arc::clone(&state), command, &mut packet_sender, &mut connection_info_sender);
match event {
- ExecutionContext::TcpEvent(event, generator) => {
- let (tx, rx) = oneshot::channel();
- //TODO handle this error
- let _ = tcp_event_register_sender.send((
- event,
+ ExecutionContext::TcpEventCallback(event, generator) => {
+ tcp_event_queue.register_callback(
+ event,
Box::new(move |e| {
let response = generator(e);
- response_sender.send(response).unwrap();
- tx.send(()).unwrap();
+ for response in response {
+ response_sender.send(response).unwrap();
+ }
}),
- ));
-
- rx.await.unwrap();
+ );
+ }
+ ExecutionContext::TcpEventSubscriber(event, mut handler) => {
+ tcp_event_queue.register_subscriber(
+ event,
+ Box::new(move |event| {
+ handler(event, &mut response_sender)
+ }),
+ )
}
ExecutionContext::Now(generator) => {
- response_sender.send(generator()).unwrap();
+ for response in generator() {
+ response_sender.send(response).unwrap();
+ }
+ drop(response_sender);
}
ExecutionContext::Ping(generator, converter) => {
let ret = generator();