use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest}; use crate::state::{ExecutionContext, State}; use log::*; use mumble_protocol::{Serverbound, control::ControlPacket}; use mumlib::command::{Command, CommandResponse}; use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; use tokio::sync::{mpsc, watch}; pub async fn handle( state: Arc>, mut command_receiver: mpsc::UnboundedReceiver<( Command, mpsc::UnboundedSender>>, )>, tcp_event_queue: TcpEventQueue, ping_request_sender: mpsc::UnboundedSender, mut packet_sender: mpsc::UnboundedSender>, mut connection_info_sender: watch::Sender>, ) { debug!("Begin listening for commands"); let ping_count = AtomicU64::new(0); while let Some((command, mut response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); let mut state = state.write().unwrap(); let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); drop(state); match event { ExecutionContext::TcpEventCallback(event, generator) => { tcp_event_queue.register_callback( event, Box::new(move |e| { let response = generator(e); response_sender.send(response).unwrap(); }), ); } ExecutionContext::TcpEventSubscriber(event, mut handler) => { tcp_event_queue.register_subscriber( event, Box::new(move |event| { handler(event, &mut response_sender) }), ) } ExecutionContext::Now(generator) => { response_sender.send(generator()).unwrap(); } ExecutionContext::Ping(generator, converter) => { let ret = generator(); debug!("Ping generated: {:?}", ret); match ret { Ok(addr) => { let id = ping_count.fetch_add(1, Ordering::Relaxed); let res = ping_request_sender.send(( id, addr, Box::new(move |packet| { response_sender.send(converter(packet)).unwrap(); }), )); if res.is_err() { panic!(); } } Err(e) => { response_sender.send(Err(e)).unwrap(); } }; } } } }