From 19267cb7ac28ce51674baa9516ebb36074709d4f Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Tue, 18 May 2021 02:36:47 +0200 Subject: add ability for backend to keep track of messages --- mumd/src/network/tcp.rs | 9 +++------ mumd/src/state.rs | 22 ++++++++++++++++------ 2 files changed, 19 insertions(+), 12 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 7606987..2a97b4a 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -269,12 +269,9 @@ async fn listen( } }; match packet { - ControlPacket::TextMessage(msg) => { - info!( - "Got message from user with session ID {}: {}", - msg.get_actor(), - msg.get_message() - ); + ControlPacket::TextMessage(mut msg) => { + let mut state = state.write().unwrap(); + state.register_message((msg.take_message(), msg.get_actor())); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 45e7301..0f608d7 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -59,6 +59,7 @@ pub struct State { server: Option, audio_input: AudioInput, audio_output: AudioOutput, + message_buffer: Vec<(String, u32)>, phase_watcher: (watch::Sender, watch::Receiver), } @@ -79,6 +80,7 @@ impl State { server: None, audio_input, audio_output, + message_buffer: Vec::new(), phase_watcher, }; state.reload_config(); @@ -426,6 +428,16 @@ impl State { self.audio_output.set_user_volume(user_id, volume); now!(Ok(None)) } + Command::PastMessages => { + let server = match self.server.as_ref() { + Some(s) => s, + None => return now!(Err(Error::Disconnected)), + }; + let messages = std::mem::take(&mut self.message_buffer).into_iter() + .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); + + now!(Ok(Some(CommandResponse::PastMessages { messages }))) + } } } @@ -590,6 +602,10 @@ impl State { self.audio_output.load_sound_effects(sound_effects); } } + + pub fn register_message(&mut self, msg: (String, u32)) { + self.message_buffer.push(msg); + } pub fn broadcast_phase(&self, phase: StatePhase) { self.phase_watcher @@ -609,12 +625,6 @@ impl State { pub fn audio_output(&self) -> &AudioOutput { &self.audio_output } - pub fn audio_input_mut(&mut self) -> &mut AudioInput { - &mut self.audio_input - } - pub fn audio_output_mut(&mut self) -> &mut AudioOutput { - &mut self.audio_output - } pub fn phase_receiver(&self) -> watch::Receiver { self.phase_watcher.1.clone() } -- cgit v1.2.1 From 6a03656f963bb59c6a6a56e3933f05f9da850ca8 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Tue, 18 May 2021 02:47:16 +0200 Subject: add notification on message --- mumd/src/network/tcp.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 2a97b4a..3696c58 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,4 +1,4 @@ -use crate::error::{ServerSendError, TcpError}; +use crate::{error::{ServerSendError, TcpError}, notifications}; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; @@ -271,6 +271,12 @@ async fn listen( match packet { ControlPacket::TextMessage(mut msg) => { let mut state = state.write().unwrap(); + let user = state.server() + .and_then(|server| server.users().get(&msg.get_actor())) + .map(|user| user.name()); + if let Some(user) = user { + notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this + } state.register_message((msg.take_message(), msg.get_actor())); } ControlPacket::CryptSetup(msg) => { -- cgit v1.2.1 From 6fa328db646a0e1c33b883d387ad95ec971cf2e0 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 00:19:40 +0200 Subject: refactor getting of channel data from name --- mumd/src/state/server.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'mumd/src') diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs index c9f8a69..e44d1e8 100644 --- a/mumd/src/state/server.rs +++ b/mumd/src/state/server.rs @@ -3,6 +3,7 @@ use crate::state::user::User; use log::*; use mumble_protocol::control::msgs; +use mumlib::error::ChannelIdentifierError; use serde::{Deserialize, Serialize}; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -88,6 +89,44 @@ impl Server { &self.channels } + /// Takes a channel name and returns either a tuple with the channel id and a reference to the + /// channel struct if the channel name unambiguosly refers to a channel, or an error describing + /// if the channel identifier was ambigous or invalid. + /*/// note that doctests currently aren't run in binary crates yet (see #50784) + /// ``` + /// use crate::state::channel::Channel; + /// let mut server = Server::new(); + /// let channel = Channel { + /// name: "Foobar".to_owned(), + /// ..Default::default(), + /// }; + /// server.channels.insert(0, channel.clone); + /// assert_eq!(server.channel_name("Foobar"), Ok((0, &channel))); + /// ```*/ + pub fn channel_name(&self, channel_name: &str) -> Result<(u32, &Channel), ChannelIdentifierError> { + let matches = self.channels + .iter() + .map(|e| ((*e.0, e.1), e.1.path(&self.channels))) + .filter(|e| e.1.ends_with(channel_name)) + .collect::>(); + Ok(match matches.len() { + 0 => { + let soft_matches = self.channels + .iter() + .map(|e| ((*e.0, e.1), e.1.path(&self.channels).to_lowercase())) + .filter(|e| e.1.ends_with(&channel_name.to_lowercase())) + .collect::>(); + match soft_matches.len() { + 0 => return Err(ChannelIdentifierError::Invalid), + 1 => soft_matches.get(0).unwrap().0, + _ => return Err(ChannelIdentifierError::Ambiguous), + } + } + 1 => matches.get(0).unwrap().0, + _ => return Err(ChannelIdentifierError::Ambiguous), + }) + } + pub fn host_mut(&mut self) -> &mut Option { &mut self.host } -- cgit v1.2.1 From 7ac57f3803bcf0f357ee307a6f0daf0783efbf92 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 00:20:33 +0200 Subject: add backend support for sending messages --- mumd/src/state.rs | 94 +++++++++++++++++++++++++++-------------------- mumd/src/state/channel.rs | 2 +- 2 files changed, 56 insertions(+), 40 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 0f608d7..bffc082 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -14,9 +14,8 @@ use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; use mumble_protocol::ping::PongPacket; use mumble_protocol::voice::Serverbound; -use mumlib::command::{Command, CommandResponse}; +use mumlib::command::{Command, CommandResponse, MessageTarget}; use mumlib::config::Config; -use mumlib::error::ChannelIdentifierError; use mumlib::Error; use crate::state::user::UserDiff; use std::net::{SocketAddr, ToSocketAddrs}; @@ -99,43 +98,9 @@ impl State { return now!(Err(Error::Disconnected)); } - let channels = self.server().unwrap().channels(); - - let matches = channels - .iter() - .map(|e| (e.0, e.1.path(channels))) - .filter(|e| e.1.ends_with(&channel_identifier)) - .collect::>(); - let id = match matches.len() { - 0 => { - let soft_matches = channels - .iter() - .map(|e| (e.0, e.1.path(channels).to_lowercase())) - .filter(|e| e.1.ends_with(&channel_identifier.to_lowercase())) - .collect::>(); - match soft_matches.len() { - 0 => { - return now!(Err(Error::ChannelIdentifierError( - channel_identifier, - ChannelIdentifierError::Invalid - ))) - } - 1 => *soft_matches.get(0).unwrap().0, - _ => { - return now!(Err(Error::ChannelIdentifierError( - channel_identifier, - ChannelIdentifierError::Invalid - ))) - } - } - } - 1 => *matches.get(0).unwrap().0, - _ => { - return now!(Err(Error::ChannelIdentifierError( - channel_identifier, - ChannelIdentifierError::Ambiguous - ))) - } + let id = match self.server().unwrap().channel_name(&channel_identifier) { + Ok((id, _)) => id, + Err(e) => return now!(Err(Error::ChannelIdentifierError(channel_identifier, e))), }; let mut msg = msgs::UserState::new(); @@ -438,6 +403,57 @@ impl State { now!(Ok(Some(CommandResponse::PastMessages { messages }))) } + Command::SendMessage { message, targets } => { + if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let mut msg = msgs::TextMessage::new(); + + msg.set_message(message); + + for target in targets { + match target { + MessageTarget::Channel { recursive, name } => { + let channel_id = self + .server() + .unwrap() + .channel_name(&name); + + let channel_id = match channel_id { + Ok(id) => id, + Err(e) => return now!(Err(Error::ChannelIdentifierError(name, e))), + }.0; + + if recursive { + msg.mut_tree_id() + } else { + msg.mut_channel_id() + }.push(channel_id); + } + MessageTarget::User { name } => { + let id = self + .server() + .unwrap() + .users() + .iter() + .find(|(_, user)| user.name() == &name) + .map(|(e, _)| *e); + + let id = match id { + Some(id) => id, + None => return now!(Err(Error::InvalidUsername(name))), + }; + + msg.mut_session().push(id); + } + } + } + + packet_sender.send(msg.into()).unwrap(); + + now!(Ok(None)) + } } } diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs index 5b6d669..f58ed15 100644 --- a/mumd/src/state/channel.rs +++ b/mumd/src/state/channel.rs @@ -4,7 +4,7 @@ use mumble_protocol::control::msgs; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Channel { description: Option, links: Vec, -- cgit v1.2.1 From e73a442171b364291ed5399bf4f86e8f9a3091ee Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 01:23:28 +0200 Subject: change Mutex to RwLock and de-async --- mumd/src/network/tcp.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 3696c58..18a053b 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -54,22 +54,22 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { #[derive(Clone)] struct TcpEventQueue { - handlers: Arc>>>, + handlers: Arc>>>, } impl TcpEventQueue { fn new() -> Self { Self { - handlers: Arc::new(Mutex::new(HashMap::new())), + handlers: Arc::new(RwLock::new(HashMap::new())), } } - async fn register(&self, at: TcpEvent, callback: TcpEventCallback) { - self.handlers.lock().await.entry(at).or_default().push(callback); + fn register(&self, at: TcpEvent, callback: TcpEventCallback) { + self.handlers.write().unwrap().entry(at).or_default().push(callback); } - async fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.handlers.lock().await.get_mut(&TcpEvent::from(&data)) { + fn resolve<'a>(&self, data: TcpEventData<'a>) { + if let Some(vec) = self.handlers.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { handler(data.clone()); @@ -143,7 +143,7 @@ pub async fn handle( phase_watcher, ).await.unwrap_or(Ok(()))?; - event_queue.resolve(TcpEventData::Disconnected).await; + event_queue.resolve(TcpEventData::Disconnected); debug!("Fully disconnected TCP stream, waiting for new connection info"); } @@ -305,7 +305,7 @@ async fn listen( ) .await; } - event_queue.resolve(TcpEventData::Connected(Ok(&msg))).await; + event_queue.resolve(TcpEventData::Connected(Ok(&msg))); let mut state = state.write().unwrap(); let server = state.server_mut().unwrap(); server.parse_server_sync(*msg); @@ -322,7 +322,7 @@ async fn listen( debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { - event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))).await; + event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))); } ty => { warn!("Unhandled reject type: {:?}", ty); @@ -387,6 +387,6 @@ async fn register_events( ) { loop { let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register(event, handler).await; + event_queue.register(event, handler); } } -- cgit v1.2.1 From cf81a1141cdc6a6db842d992d065eba74829e0c7 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 01:45:07 +0200 Subject: add re-runnable callbacks to the event system --- mumd/src/network/tcp.rs | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 18a053b..8f34cd8 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -30,17 +30,20 @@ type TcpReceiver = SplitStream, ControlCodec>>; pub(crate) type TcpEventCallback = Box; +pub(crate) type TcpEventSubscriber = Box bool>; //the bool indicates if it should be kept or not #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum TcpEvent { Connected, //fires when the client has connected to a server Disconnected, //fires when the client has disconnected from a server + TextMessage, //fires when a text message comes in } #[derive(Clone)] pub enum TcpEventData<'a> { Connected(Result<&'a msgs::ServerSync, mumlib::Error>), Disconnected, + TextMessage(&'a msgs::TextMessage), } impl<'a> From<&TcpEventData<'a>> for TcpEvent { @@ -48,33 +51,53 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { match t { TcpEventData::Connected(_) => TcpEvent::Connected, TcpEventData::Disconnected => TcpEvent::Disconnected, + TcpEventData::TextMessage(_) => TcpEvent::TextMessage, } } } #[derive(Clone)] struct TcpEventQueue { - handlers: Arc>>>, + callbacks: Arc>>>, + subscribers: Arc>>>, } impl TcpEventQueue { + /// Creates a new `TcpEventQueue`. fn new() -> Self { Self { - handlers: Arc::new(RwLock::new(HashMap::new())), + callbacks: Arc::new(RwLock::new(HashMap::new())), + subscribers: Arc::new(RwLock::new(HashMap::new())), } } - fn register(&self, at: TcpEvent, callback: TcpEventCallback) { - self.handlers.write().unwrap().entry(at).or_default().push(callback); + /// Registers a new callback to be triggered when an event is fired. + fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { + self.callbacks.write().unwrap().entry(at).or_default().push(callback); } + /// Registers a new callback to be triggered when an event is fired. + fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { + self.subscribers.write().unwrap().entry(at).or_default().push(callback); + } + + /// Fires all callbacks related to a specific TCP event and removes them from the event queue. + /// Also calls all event subscribers, but keeps them in the queue fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.handlers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { handler(data.clone()); } } + if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + let old = std::mem::take(vec); + for mut e in old { + if e(data.clone()) { + vec.push(e) + } + } + } } } @@ -269,7 +292,7 @@ async fn listen( } }; match packet { - ControlPacket::TextMessage(mut msg) => { + ControlPacket::TextMessage(msg) => { let mut state = state.write().unwrap(); let user = state.server() .and_then(|server| server.users().get(&msg.get_actor())) @@ -277,7 +300,7 @@ async fn listen( if let Some(user) = user { notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this } - state.register_message((msg.take_message(), msg.get_actor())); + state.register_message((msg.get_message().to_owned(), msg.get_actor())); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); @@ -387,6 +410,6 @@ async fn register_events( ) { loop { let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register(event, handler); + event_queue.register_callback(event, handler); } } -- cgit v1.2.1 From f551de2bbc5e41c5cd76e36c2b0a6f10d9b4cddf Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 02:09:58 +0200 Subject: remove event_register_handler from tcp stack --- mumd/src/client.rs | 9 ++++----- mumd/src/command.rs | 14 +++++--------- mumd/src/network/tcp.rs | 24 ++++++------------------ mumd/src/state.rs | 2 +- 4 files changed, 16 insertions(+), 33 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/client.rs b/mumd/src/client.rs index 9c2c2a0..3c491da 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -1,4 +1,4 @@ -use crate::command; +use crate::{command, network::tcp::TcpEventQueue}; use crate::error::ClientError; use crate::network::{tcp, udp, ConnectionInfo}; use crate::state::State; @@ -24,8 +24,7 @@ pub async fn handle( mpsc::unbounded_channel::>(); let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); - let (response_sender, response_receiver) = - mpsc::unbounded_channel(); + let event_queue = TcpEventQueue::new(); let state = Arc::new(RwLock::new(state)); @@ -36,7 +35,7 @@ pub async fn handle( crypt_state_sender, packet_sender.clone(), packet_receiver, - response_receiver, + event_queue.clone(), ).fuse() => r.map_err(|e| ClientError::TcpError(e)), _ = udp::handle( Arc::clone(&state), @@ -46,7 +45,7 @@ pub async fn handle( _ = command::handle( state, command_receiver, - response_sender, + event_queue, ping_request_sender, packet_sender, connection_info_sender, diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 1337dce..a62ddbd 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,8 +1,4 @@ -use crate::network::{ - ConnectionInfo, - tcp::{TcpEvent, TcpEventCallback}, - udp::PingRequest -}; +use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest}; use crate::state::{ExecutionContext, State}; use log::*; @@ -17,7 +13,7 @@ pub async fn handle( Command, oneshot::Sender>>, )>, - tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, + tcp_event_queue: TcpEventQueue, ping_request_sender: mpsc::UnboundedSender, mut packet_sender: mpsc::UnboundedSender>, mut connection_info_sender: watch::Sender>, @@ -33,14 +29,14 @@ pub async fn handle( ExecutionContext::TcpEvent(event, generator) => { let (tx, rx) = oneshot::channel(); //TODO handle this error - let _ = tcp_event_register_sender.send(( - event, + tcp_event_queue.register_callback( + event, Box::new(move |e| { let response = generator(e); response_sender.send(response).unwrap(); tx.send(()).unwrap(); }), - )); + ); rx.await.unwrap(); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 8f34cd8..b6e939a 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -57,14 +57,14 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { } #[derive(Clone)] -struct TcpEventQueue { +pub struct TcpEventQueue { callbacks: Arc>>>, subscribers: Arc>>>, } impl TcpEventQueue { /// Creates a new `TcpEventQueue`. - fn new() -> Self { + pub fn new() -> Self { Self { callbacks: Arc::new(RwLock::new(HashMap::new())), subscribers: Arc::new(RwLock::new(HashMap::new())), @@ -72,18 +72,18 @@ impl TcpEventQueue { } /// Registers a new callback to be triggered when an event is fired. - fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { + pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { self.callbacks.write().unwrap().entry(at).or_default().push(callback); } /// Registers a new callback to be triggered when an event is fired. - fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { + pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { self.subscribers.write().unwrap().entry(at).or_default().push(callback); } /// Fires all callbacks related to a specific TCP event and removes them from the event queue. /// Also calls all event subscribers, but keeps them in the queue - fn resolve<'a>(&self, data: TcpEventData<'a>) { + pub fn resolve<'a>(&self, data: TcpEventData<'a>) { if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { let old = std::mem::take(vec); for handler in old { @@ -107,7 +107,7 @@ pub async fn handle( crypt_state_sender: mpsc::Sender, packet_sender: mpsc::UnboundedSender>, mut packet_receiver: mpsc::UnboundedReceiver>, - mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, + event_queue: TcpEventQueue, ) -> Result<(), TcpError> { loop { let connection_info = 'data: loop { @@ -137,7 +137,6 @@ pub async fn handle( (state_lock.phase_receiver(), state_lock.audio_input().receiver()) }; - let event_queue = TcpEventQueue::new(); info!("Logging in..."); @@ -160,7 +159,6 @@ pub async fn handle( phase_watcher_inner, ).fuse() => r, r = send_packets(sink, &mut packet_receiver).fuse() => r, - _ = register_events(&mut tcp_event_register_receiver, event_queue.clone()).fuse() => Ok(()), } }, phase_watcher, @@ -403,13 +401,3 @@ async fn listen( } Ok(()) } - -async fn register_events( - tcp_event_register_receiver: &mut mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>, - event_queue: TcpEventQueue, -) { - loop { - let (event, handler) = tcp_event_register_receiver.recv().await.unwrap(); - event_queue.register_callback(event, handler); - } -} diff --git a/mumd/src/state.rs b/mumd/src/state.rs index bffc082..91c6ee7 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -393,7 +393,7 @@ impl State { self.audio_output.set_user_volume(user_id, volume); now!(Ok(None)) } - Command::PastMessages => { + Command::PastMessages { block } => { let server = match self.server.as_ref() { Some(s) => s, None => return now!(Err(Error::Disconnected)), -- cgit v1.2.1 From 0b2efad3e9aa569c27d339a5eca17c96155b4f9d Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 02:11:53 +0200 Subject: remove await to parallellize better --- mumd/src/command.rs | 5 ----- 1 file changed, 5 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index a62ddbd..d101104 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -27,18 +27,13 @@ pub async fn handle( drop(state); match event { ExecutionContext::TcpEvent(event, generator) => { - let (tx, rx) = oneshot::channel(); - //TODO handle this error tcp_event_queue.register_callback( event, Box::new(move |e| { let response = generator(e); response_sender.send(response).unwrap(); - tx.send(()).unwrap(); }), ); - - rx.await.unwrap(); } ExecutionContext::Now(generator) => { response_sender.send(generator()).unwrap(); -- cgit v1.2.1 From 5d05d292ddb7f8b28b71abd46930028b6e66dfde Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 19 May 2021 02:27:27 +0200 Subject: add support for sending multiple responses --- mumd/src/client.rs | 4 ++-- mumd/src/command.rs | 16 ++++++++++++---- mumd/src/main.rs | 23 ++++++++++------------- mumd/src/state.rs | 8 ++++++-- 4 files changed, 30 insertions(+), 21 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/client.rs b/mumd/src/client.rs index 3c491da..ba9cad4 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -7,13 +7,13 @@ use futures_util::{select, FutureExt}; use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState}; use mumlib::command::{Command, CommandResponse}; use std::sync::{Arc, RwLock}; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, watch}; pub async fn handle( state: State, command_receiver: mpsc::UnboundedReceiver<( Command, - oneshot::Sender>>, + mpsc::UnboundedSender>>, )>, ) -> Result<(), ClientError> { let (connection_info_sender, connection_info_receiver) = diff --git a/mumd/src/command.rs b/mumd/src/command.rs index d101104..a1e8b21 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -5,13 +5,13 @@ use log::*; use mumble_protocol::{Serverbound, control::ControlPacket}; use mumlib::command::{Command, CommandResponse}; use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, watch}; pub async fn handle( state: Arc>, mut command_receiver: mpsc::UnboundedReceiver<( Command, - oneshot::Sender>>, + mpsc::UnboundedSender>>, )>, tcp_event_queue: TcpEventQueue, ping_request_sender: mpsc::UnboundedSender, @@ -20,13 +20,13 @@ pub async fn handle( ) { debug!("Begin listening for commands"); let ping_count = AtomicU64::new(0); - while let Some((command, response_sender)) = command_receiver.recv().await { + while let Some((command, mut response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); let mut state = state.write().unwrap(); let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); drop(state); match event { - ExecutionContext::TcpEvent(event, generator) => { + ExecutionContext::TcpEventCallback(event, generator) => { tcp_event_queue.register_callback( event, Box::new(move |e| { @@ -35,6 +35,14 @@ pub async fn handle( }), ); } + ExecutionContext::TcpEventSubscriber(event, mut handler) => { + tcp_event_queue.register_subscriber( + event, + Box::new(move |event| { + handler(event, &mut response_sender) + }), + ) + } ExecutionContext::Now(generator) => { response_sender.send(generator()).unwrap(); } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index f298070..c34deab 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -12,7 +12,7 @@ use futures_util::{select, FutureExt, SinkExt, StreamExt}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; -use tokio::{net::{UnixListener, UnixStream}, sync::{mpsc, oneshot}}; +use tokio::{net::{UnixListener, UnixStream}, sync::mpsc}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use bytes::{BufMut, BytesMut}; @@ -81,7 +81,7 @@ async fn main() { async fn receive_commands( command_sender: mpsc::UnboundedSender<( Command, - oneshot::Sender>>, + mpsc::UnboundedSender>>, )>, ) { let socket = UnixListener::bind(mumlib::SOCKET_PATH).unwrap(); @@ -105,21 +105,18 @@ async fn receive_commands( Err(_) => continue, }; - let (tx, rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded_channel(); sender.send((command, tx)).unwrap(); - let response = match rx.await { - Ok(r) => r, - Err(_) => { - error!("Internal command response sender dropped"); - Ok(None) - } - }; - let mut serialized = BytesMut::new(); - bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); + while let Some(response) = rx.recv().await { + let mut serialized = BytesMut::new(); + bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); - let _ = writer.send(serialized.freeze()).await; + if let Err(e) = writer.send(serialized.freeze()).await { + error!("Error sending response: {:?}", e); + } + } } }); } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 91c6ee7..423ce76 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -23,7 +23,7 @@ use tokio::sync::{mpsc, watch}; macro_rules! at { ($event:expr, $generator:expr) => { - ExecutionContext::TcpEvent($event, Box::new($generator)) + ExecutionContext::TcpEventCallback($event, Box::new($generator)) }; } @@ -35,10 +35,14 @@ macro_rules! now { //TODO give me a better name pub enum ExecutionContext { - TcpEvent( + TcpEventCallback( TcpEvent, Box mumlib::error::Result>>, ), + TcpEventSubscriber( + TcpEvent, + Box>>) -> bool>, + ), Now(Box mumlib::error::Result>>), Ping( Box mumlib::error::Result>, -- cgit v1.2.1 From d6779ca065a896d329a7634d69a5f4270d829b73 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 14:53:58 +0200 Subject: rework event system to allow multiple triggers --- mumd/src/command.rs | 4 +- mumd/src/main.rs | 1 + mumd/src/state.rs | 763 +++++++++++++++++++++++++++------------------------- 3 files changed, 394 insertions(+), 374 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index a1e8b21..f02ad19 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -22,9 +22,7 @@ pub async fn handle( let ping_count = AtomicU64::new(0); while let Some((command, mut response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); - let mut state = state.write().unwrap(); - let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender); - drop(state); + let event = crate::state::handle_command(Arc::clone(&state), command, &mut packet_sender, &mut connection_info_sender); match event { ExecutionContext::TcpEventCallback(event, generator) => { tcp_event_queue.register_callback( diff --git a/mumd/src/main.rs b/mumd/src/main.rs index c34deab..12a8802 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -115,6 +115,7 @@ async fn receive_commands( if let Err(e) = writer.send(serialized.freeze()).await { error!("Error sending response: {:?}", e); + break; } } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 423ce76..483f915 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -18,7 +18,7 @@ use mumlib::command::{Command, CommandResponse, MessageTarget}; use mumlib::config::Config; use mumlib::Error; use crate::state::user::UserDiff; -use std::net::{SocketAddr, ToSocketAddrs}; +use std::{net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; use tokio::sync::{mpsc, watch}; macro_rules! at { @@ -90,376 +90,6 @@ impl State { Ok(state) } - pub fn handle_command( - &mut self, - command: Command, - packet_sender: &mut mpsc::UnboundedSender>, - connection_info_sender: &mut watch::Sender>, - ) -> ExecutionContext { - match command { - Command::ChannelJoin { channel_identifier } => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let id = match self.server().unwrap().channel_name(&channel_identifier) { - Ok((id, _)) => id, - Err(e) => return now!(Err(Error::ChannelIdentifierError(channel_identifier, e))), - }; - - let mut msg = msgs::UserState::new(); - msg.set_session(self.server.as_ref().unwrap().session_id().unwrap()); - msg.set_channel_id(id); - packet_sender.send(msg.into()).unwrap(); - now!(Ok(None)) - } - Command::ChannelList => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - let list = channel::into_channel( - self.server.as_ref().unwrap().channels(), - self.server.as_ref().unwrap().users(), - ); - now!(Ok(Some(CommandResponse::ChannelList { channels: list }))) - } - Command::ConfigReload => { - self.reload_config(); - now!(Ok(None)) - } - Command::DeafenSelf(toggle) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let server = self.server().unwrap(); - let action = match (toggle, server.muted(), server.deafened()) { - (Some(false), false, false) => None, - (Some(false), false, true) => Some((false, false)), - (Some(false), true, false) => None, - (Some(false), true, true) => Some((true, false)), - (Some(true), false, false) => Some((false, true)), - (Some(true), false, true) => None, - (Some(true), true, false) => Some((true, true)), - (Some(true), true, true) => None, - (None, false, false) => Some((false, true)), - (None, false, true) => Some((false, false)), - (None, true, false) => Some((true, true)), - (None, true, true) => Some((true, false)), - }; - - let mut new_deaf = None; - if let Some((mute, deafen)) = action { - if server.deafened() != deafen { - self.audio_output.play_effect(if deafen { - NotificationEvents::Deafen - } else { - NotificationEvents::Undeafen - }); - } else if server.muted() != mute { - self.audio_output.play_effect(if mute { - NotificationEvents::Mute - } else { - NotificationEvents::Unmute - }); - } - let mut msg = msgs::UserState::new(); - if server.muted() != mute { - msg.set_self_mute(mute); - } else if !mute && !deafen && server.deafened() { - msg.set_self_mute(false); - } - if server.deafened() != deafen { - msg.set_self_deaf(deafen); - new_deaf = Some(deafen); - } - let server = self.server_mut().unwrap(); - server.set_muted(mute); - server.set_deafened(deafen); - packet_sender.send(msg.into()).unwrap(); - } - - now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) - } - Command::InputVolumeSet(volume) => { - self.audio_input.set_volume(volume); - now!(Ok(None)) - } - Command::MuteOther(string, toggle) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let id = self - .server_mut() - .unwrap() - .users_mut() - .iter_mut() - .find(|(_, user)| user.name() == string); - - let (id, user) = match id { - Some(id) => (*id.0, id.1), - None => return now!(Err(Error::InvalidUsername(string))), - }; - - let action = match toggle { - Some(state) => { - if user.suppressed() != state { - Some(state) - } else { - None - } - } - None => Some(!user.suppressed()), - }; - - if let Some(action) = action { - user.set_suppressed(action); - self.audio_output.set_mute(id, action); - } - - return now!(Ok(None)); - } - Command::MuteSelf(toggle) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let server = self.server().unwrap(); - let action = match (toggle, server.muted(), server.deafened()) { - (Some(false), false, false) => None, - (Some(false), false, true) => Some((false, false)), - (Some(false), true, false) => Some((false, false)), - (Some(false), true, true) => Some((false, false)), - (Some(true), false, false) => Some((true, false)), - (Some(true), false, true) => None, - (Some(true), true, false) => None, - (Some(true), true, true) => None, - (None, false, false) => Some((true, false)), - (None, false, true) => Some((false, false)), - (None, true, false) => Some((false, false)), - (None, true, true) => Some((false, false)), - }; - - let mut new_mute = None; - if let Some((mute, deafen)) = action { - if server.deafened() != deafen { - self.audio_output.play_effect(if deafen { - NotificationEvents::Deafen - } else { - NotificationEvents::Undeafen - }); - } else if server.muted() != mute { - self.audio_output.play_effect(if mute { - NotificationEvents::Mute - } else { - NotificationEvents::Unmute - }); - } - let mut msg = msgs::UserState::new(); - if server.muted() != mute { - msg.set_self_mute(mute); - new_mute = Some(mute) - } else if !mute && !deafen && server.deafened() { - msg.set_self_mute(false); - new_mute = Some(false) - } - if server.deafened() != deafen { - msg.set_self_deaf(deafen); - } - let server = self.server_mut().unwrap(); - server.set_muted(mute); - server.set_deafened(deafen); - packet_sender.send(msg.into()).unwrap(); - } - - now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) - } - Command::OutputVolumeSet(volume) => { - self.audio_output.set_volume(volume); - now!(Ok(None)) - } - Command::Ping => { - now!(Ok(Some(CommandResponse::Pong))) - } - Command::ServerConnect { - host, - port, - username, - password, - accept_invalid_cert, - } => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { - return now!(Err(Error::AlreadyConnected)); - } - let mut server = Server::new(); - *server.username_mut() = Some(username); - *server.password_mut() = password; - *server.host_mut() = Some(format!("{}:{}", host, port)); - self.server = Some(server); - self.phase_watcher - .0 - .send(StatePhase::Connecting) - .unwrap(); - - let socket_addr = match (host.as_ref(), port) - .to_socket_addrs() - .map(|mut e| e.next()) - { - Ok(Some(v)) => v, - _ => { - warn!("Error parsing server addr"); - return now!(Err(Error::InvalidServerAddr(host, port))); - } - }; - connection_info_sender - .send(Some(ConnectionInfo::new( - socket_addr, - host, - accept_invalid_cert, - ))) - .unwrap(); - at!(TcpEvent::Connected, |res| { - //runs the closure when the client is connected - if let TcpEventData::Connected(res) = res { - res.map(|msg| { - Some(CommandResponse::ServerConnect { - welcome_message: if msg.has_welcome_text() { - Some(msg.get_welcome_text().to_string()) - } else { - None - }, - }) - }) - } else { - unreachable!("callback should be provided with a TcpEventData::Connected"); - } - }) - } - Command::ServerDisconnect => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - self.server = None; - - self.phase_watcher - .0 - .send(StatePhase::Disconnected) - .unwrap(); - self.audio_output.play_effect(NotificationEvents::ServerDisconnect); - now!(Ok(None)) - } - Command::ServerStatus { host, port } => ExecutionContext::Ping( - Box::new(move || { - match (host.as_str(), port) - .to_socket_addrs() - .map(|mut e| e.next()) - { - Ok(Some(v)) => Ok(v), - _ => Err(Error::InvalidServerAddr(host, port)), - } - }), - Box::new(move |pong| { - Ok(pong.map(|pong| CommandResponse::ServerStatus { - version: pong.version, - users: pong.users, - max_users: pong.max_users, - bandwidth: pong.bandwidth, - })) - }), - ), - Command::Status => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - let state = self.server.as_ref().unwrap().into(); - now!(Ok(Some(CommandResponse::Status { - server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some - }))) - } - Command::UserVolumeSet(string, volume) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - let user_id = match self - .server() - .unwrap() - .users() - .iter() - .find(|e| e.1.name() == string) - .map(|e| *e.0) - { - None => return now!(Err(Error::InvalidUsername(string))), - Some(v) => v, - }; - - self.audio_output.set_user_volume(user_id, volume); - now!(Ok(None)) - } - Command::PastMessages { block } => { - let server = match self.server.as_ref() { - Some(s) => s, - None => return now!(Err(Error::Disconnected)), - }; - let messages = std::mem::take(&mut self.message_buffer).into_iter() - .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); - - now!(Ok(Some(CommandResponse::PastMessages { messages }))) - } - Command::SendMessage { message, targets } => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let mut msg = msgs::TextMessage::new(); - - msg.set_message(message); - - for target in targets { - match target { - MessageTarget::Channel { recursive, name } => { - let channel_id = self - .server() - .unwrap() - .channel_name(&name); - - let channel_id = match channel_id { - Ok(id) => id, - Err(e) => return now!(Err(Error::ChannelIdentifierError(name, e))), - }.0; - - if recursive { - msg.mut_tree_id() - } else { - msg.mut_channel_id() - }.push(channel_id); - } - MessageTarget::User { name } => { - let id = self - .server() - .unwrap() - .users() - .iter() - .find(|(_, user)| user.name() == &name) - .map(|(e, _)| *e); - - let id = match id { - Some(id) => id, - None => return now!(Err(Error::InvalidUsername(name))), - }; - - msg.mut_session().push(id); - } - } - } - - packet_sender.send(msg.into()).unwrap(); - - now!(Ok(None)) - } - } - } pub fn parse_user_state(&mut self, msg: msgs::UserState) { if !msg.has_session() { @@ -671,3 +301,394 @@ impl State { .channel() } } + +pub fn handle_command( + og_state: Arc>, + command: Command, + packet_sender: &mut mpsc::UnboundedSender>, + connection_info_sender: &mut watch::Sender>, +) -> ExecutionContext { + let mut state = og_state.write().unwrap(); + match command { + Command::ChannelJoin { channel_identifier } => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let id = match state.server().unwrap().channel_name(&channel_identifier) { + Ok((id, _)) => id, + Err(e) => return now!(Err(Error::ChannelIdentifierError(channel_identifier, e))), + }; + + let mut msg = msgs::UserState::new(); + msg.set_session(state.server.as_ref().unwrap().session_id().unwrap()); + msg.set_channel_id(id); + packet_sender.send(msg.into()).unwrap(); + now!(Ok(None)) + } + Command::ChannelList => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + let list = channel::into_channel( + state.server.as_ref().unwrap().channels(), + state.server.as_ref().unwrap().users(), + ); + now!(Ok(Some(CommandResponse::ChannelList { channels: list }))) + } + Command::ConfigReload => { + state.reload_config(); + now!(Ok(None)) + } + Command::DeafenSelf(toggle) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let server = state.server().unwrap(); + let action = match (toggle, server.muted(), server.deafened()) { + (Some(false), false, false) => None, + (Some(false), false, true) => Some((false, false)), + (Some(false), true, false) => None, + (Some(false), true, true) => Some((true, false)), + (Some(true), false, false) => Some((false, true)), + (Some(true), false, true) => None, + (Some(true), true, false) => Some((true, true)), + (Some(true), true, true) => None, + (None, false, false) => Some((false, true)), + (None, false, true) => Some((false, false)), + (None, true, false) => Some((true, true)), + (None, true, true) => Some((true, false)), + }; + + let mut new_deaf = None; + if let Some((mute, deafen)) = action { + if server.deafened() != deafen { + state.audio_output.play_effect(if deafen { + NotificationEvents::Deafen + } else { + NotificationEvents::Undeafen + }); + } else if server.muted() != mute { + state.audio_output.play_effect(if mute { + NotificationEvents::Mute + } else { + NotificationEvents::Unmute + }); + } + let mut msg = msgs::UserState::new(); + if server.muted() != mute { + msg.set_self_mute(mute); + } else if !mute && !deafen && server.deafened() { + msg.set_self_mute(false); + } + if server.deafened() != deafen { + msg.set_self_deaf(deafen); + new_deaf = Some(deafen); + } + let server = state.server_mut().unwrap(); + server.set_muted(mute); + server.set_deafened(deafen); + packet_sender.send(msg.into()).unwrap(); + } + + now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) + } + Command::InputVolumeSet(volume) => { + state.audio_input.set_volume(volume); + now!(Ok(None)) + } + Command::MuteOther(string, toggle) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let id = state + .server_mut() + .unwrap() + .users_mut() + .iter_mut() + .find(|(_, user)| user.name() == string); + + let (id, user) = match id { + Some(id) => (*id.0, id.1), + None => return now!(Err(Error::InvalidUsername(string))), + }; + + let action = match toggle { + Some(state) => { + if user.suppressed() != state { + Some(state) + } else { + None + } + } + None => Some(!user.suppressed()), + }; + + if let Some(action) = action { + user.set_suppressed(action); + state.audio_output.set_mute(id, action); + } + + return now!(Ok(None)); + } + Command::MuteSelf(toggle) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let server = state.server().unwrap(); + let action = match (toggle, server.muted(), server.deafened()) { + (Some(false), false, false) => None, + (Some(false), false, true) => Some((false, false)), + (Some(false), true, false) => Some((false, false)), + (Some(false), true, true) => Some((false, false)), + (Some(true), false, false) => Some((true, false)), + (Some(true), false, true) => None, + (Some(true), true, false) => None, + (Some(true), true, true) => None, + (None, false, false) => Some((true, false)), + (None, false, true) => Some((false, false)), + (None, true, false) => Some((false, false)), + (None, true, true) => Some((false, false)), + }; + + let mut new_mute = None; + if let Some((mute, deafen)) = action { + if server.deafened() != deafen { + state.audio_output.play_effect(if deafen { + NotificationEvents::Deafen + } else { + NotificationEvents::Undeafen + }); + } else if server.muted() != mute { + state.audio_output.play_effect(if mute { + NotificationEvents::Mute + } else { + NotificationEvents::Unmute + }); + } + let mut msg = msgs::UserState::new(); + if server.muted() != mute { + msg.set_self_mute(mute); + new_mute = Some(mute) + } else if !mute && !deafen && server.deafened() { + msg.set_self_mute(false); + new_mute = Some(false) + } + if server.deafened() != deafen { + msg.set_self_deaf(deafen); + } + let server = state.server_mut().unwrap(); + server.set_muted(mute); + server.set_deafened(deafen); + packet_sender.send(msg.into()).unwrap(); + } + + now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) + } + Command::OutputVolumeSet(volume) => { + state.audio_output.set_volume(volume); + now!(Ok(None)) + } + Command::Ping => { + now!(Ok(Some(CommandResponse::Pong))) + } + Command::ServerConnect { + host, + port, + username, + password, + accept_invalid_cert, + } => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Disconnected) { + return now!(Err(Error::AlreadyConnected)); + } + let mut server = Server::new(); + *server.username_mut() = Some(username); + *server.password_mut() = password; + *server.host_mut() = Some(format!("{}:{}", host, port)); + state.server = Some(server); + state.phase_watcher + .0 + .send(StatePhase::Connecting) + .unwrap(); + + let socket_addr = match (host.as_ref(), port) + .to_socket_addrs() + .map(|mut e| e.next()) + { + Ok(Some(v)) => v, + _ => { + warn!("Error parsing server addr"); + return now!(Err(Error::InvalidServerAddr(host, port))); + } + }; + connection_info_sender + .send(Some(ConnectionInfo::new( + socket_addr, + host, + accept_invalid_cert, + ))) + .unwrap(); + at!(TcpEvent::Connected, |res| { + //runs the closure when the client is connected + if let TcpEventData::Connected(res) = res { + res.map(|msg| { + Some(CommandResponse::ServerConnect { + welcome_message: if msg.has_welcome_text() { + Some(msg.get_welcome_text().to_string()) + } else { + None + }, + }) + }) + } else { + unreachable!("callback should be provided with a TcpEventData::Connected"); + } + }) + } + Command::ServerDisconnect => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + state.server = None; + + state.phase_watcher + .0 + .send(StatePhase::Disconnected) + .unwrap(); + state.audio_output.play_effect(NotificationEvents::ServerDisconnect); + now!(Ok(None)) + } + Command::ServerStatus { host, port } => ExecutionContext::Ping( + Box::new(move || { + match (host.as_str(), port) + .to_socket_addrs() + .map(|mut e| e.next()) + { + Ok(Some(v)) => Ok(v), + _ => Err(Error::InvalidServerAddr(host, port)), + } + }), + Box::new(move |pong| { + Ok(pong.map(|pong| CommandResponse::ServerStatus { + version: pong.version, + users: pong.users, + max_users: pong.max_users, + bandwidth: pong.bandwidth, + })) + }), + ), + Command::Status => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + let state = state.server.as_ref().unwrap().into(); + now!(Ok(Some(CommandResponse::Status { + server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some + }))) + } + Command::UserVolumeSet(string, volume) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + let user_id = match state + .server() + .unwrap() + .users() + .iter() + .find(|e| e.1.name() == string) + .map(|e| *e.0) + { + None => return now!(Err(Error::InvalidUsername(string))), + Some(v) => v, + }; + + state.audio_output.set_user_volume(user_id, volume); + now!(Ok(None)) + } + Command::PastMessages { block } => { + if block { + let messages = std::mem::take(&mut state.message_buffer); + let server = match state.server.as_ref() { + Some(s) => s, + None => return now!(Err(Error::Disconnected)), + }; + let messages = messages.into_iter() + .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); + + now!(Ok(Some(CommandResponse::PastMessages { messages }))) + } else { + let ref_state = Arc::clone(&og_state); + ExecutionContext::TcpEventSubscriber( + TcpEvent::TextMessage, + Box::new(move |data, sender| { + if let TcpEventData::TextMessage(a) = data { + let message = ( + a.get_message().to_owned(), + ref_state.read().unwrap().server().unwrap().users().get(&a.get_actor()).unwrap().name().to_string() + ); + sender.send(Ok(Some(CommandResponse::PastMessage { message }))).is_ok() + } else { + unreachable!("Should only receive a TextMessage data when listening to TextMessage events"); + } + }), + ) + } + } + Command::SendMessage { message, targets } => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let mut msg = msgs::TextMessage::new(); + + msg.set_message(message); + + for target in targets { + match target { + MessageTarget::Channel { recursive, name } => { + let channel_id = state + .server() + .unwrap() + .channel_name(&name); + + let channel_id = match channel_id { + Ok(id) => id, + Err(e) => return now!(Err(Error::ChannelIdentifierError(name, e))), + }.0; + + if recursive { + msg.mut_tree_id() + } else { + msg.mut_channel_id() + }.push(channel_id); + } + MessageTarget::User { name } => { + let id = state + .server() + .unwrap() + .users() + .iter() + .find(|(_, user)| user.name() == &name) + .map(|(e, _)| *e); + + let id = match id { + Some(id) => id, + None => return now!(Err(Error::InvalidUsername(name))), + }; + + msg.mut_session().push(id); + } + } + } + + packet_sender.send(msg.into()).unwrap(); + + now!(Ok(None)) + } + } +} -- cgit v1.2.1 From 6519ad9c82549817d797a5d9d463a418eb35273f Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 15:21:57 +0200 Subject: fix deadlock and change message registering properly --- mumd/src/network/tcp.rs | 2 ++ mumd/src/state.rs | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index b6e939a..b513797 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -299,6 +299,8 @@ async fn listen( notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this } state.register_message((msg.get_message().to_owned(), msg.get_actor())); + drop(state); + event_queue.resolve(TcpEventData::TextMessage(&*msg)); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 483f915..fba6ab0 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -612,16 +612,6 @@ pub fn handle_command( } Command::PastMessages { block } => { if block { - let messages = std::mem::take(&mut state.message_buffer); - let server = match state.server.as_ref() { - Some(s) => s, - None => return now!(Err(Error::Disconnected)), - }; - let messages = messages.into_iter() - .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); - - now!(Ok(Some(CommandResponse::PastMessages { messages }))) - } else { let ref_state = Arc::clone(&og_state); ExecutionContext::TcpEventSubscriber( TcpEvent::TextMessage, @@ -637,6 +627,16 @@ pub fn handle_command( } }), ) + } else { + let messages = std::mem::take(&mut state.message_buffer); + let server = match state.server.as_ref() { + Some(s) => s, + None => return now!(Err(Error::Disconnected)), + }; + let messages = messages.into_iter() + .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); + + now!(Ok(Some(CommandResponse::PastMessages { messages }))) } } Command::SendMessage { message, targets } => { -- cgit v1.2.1 From 9769c26156311fa24a1a376736172283e199bbf9 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 15:35:36 +0200 Subject: fix panic when user leaves channel --- mumd/src/state.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/state.rs b/mumd/src/state.rs index fba6ab0..7a5df90 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -619,7 +619,13 @@ pub fn handle_command( if let TcpEventData::TextMessage(a) = data { let message = ( a.get_message().to_owned(), - ref_state.read().unwrap().server().unwrap().users().get(&a.get_actor()).unwrap().name().to_string() + ref_state.read() + .unwrap() + .server() + .unwrap() + .users() + .get(&a.get_actor()).map(|e| e.name().to_string()) + .unwrap_or(format!("Unknown user {}", a.get_actor())) ); sender.send(Ok(Some(CommandResponse::PastMessage { message }))).is_ok() } else { @@ -634,7 +640,14 @@ pub fn handle_command( None => return now!(Err(Error::Disconnected)), }; let messages = messages.into_iter() - .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); + .map(|(msg, user)| (msg, server + .users() + .get(&user) + .map(|e| e + .name() + .to_string()) + .unwrap_or(format!("Unknown user {}", user)))) + .collect(); now!(Ok(Some(CommandResponse::PastMessages { messages }))) } -- cgit v1.2.1 From 1a0a0cc5796bebf32e50465abb68b06044aac2e7 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 15:41:49 +0200 Subject: refactor user name getting --- mumd/src/state.rs | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 7a5df90..87a5a0e 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -300,6 +300,13 @@ impl State { .1 .channel() } + fn get_user_name(&self, user: u32) -> String { + self.server() + .unwrap() + .users() + .get(&user).map(|e| e.name().to_string()) + .unwrap_or(format!("Unknown user {}", user)) + } } pub fn handle_command( @@ -619,13 +626,7 @@ pub fn handle_command( if let TcpEventData::TextMessage(a) = data { let message = ( a.get_message().to_owned(), - ref_state.read() - .unwrap() - .server() - .unwrap() - .users() - .get(&a.get_actor()).map(|e| e.name().to_string()) - .unwrap_or(format!("Unknown user {}", a.get_actor())) + ref_state.read().unwrap().get_user_name(a.get_actor()) ); sender.send(Ok(Some(CommandResponse::PastMessage { message }))).is_ok() } else { @@ -635,18 +636,8 @@ pub fn handle_command( ) } else { let messages = std::mem::take(&mut state.message_buffer); - let server = match state.server.as_ref() { - Some(s) => s, - None => return now!(Err(Error::Disconnected)), - }; let messages = messages.into_iter() - .map(|(msg, user)| (msg, server - .users() - .get(&user) - .map(|e| e - .name() - .to_string()) - .unwrap_or(format!("Unknown user {}", user)))) + .map(|(msg, user)| (msg, state.get_user_name(user))) .collect(); now!(Ok(Some(CommandResponse::PastMessages { messages }))) -- cgit v1.2.1 From aac27b02754e2d8b8152a062c112842d9f22478a Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 15:47:53 +0200 Subject: fix potential panic when getting usernames while disconnected --- mumd/src/state.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 87a5a0e..ce915ac 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -300,12 +300,15 @@ impl State { .1 .channel() } - fn get_user_name(&self, user: u32) -> String { + + /// Gets the username of a user with id `user` connected to the same server that we are connected to. + /// If we are connected to the server but the user with the id doesn't exist, the string "Unknown user {id}" + /// is returned instead. If we aren't connected to a server, None is returned instead. + fn get_user_name(&self, user: u32) -> Option { self.server() - .unwrap() - .users() - .get(&user).map(|e| e.name().to_string()) - .unwrap_or(format!("Unknown user {}", user)) + .map(|e| e.users() + .get(&user).map(|e| e.name().to_string()) + .unwrap_or(format!("Unknown user {}", user))) } } -- cgit v1.2.1 From f72440096cefecbe62a37813ea3ee6f3cd3c7299 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 21 May 2021 21:48:14 +0200 Subject: add connectivity check --- mumd/src/state.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/state.rs b/mumd/src/state.rs index ce915ac..8072b8e 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -621,6 +621,10 @@ pub fn handle_command( now!(Ok(None)) } Command::PastMessages { block } => { + //does it make sense to wait for messages while not connected? + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } if block { let ref_state = Arc::clone(&og_state); ExecutionContext::TcpEventSubscriber( @@ -629,7 +633,7 @@ pub fn handle_command( if let TcpEventData::TextMessage(a) = data { let message = ( a.get_message().to_owned(), - ref_state.read().unwrap().get_user_name(a.get_actor()) + ref_state.read().unwrap().get_user_name(a.get_actor()).unwrap() ); sender.send(Ok(Some(CommandResponse::PastMessage { message }))).is_ok() } else { @@ -640,7 +644,7 @@ pub fn handle_command( } else { let messages = std::mem::take(&mut state.message_buffer); let messages = messages.into_iter() - .map(|(msg, user)| (msg, state.get_user_name(user))) + .map(|(msg, user)| (msg, state.get_user_name(user).unwrap())) .collect(); now!(Ok(Some(CommandResponse::PastMessages { messages }))) -- cgit v1.2.1 From aa710a3420ef4d834ee1df4099b25f3c83b9c31d Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sat, 22 May 2021 01:27:17 +0200 Subject: rework command response mechanism --- mumd/src/command.rs | 9 +++++++-- mumd/src/main.rs | 8 ++++++-- mumd/src/state.rs | 25 +++++++++++++++---------- 3 files changed, 28 insertions(+), 14 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index f02ad19..5255afa 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -29,7 +29,9 @@ pub async fn handle( event, Box::new(move |e| { let response = generator(e); - response_sender.send(response).unwrap(); + for response in response { + response_sender.send(response).unwrap(); + } }), ); } @@ -42,7 +44,10 @@ pub async fn handle( ) } ExecutionContext::Now(generator) => { - response_sender.send(generator()).unwrap(); + for response in generator() { + response_sender.send(response).unwrap(); + } + drop(response_sender); } ExecutionContext::Ping(generator, converter) => { let ret = generator(); diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 12a8802..0c175c2 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -8,13 +8,14 @@ mod state; use crate::state::State; +use bytes::{BufMut, BytesMut}; use futures_util::{select, FutureExt, SinkExt, StreamExt}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; +use std::io::ErrorKind; use tokio::{net::{UnixListener, UnixStream}, sync::mpsc}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; -use bytes::{BufMut, BytesMut}; #[tokio::main] async fn main() { @@ -114,7 +115,10 @@ async fn receive_commands( bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); if let Err(e) = writer.send(serialized.freeze()).await { - error!("Error sending response: {:?}", e); + if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error + //we just assume that they just don't want any more packets + error!("Error sending response: {:?}", e); + } break; } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 8072b8e..a224afc 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -18,7 +18,7 @@ use mumlib::command::{Command, CommandResponse, MessageTarget}; use mumlib::config::Config; use mumlib::Error; use crate::state::user::UserDiff; -use std::{net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; +use std::{iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; use tokio::sync::{mpsc, watch}; macro_rules! at { @@ -29,21 +29,23 @@ macro_rules! at { macro_rules! now { ($data:expr) => { - ExecutionContext::Now(Box::new(move || $data)) + ExecutionContext::Now(Box::new(move || Box::new(iter::once($data)))) }; } +type Responses = Box>>>; + //TODO give me a better name pub enum ExecutionContext { TcpEventCallback( TcpEvent, - Box mumlib::error::Result>>, + Box Responses>, ), TcpEventSubscriber( TcpEvent, Box>>) -> bool>, ), - Now(Box mumlib::error::Result>>), + Now(Box Responses>), Ping( Box mumlib::error::Result>, Box) -> mumlib::error::Result> + Send>, @@ -545,7 +547,7 @@ pub fn handle_command( at!(TcpEvent::Connected, |res| { //runs the closure when the client is connected if let TcpEventData::Connected(res) = res { - res.map(|msg| { + Box::new(iter::once(res.map(|msg| { Some(CommandResponse::ServerConnect { welcome_message: if msg.has_welcome_text() { Some(msg.get_welcome_text().to_string()) @@ -553,7 +555,7 @@ pub fn handle_command( None }, }) - }) + }))) } else { unreachable!("callback should be provided with a TcpEventData::Connected"); } @@ -584,12 +586,12 @@ pub fn handle_command( } }), Box::new(move |pong| { - Ok(pong.map(|pong| CommandResponse::ServerStatus { + Ok(pong.map(|pong| (CommandResponse::ServerStatus { version: pong.version, users: pong.users, max_users: pong.max_users, bandwidth: pong.bandwidth, - })) + }))) }), ), Command::Status => { @@ -643,11 +645,14 @@ pub fn handle_command( ) } else { let messages = std::mem::take(&mut state.message_buffer); - let messages = messages.into_iter() + let messages: Vec<_> = messages.into_iter() .map(|(msg, user)| (msg, state.get_user_name(user).unwrap())) + .map(|e| Ok(Some(CommandResponse::PastMessage { message: e }))) .collect(); - now!(Ok(Some(CommandResponse::PastMessages { messages }))) + ExecutionContext::Now(Box::new(move || { + Box::new(messages.into_iter()) + })) } } Command::SendMessage { message, targets } => { -- cgit v1.2.1 From 55a12fbdfb435886b2f211fe1fb00daafb32b6a7 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Sun, 6 Jun 2021 23:17:39 +0200 Subject: unhide doctests --- mumd/src/state/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs index e44d1e8..78a10b9 100644 --- a/mumd/src/state/server.rs +++ b/mumd/src/state/server.rs @@ -92,7 +92,7 @@ impl Server { /// Takes a channel name and returns either a tuple with the channel id and a reference to the /// channel struct if the channel name unambiguosly refers to a channel, or an error describing /// if the channel identifier was ambigous or invalid. - /*/// note that doctests currently aren't run in binary crates yet (see #50784) + /// note that doctests currently aren't run in binary crates yet (see #50784) /// ``` /// use crate::state::channel::Channel; /// let mut server = Server::new(); @@ -102,7 +102,7 @@ impl Server { /// }; /// server.channels.insert(0, channel.clone); /// assert_eq!(server.channel_name("Foobar"), Ok((0, &channel))); - /// ```*/ + /// ``` pub fn channel_name(&self, channel_name: &str) -> Result<(u32, &Channel), ChannelIdentifierError> { let matches = self.channels .iter() -- cgit v1.2.1