From a32511e0b70288cad6d4915b30956f3eb8728149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Fri, 1 Jan 2021 17:32:07 +0100 Subject: move mumble backend code to new client --- mumd/src/client.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ mumd/src/main.rs | 43 ++++--------------------------------------- mumd/src/state.rs | 5 ++++- 3 files changed, 60 insertions(+), 40 deletions(-) create mode 100644 mumd/src/client.rs (limited to 'mumd/src') diff --git a/mumd/src/client.rs b/mumd/src/client.rs new file mode 100644 index 0000000..74e744f --- /dev/null +++ b/mumd/src/client.rs @@ -0,0 +1,52 @@ +use crate::command; +use crate::network::{tcp, udp, ConnectionInfo}; +use crate::state::State; + +use futures_util::join; +use ipc_channel::ipc::IpcSender; +use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState}; +use mumlib::command::{Command, CommandResponse}; +use std::sync::{Arc, Mutex}; +use tokio::sync::{mpsc, watch}; + +pub async fn handle( + command_receiver: mpsc::UnboundedReceiver<( + Command, + IpcSender>>, + )>, +) { + let (connection_info_sender, connection_info_receiver) = + watch::channel::>(None); + let (crypt_state_sender, crypt_state_receiver) = + mpsc::channel::(1); + let (packet_sender, packet_receiver) = + mpsc::unbounded_channel::>(); + let (ping_request_sender, ping_request_receiver) = + mpsc::unbounded_channel(); + let (response_sender, response_receiver) = + mpsc::unbounded_channel(); + + let state = State::new(packet_sender, connection_info_sender); + let state = Arc::new(Mutex::new(state)); + join!( + tcp::handle( + Arc::clone(&state), + connection_info_receiver.clone(), + crypt_state_sender, + packet_receiver, + response_receiver, + ), + udp::handle( + Arc::clone(&state), + connection_info_receiver.clone(), + crypt_state_receiver, + ), + command::handle( + state, + command_receiver, + response_sender, + ping_request_sender, + ), + udp::handle_pings(ping_request_receiver), + ); +} diff --git a/mumd/src/main.rs b/mumd/src/main.rs index db6d2ef..67481f9 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -1,23 +1,17 @@ mod audio; +mod client; mod command; mod network; mod notify; mod state; -use crate::network::ConnectionInfo; -use crate::state::State; - use futures::join; use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender}; use log::*; -use mumble_protocol::control::ControlPacket; -use mumble_protocol::crypt::ClientCryptState; -use mumble_protocol::voice::Serverbound; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; use std::fs; -use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::mpsc; use tokio::task::spawn_blocking; #[tokio::main] @@ -45,46 +39,17 @@ async fn main() { } } - // Oneshot channel for setting UDP CryptState from control task - // For simplicity we don't deal with re-syncing, real applications would have to. - let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::(1); // crypt state should always be consumed before sending a new one - let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::<( Command, IpcSender>>, )>(); - let (connection_info_sender, connection_info_receiver) = - watch::channel::>(None); - let (response_sender, response_receiver) = mpsc::unbounded_channel(); - let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); - - let state = State::new(packet_sender, connection_info_sender); - let state = Arc::new(Mutex::new(state)); - let (_, _, _, e, _) = join!( - network::tcp::handle( - Arc::clone(&state), - connection_info_receiver.clone(), - crypt_state_sender, - packet_receiver, - response_receiver, - ), - network::udp::handle( - Arc::clone(&state), - connection_info_receiver.clone(), - crypt_state_receiver, - ), - command::handle( - state, - command_receiver, - response_sender, - ping_request_sender, - ), + let (_, e) = join!( + client::handle(command_receiver), spawn_blocking(move || { // IpcSender is blocking receive_oneshot_commands(command_sender); }), - network::udp::handle_pings(ping_request_receiver), ); e.unwrap(); } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 85e5449..574d0cb 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -85,7 +85,10 @@ impl State { state } - pub fn handle_command(&mut self, command: Command) -> ExecutionContext { + pub fn handle_command( + &mut self, + command: Command, + ) -> ExecutionContext { match command { Command::ChannelJoin { channel_identifier } => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { -- cgit v1.2.1 From aef5b85b22b916a3a7f84b1b9bbea151544580f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Fri, 1 Jan 2021 17:40:00 +0100 Subject: move none-state-fields on state to client --- mumd/src/client.rs | 5 ++++- mumd/src/command.rs | 10 ++++++---- mumd/src/network/tcp.rs | 4 ++-- mumd/src/state.rs | 23 +++++++---------------- 4 files changed, 19 insertions(+), 23 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/client.rs b/mumd/src/client.rs index 74e744f..3613061 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -26,13 +26,14 @@ pub async fn handle( let (response_sender, response_receiver) = mpsc::unbounded_channel(); - let state = State::new(packet_sender, connection_info_sender); + let state = State::new(); let state = Arc::new(Mutex::new(state)); join!( tcp::handle( Arc::clone(&state), connection_info_receiver.clone(), crypt_state_sender, + packet_sender.clone(), packet_receiver, response_receiver, ), @@ -46,6 +47,8 @@ pub async fn handle( command_receiver, response_sender, ping_request_sender, + packet_sender, + connection_info_sender, ), udp::handle_pings(ping_request_receiver), ); diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 330e3fc..e8c92c3 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,13 +1,13 @@ -use crate::state::{ExecutionContext, State}; +use crate::{network::ConnectionInfo, state::{ExecutionContext, State}}; use crate::network::tcp::{TcpEvent, TcpEventCallback}; use ipc_channel::ipc::IpcSender; use log::*; -use mumble_protocol::ping::PongPacket; +use mumble_protocol::{Serverbound, control::ControlPacket, ping::PongPacket}; use mumlib::command::{Command, CommandResponse}; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; pub async fn handle( state: Arc>, @@ -17,12 +17,14 @@ pub async fn handle( )>, tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, ping_request_sender: mpsc::UnboundedSender<(u64, SocketAddr, Box)>, + mut packet_sender: mpsc::UnboundedSender>, + mut connection_info_sender: watch::Sender>, ) { debug!("Begin listening for commands"); while let Some((command, response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); let mut state = state.lock().unwrap(); - let event = state.handle_command(command); + let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); drop(state); match event { ExecutionContext::TcpEvent(event, generator) => { diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 3c96ee1..47ea311 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -44,6 +44,7 @@ pub async fn handle( state: Arc>, mut connection_info_receiver: watch::Receiver>, crypt_state_sender: mpsc::Sender, + packet_sender: mpsc::UnboundedSender>, mut packet_receiver: mpsc::UnboundedReceiver>, mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, ) { @@ -67,14 +68,13 @@ pub async fn handle( let state_lock = state.lock().unwrap(); authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; let phase_watcher = state_lock.phase_receiver(); - let packet_sender = state_lock.packet_sender(); drop(state_lock); let event_queue = Arc::new(Mutex::new(HashMap::new())); info!("Logging in..."); join!( - send_pings(packet_sender, 10, phase_watcher.clone()), + send_pings(packet_sender.clone(), 10, phase_watcher.clone()), listen( Arc::clone(&state), stream, diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 574d0cb..8fa05ae 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -57,17 +57,11 @@ pub struct State { server: Option, audio: Audio, - packet_sender: mpsc::UnboundedSender>, - connection_info_sender: watch::Sender>, - phase_watcher: (watch::Sender, watch::Receiver), } impl State { - pub fn new( - packet_sender: mpsc::UnboundedSender>, - connection_info_sender: watch::Sender>, - ) -> Self { + pub fn new() -> Self { let config = mumlib::config::read_default_cfg(); let audio = Audio::new( config.audio.input_volume.unwrap_or(1.0), @@ -77,8 +71,6 @@ impl State { config, server: None, audio, - packet_sender, - connection_info_sender, phase_watcher: watch::channel(StatePhase::Disconnected), }; state.reload_config(); @@ -88,6 +80,8 @@ impl State { pub fn handle_command( &mut self, command: Command, + packet_sender: &mut mpsc::UnboundedSender>, + connection_info_sender: &mut watch::Sender>, ) -> ExecutionContext { match command { Command::ChannelJoin { channel_identifier } => { @@ -137,7 +131,7 @@ impl State { let mut msg = msgs::UserState::new(); msg.set_session(self.server.as_ref().unwrap().session_id().unwrap()); msg.set_channel_id(id); - self.packet_sender.send(msg.into()).unwrap(); + packet_sender.send(msg.into()).unwrap(); now!(Ok(None)) } Command::ChannelList => { @@ -203,7 +197,7 @@ impl State { let server = self.server_mut().unwrap(); server.set_muted(mute); server.set_deafened(deafen); - self.packet_sender.send(msg.into()).unwrap(); + packet_sender.send(msg.into()).unwrap(); } now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) @@ -297,7 +291,7 @@ impl State { let server = self.server_mut().unwrap(); server.set_muted(mute); server.set_deafened(deafen); - self.packet_sender.send(msg.into()).unwrap(); + packet_sender.send(msg.into()).unwrap(); } now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) @@ -337,7 +331,7 @@ impl State { return now!(Err(Error::InvalidServerAddrError(host, port))); } }; - self.connection_info_sender + connection_info_sender .send(Some(ConnectionInfo::new( socket_addr, host, @@ -595,9 +589,6 @@ impl State { pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } - pub fn packet_sender(&self) -> mpsc::UnboundedSender> { - self.packet_sender.clone() - } pub fn phase_receiver(&self) -> watch::Receiver { self.phase_watcher.1.clone() } -- cgit v1.2.1 From 1c8b7316503d3ab710d3d3ec241b85e76b9a42be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Fri, 1 Jan 2021 22:32:12 +0100 Subject: clippy pass --- mumd/src/audio.rs | 2 +- mumd/src/command.rs | 6 ++---- mumd/src/network/udp.rs | 29 ++++++++++++----------------- mumd/src/state.rs | 4 ++-- mumd/src/state/channel.rs | 10 +++++----- mumd/src/state/server.rs | 2 +- 6 files changed, 23 insertions(+), 30 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0666268..0820147 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; //TODO? move to mumlib -pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ +pub const EVENT_SOUNDS: &[(&[u8], NotificationEvents)] = &[ (include_bytes!("resources/connect.wav"), NotificationEvents::ServerConnect), ( include_bytes!("resources/disconnect.wav"), diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 330e3fc..9c8970c 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,11 +1,9 @@ use crate::state::{ExecutionContext, State}; +use crate::network::{tcp::{TcpEvent, TcpEventCallback}, udp::PingRequest}; -use crate::network::tcp::{TcpEvent, TcpEventCallback}; use ipc_channel::ipc::IpcSender; use log::*; -use mumble_protocol::ping::PongPacket; use mumlib::command::{Command, CommandResponse}; -use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, oneshot}; @@ -16,7 +14,7 @@ pub async fn handle( IpcSender>>, )>, tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, - ping_request_sender: mpsc::UnboundedSender<(u64, SocketAddr, Box)>, + ping_request_sender: mpsc::UnboundedSender, ) { debug!("Begin listening for commands"); while let Some((command, response_sender)) = command_receiver.recv().await { diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index b592a60..f7eeb62 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,10 +1,10 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; -use log::*; use bytes::Bytes; use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; +use log::*; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -18,6 +18,8 @@ use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::udp::UdpFramed; +pub type PingRequest = (u64, SocketAddr, Box); + type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; type UdpReceiver = SplitStream>; @@ -89,17 +91,14 @@ async fn new_crypt_state( source: Arc>, ) { loop { - match crypt_state.recv().await { - Some(crypt_state) => { - info!("Received new crypt state"); - let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) - .await - .expect("Failed to bind UDP socket"); - let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); - *sink.lock().unwrap() = new_sink; - *source.lock().unwrap() = new_source; - }, - None => {}, + if let Some(crypt_state) = crypt_state.recv().await { + info!("Received new crypt state"); + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) + .await + .expect("Failed to bind UDP socket"); + let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split(); + *sink.lock().unwrap() = new_sink; + *source.lock().unwrap() = new_source; } } } @@ -256,11 +255,7 @@ async fn send_voice( } pub async fn handle_pings( - mut ping_request_receiver: mpsc::UnboundedReceiver<( - u64, - SocketAddr, - Box, - )>, + mut ping_request_receiver: mpsc::UnboundedReceiver, ) { let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 85e5449..546f7d1 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -219,7 +219,7 @@ impl State { .unwrap() .users_mut() .iter_mut() - .find(|(_, user)| user.name() == &string); + .find(|(_, user)| user.name() == string); let (id, user) = match id { Some(id) => (*id.0, id.1), @@ -408,7 +408,7 @@ impl State { .unwrap() .users() .iter() - .find(|e| e.1.name() == &string) + .find(|e| e.1.name() == string) .map(|e| *e.0) { None => return now!(Err(Error::InvalidUsernameError(string))), diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs index 8bbf919..5b6d669 100644 --- a/mumd/src/state/channel.rs +++ b/mumd/src/state/channel.rs @@ -88,7 +88,7 @@ impl<'a> ProtoTree<'a> { users: Vec::new(), }); pt.channel = Some(channel); - pt.users = users.get(&node).map(|e| e.clone()).unwrap_or(Vec::new()); + pt.users = users.get(&node).cloned().unwrap_or_default(); } longer => { self.children @@ -135,7 +135,7 @@ pub fn into_channel( for user in users.values() { channel_lookup .entry(user.channel()) - .or_insert(Vec::new()) + .or_insert_with(Vec::new) .push(user); } @@ -148,7 +148,7 @@ pub fn into_channel( } walk.reverse(); - if walk.len() > 0 { + if !walk.is_empty() { walks.push((walk, channel)); } } @@ -159,8 +159,8 @@ pub fn into_channel( children: HashMap::new(), users: channel_lookup .get(&0) - .map(|e| e.clone()) - .unwrap_or(Vec::new()), + .cloned() + .unwrap_or_default(), }; for (walk, channel) in walks { diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs index a065df0..8a256b6 100644 --- a/mumd/src/state/server.rs +++ b/mumd/src/state/server.rs @@ -107,7 +107,7 @@ impl Server { } pub fn username(&self) -> Option<&str> { - self.username.as_ref().map(|e| e.as_str()) + self.username.as_deref() } pub fn username_mut(&mut self) -> &mut Option { -- cgit v1.2.1