diff options
| author | Kapten Z∅∅m <55669224+default-username-852@users.noreply.github.com> | 2021-06-06 23:19:05 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-06-06 23:19:05 +0200 |
| commit | 360b232de29f0104a8beb0c57e8defd9e54c9e6c (patch) | |
| tree | 3595d6ae9dbe293ef0403ce581edd4742569147c /mumd/src/command.rs | |
| parent | ea8b1906e14c3b319d3ad184b6d7cfc507c23b4f (diff) | |
| parent | 55a12fbdfb435886b2f211fe1fb00daafb32b6a7 (diff) | |
| download | mum-360b232de29f0104a8beb0c57e8defd9e54c9e6c.tar.gz | |
Merge pull request #92 from mum-rs/text-message
Text message
Diffstat (limited to 'mumd/src/command.rs')
| -rw-r--r-- | mumd/src/command.rs | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 1337dce..5255afa 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,51 +1,53 @@ -use crate::network::{ - ConnectionInfo, - tcp::{TcpEvent, TcpEventCallback}, - udp::PingRequest -}; +use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest}; use crate::state::{ExecutionContext, State}; use log::*; use mumble_protocol::{Serverbound, control::ControlPacket}; use mumlib::command::{Command, CommandResponse}; use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, watch}; pub async fn handle( state: Arc<RwLock<State>>, mut command_receiver: mpsc::UnboundedReceiver<( Command, - oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>, + mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>, )>, - tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, + tcp_event_queue: TcpEventQueue, ping_request_sender: mpsc::UnboundedSender<PingRequest>, mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>, ) { debug!("Begin listening for commands"); let ping_count = AtomicU64::new(0); - while let Some((command, response_sender)) = command_receiver.recv().await { + while let Some((command, mut response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); - let mut state = state.write().unwrap(); - let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); - drop(state); + let event = crate::state::handle_command(Arc::clone(&state), command, &mut packet_sender, &mut connection_info_sender); match event { - ExecutionContext::TcpEvent(event, generator) => { - let (tx, rx) = oneshot::channel(); - //TODO handle this error - let _ = tcp_event_register_sender.send(( - event, + ExecutionContext::TcpEventCallback(event, generator) => { + tcp_event_queue.register_callback( + event, Box::new(move |e| { let response = generator(e); - response_sender.send(response).unwrap(); - tx.send(()).unwrap(); + for response in response { + response_sender.send(response).unwrap(); + } }), - )); - - rx.await.unwrap(); + ); + } + ExecutionContext::TcpEventSubscriber(event, mut handler) => { + tcp_event_queue.register_subscriber( + event, + Box::new(move |event| { + handler(event, &mut response_sender) + }), + ) } ExecutionContext::Now(generator) => { - response_sender.send(generator()).unwrap(); + for response in generator() { + response_sender.send(response).unwrap(); + } + drop(response_sender); } ExecutionContext::Ping(generator, converter) => { let ret = generator(); |
