diff options
| -rw-r--r-- | mumd/src/client.rs | 5 | ||||
| -rw-r--r-- | mumd/src/command.rs | 10 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 4 | ||||
| -rw-r--r-- | mumd/src/state.rs | 23 |
4 files changed, 19 insertions, 23 deletions
diff --git a/mumd/src/client.rs b/mumd/src/client.rs index 74e744f..3613061 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -26,13 +26,14 @@ pub async fn handle( let (response_sender, response_receiver) = mpsc::unbounded_channel(); - let state = State::new(packet_sender, connection_info_sender); + let state = State::new(); let state = Arc::new(Mutex::new(state)); join!( tcp::handle( Arc::clone(&state), connection_info_receiver.clone(), crypt_state_sender, + packet_sender.clone(), packet_receiver, response_receiver, ), @@ -46,6 +47,8 @@ pub async fn handle( command_receiver, response_sender, ping_request_sender, + packet_sender, + connection_info_sender, ), udp::handle_pings(ping_request_receiver), ); diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 330e3fc..e8c92c3 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,13 +1,13 @@ -use crate::state::{ExecutionContext, State}; +use crate::{network::ConnectionInfo, state::{ExecutionContext, State}}; use crate::network::tcp::{TcpEvent, TcpEventCallback}; use ipc_channel::ipc::IpcSender; use log::*; -use mumble_protocol::ping::PongPacket; +use mumble_protocol::{Serverbound, control::ControlPacket, ping::PongPacket}; use mumlib::command::{Command, CommandResponse}; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; pub async fn handle( state: Arc<Mutex<State>>, @@ -17,12 +17,14 @@ pub async fn handle( )>, tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, ping_request_sender: mpsc::UnboundedSender<(u64, SocketAddr, Box<dyn FnOnce(PongPacket)>)>, + mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, + mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>, ) { debug!("Begin listening for commands"); while let Some((command, response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); let mut state = state.lock().unwrap(); - let event = state.handle_command(command); + let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); drop(state); match event { ExecutionContext::TcpEvent(event, generator) => { diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 3c96ee1..47ea311 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -44,6 +44,7 @@ pub async fn handle( state: Arc<Mutex<State>>, mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, crypt_state_sender: mpsc::Sender<ClientCryptState>, + packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, ) { @@ -67,14 +68,13 @@ pub async fn handle( let state_lock = state.lock().unwrap(); authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; let phase_watcher = state_lock.phase_receiver(); - let packet_sender = state_lock.packet_sender(); drop(state_lock); let event_queue = Arc::new(Mutex::new(HashMap::new())); info!("Logging in..."); join!( - send_pings(packet_sender, 10, phase_watcher.clone()), + send_pings(packet_sender.clone(), 10, phase_watcher.clone()), listen( Arc::clone(&state), stream, diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 574d0cb..8fa05ae 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -57,17 +57,11 @@ pub struct State { server: Option<Server>, audio: Audio, - packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, - connection_info_sender: watch::Sender<Option<ConnectionInfo>>, - phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>), } impl State { - pub fn new( - packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, - connection_info_sender: watch::Sender<Option<ConnectionInfo>>, - ) -> Self { + pub fn new() -> Self { let config = mumlib::config::read_default_cfg(); let audio = Audio::new( config.audio.input_volume.unwrap_or(1.0), @@ -77,8 +71,6 @@ impl State { config, server: None, audio, - packet_sender, - connection_info_sender, phase_watcher: watch::channel(StatePhase::Disconnected), }; state.reload_config(); @@ -88,6 +80,8 @@ impl State { pub fn handle_command( &mut self, command: Command, + packet_sender: &mut mpsc::UnboundedSender<ControlPacket<Serverbound>>, + connection_info_sender: &mut watch::Sender<Option<ConnectionInfo>>, ) -> ExecutionContext { match command { Command::ChannelJoin { channel_identifier } => { @@ -137,7 +131,7 @@ impl State { let mut msg = msgs::UserState::new(); msg.set_session(self.server.as_ref().unwrap().session_id().unwrap()); msg.set_channel_id(id); - self.packet_sender.send(msg.into()).unwrap(); + packet_sender.send(msg.into()).unwrap(); now!(Ok(None)) } Command::ChannelList => { @@ -203,7 +197,7 @@ impl State { let server = self.server_mut().unwrap(); server.set_muted(mute); server.set_deafened(deafen); - self.packet_sender.send(msg.into()).unwrap(); + packet_sender.send(msg.into()).unwrap(); } now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) @@ -297,7 +291,7 @@ impl State { let server = self.server_mut().unwrap(); server.set_muted(mute); server.set_deafened(deafen); - self.packet_sender.send(msg.into()).unwrap(); + packet_sender.send(msg.into()).unwrap(); } now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) @@ -337,7 +331,7 @@ impl State { return now!(Err(Error::InvalidServerAddrError(host, port))); } }; - self.connection_info_sender + connection_info_sender .send(Some(ConnectionInfo::new( socket_addr, host, @@ -595,9 +589,6 @@ impl State { pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } - pub fn packet_sender(&self) -> mpsc::UnboundedSender<ControlPacket<Serverbound>> { - self.packet_sender.clone() - } pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> { self.phase_watcher.1.clone() } |
