diff options
| author | Eskil Queseth <eskilq@kth.se> | 2021-05-21 14:53:58 +0200 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2021-05-21 14:53:58 +0200 |
| commit | d6779ca065a896d329a7634d69a5f4270d829b73 (patch) | |
| tree | 505cd0eef7a4981e44c06a7fc45062ce2b4e9b15 /mumd/src/state.rs | |
| parent | 5d05d292ddb7f8b28b71abd46930028b6e66dfde (diff) | |
| download | mum-d6779ca065a896d329a7634d69a5f4270d829b73.tar.gz | |
rework event system to allow multiple triggers
Diffstat (limited to 'mumd/src/state.rs')
| -rw-r--r-- | mumd/src/state.rs | 763 |
1 files changed, 392 insertions, 371 deletions
diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 423ce76..483f915 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -18,7 +18,7 @@ use mumlib::command::{Command, CommandResponse, MessageTarget}; use mumlib::config::Config; use mumlib::Error; use crate::state::user::UserDiff; -use std::net::{SocketAddr, ToSocketAddrs}; +use std::{net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; use tokio::sync::{mpsc, watch}; macro_rules! at { @@ -90,376 +90,6 @@ impl State { Ok(state) } - pub fn handle_command( - &mut self, - command: Command, - packet_sender: &mut mpsc::UnboundedSender<ControlPacket<Serverbound>>, - connection_info_sender: &mut watch::Sender<Option<ConnectionInfo>>, - ) -> ExecutionContext { - match command { - Command::ChannelJoin { channel_identifier } => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let id = match self.server().unwrap().channel_name(&channel_identifier) { - Ok((id, _)) => id, - Err(e) => return now!(Err(Error::ChannelIdentifierError(channel_identifier, e))), - }; - - let mut msg = msgs::UserState::new(); - msg.set_session(self.server.as_ref().unwrap().session_id().unwrap()); - msg.set_channel_id(id); - packet_sender.send(msg.into()).unwrap(); - now!(Ok(None)) - } - Command::ChannelList => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - let list = channel::into_channel( - self.server.as_ref().unwrap().channels(), - self.server.as_ref().unwrap().users(), - ); - now!(Ok(Some(CommandResponse::ChannelList { channels: list }))) - } - Command::ConfigReload => { - self.reload_config(); - now!(Ok(None)) - } - Command::DeafenSelf(toggle) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let server = self.server().unwrap(); - let action = match (toggle, server.muted(), server.deafened()) { - (Some(false), false, false) => None, - (Some(false), false, true) => Some((false, false)), - (Some(false), true, false) => None, - (Some(false), true, true) => Some((true, false)), - (Some(true), false, false) => Some((false, true)), - (Some(true), false, true) => None, - (Some(true), true, false) => Some((true, true)), - (Some(true), true, true) => None, - (None, false, false) => Some((false, true)), - (None, false, true) => Some((false, false)), - (None, true, false) => Some((true, true)), - (None, true, true) => Some((true, false)), - }; - - let mut new_deaf = None; - if let Some((mute, deafen)) = action { - if server.deafened() != deafen { - self.audio_output.play_effect(if deafen { - NotificationEvents::Deafen - } else { - NotificationEvents::Undeafen - }); - } else if server.muted() != mute { - self.audio_output.play_effect(if mute { - NotificationEvents::Mute - } else { - NotificationEvents::Unmute - }); - } - let mut msg = msgs::UserState::new(); - if server.muted() != mute { - msg.set_self_mute(mute); - } else if !mute && !deafen && server.deafened() { - msg.set_self_mute(false); - } - if server.deafened() != deafen { - msg.set_self_deaf(deafen); - new_deaf = Some(deafen); - } - let server = self.server_mut().unwrap(); - server.set_muted(mute); - server.set_deafened(deafen); - packet_sender.send(msg.into()).unwrap(); - } - - now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) - } - Command::InputVolumeSet(volume) => { - self.audio_input.set_volume(volume); - now!(Ok(None)) - } - Command::MuteOther(string, toggle) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let id = self - .server_mut() - .unwrap() - .users_mut() - .iter_mut() - .find(|(_, user)| user.name() == string); - - let (id, user) = match id { - Some(id) => (*id.0, id.1), - None => return now!(Err(Error::InvalidUsername(string))), - }; - - let action = match toggle { - Some(state) => { - if user.suppressed() != state { - Some(state) - } else { - None - } - } - None => Some(!user.suppressed()), - }; - - if let Some(action) = action { - user.set_suppressed(action); - self.audio_output.set_mute(id, action); - } - - return now!(Ok(None)); - } - Command::MuteSelf(toggle) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let server = self.server().unwrap(); - let action = match (toggle, server.muted(), server.deafened()) { - (Some(false), false, false) => None, - (Some(false), false, true) => Some((false, false)), - (Some(false), true, false) => Some((false, false)), - (Some(false), true, true) => Some((false, false)), - (Some(true), false, false) => Some((true, false)), - (Some(true), false, true) => None, - (Some(true), true, false) => None, - (Some(true), true, true) => None, - (None, false, false) => Some((true, false)), - (None, false, true) => Some((false, false)), - (None, true, false) => Some((false, false)), - (None, true, true) => Some((false, false)), - }; - - let mut new_mute = None; - if let Some((mute, deafen)) = action { - if server.deafened() != deafen { - self.audio_output.play_effect(if deafen { - NotificationEvents::Deafen - } else { - NotificationEvents::Undeafen - }); - } else if server.muted() != mute { - self.audio_output.play_effect(if mute { - NotificationEvents::Mute - } else { - NotificationEvents::Unmute - }); - } - let mut msg = msgs::UserState::new(); - if server.muted() != mute { - msg.set_self_mute(mute); - new_mute = Some(mute) - } else if !mute && !deafen && server.deafened() { - msg.set_self_mute(false); - new_mute = Some(false) - } - if server.deafened() != deafen { - msg.set_self_deaf(deafen); - } - let server = self.server_mut().unwrap(); - server.set_muted(mute); - server.set_deafened(deafen); - packet_sender.send(msg.into()).unwrap(); - } - - now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) - } - Command::OutputVolumeSet(volume) => { - self.audio_output.set_volume(volume); - now!(Ok(None)) - } - Command::Ping => { - now!(Ok(Some(CommandResponse::Pong))) - } - Command::ServerConnect { - host, - port, - username, - password, - accept_invalid_cert, - } => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { - return now!(Err(Error::AlreadyConnected)); - } - let mut server = Server::new(); - *server.username_mut() = Some(username); - *server.password_mut() = password; - *server.host_mut() = Some(format!("{}:{}", host, port)); - self.server = Some(server); - self.phase_watcher - .0 - .send(StatePhase::Connecting) - .unwrap(); - - let socket_addr = match (host.as_ref(), port) - .to_socket_addrs() - .map(|mut e| e.next()) - { - Ok(Some(v)) => v, - _ => { - warn!("Error parsing server addr"); - return now!(Err(Error::InvalidServerAddr(host, port))); - } - }; - connection_info_sender - .send(Some(ConnectionInfo::new( - socket_addr, - host, - accept_invalid_cert, - ))) - .unwrap(); - at!(TcpEvent::Connected, |res| { - //runs the closure when the client is connected - if let TcpEventData::Connected(res) = res { - res.map(|msg| { - Some(CommandResponse::ServerConnect { - welcome_message: if msg.has_welcome_text() { - Some(msg.get_welcome_text().to_string()) - } else { - None - }, - }) - }) - } else { - unreachable!("callback should be provided with a TcpEventData::Connected"); - } - }) - } - Command::ServerDisconnect => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - self.server = None; - - self.phase_watcher - .0 - .send(StatePhase::Disconnected) - .unwrap(); - self.audio_output.play_effect(NotificationEvents::ServerDisconnect); - now!(Ok(None)) - } - Command::ServerStatus { host, port } => ExecutionContext::Ping( - Box::new(move || { - match (host.as_str(), port) - .to_socket_addrs() - .map(|mut e| e.next()) - { - Ok(Some(v)) => Ok(v), - _ => Err(Error::InvalidServerAddr(host, port)), - } - }), - Box::new(move |pong| { - Ok(pong.map(|pong| CommandResponse::ServerStatus { - version: pong.version, - users: pong.users, - max_users: pong.max_users, - bandwidth: pong.bandwidth, - })) - }), - ), - Command::Status => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - let state = self.server.as_ref().unwrap().into(); - now!(Ok(Some(CommandResponse::Status { - server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some - }))) - } - Command::UserVolumeSet(string, volume) => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - let user_id = match self - .server() - .unwrap() - .users() - .iter() - .find(|e| e.1.name() == string) - .map(|e| *e.0) - { - None => return now!(Err(Error::InvalidUsername(string))), - Some(v) => v, - }; - - self.audio_output.set_user_volume(user_id, volume); - now!(Ok(None)) - } - Command::PastMessages { block } => { - let server = match self.server.as_ref() { - Some(s) => s, - None => return now!(Err(Error::Disconnected)), - }; - let messages = std::mem::take(&mut self.message_buffer).into_iter() - .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); - - now!(Ok(Some(CommandResponse::PastMessages { messages }))) - } - Command::SendMessage { message, targets } => { - if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { - return now!(Err(Error::Disconnected)); - } - - let mut msg = msgs::TextMessage::new(); - - msg.set_message(message); - - for target in targets { - match target { - MessageTarget::Channel { recursive, name } => { - let channel_id = self - .server() - .unwrap() - .channel_name(&name); - - let channel_id = match channel_id { - Ok(id) => id, - Err(e) => return now!(Err(Error::ChannelIdentifierError(name, e))), - }.0; - - if recursive { - msg.mut_tree_id() - } else { - msg.mut_channel_id() - }.push(channel_id); - } - MessageTarget::User { name } => { - let id = self - .server() - .unwrap() - .users() - .iter() - .find(|(_, user)| user.name() == &name) - .map(|(e, _)| *e); - - let id = match id { - Some(id) => id, - None => return now!(Err(Error::InvalidUsername(name))), - }; - - msg.mut_session().push(id); - } - } - } - - packet_sender.send(msg.into()).unwrap(); - - now!(Ok(None)) - } - } - } pub fn parse_user_state(&mut self, msg: msgs::UserState) { if !msg.has_session() { @@ -671,3 +301,394 @@ impl State { .channel() } } + +pub fn handle_command( + og_state: Arc<RwLock<State>>, + command: Command, + packet_sender: &mut mpsc::UnboundedSender<ControlPacket<Serverbound>>, + connection_info_sender: &mut watch::Sender<Option<ConnectionInfo>>, +) -> ExecutionContext { + let mut state = og_state.write().unwrap(); + match command { + Command::ChannelJoin { channel_identifier } => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let id = match state.server().unwrap().channel_name(&channel_identifier) { + Ok((id, _)) => id, + Err(e) => return now!(Err(Error::ChannelIdentifierError(channel_identifier, e))), + }; + + let mut msg = msgs::UserState::new(); + msg.set_session(state.server.as_ref().unwrap().session_id().unwrap()); + msg.set_channel_id(id); + packet_sender.send(msg.into()).unwrap(); + now!(Ok(None)) + } + Command::ChannelList => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + let list = channel::into_channel( + state.server.as_ref().unwrap().channels(), + state.server.as_ref().unwrap().users(), + ); + now!(Ok(Some(CommandResponse::ChannelList { channels: list }))) + } + Command::ConfigReload => { + state.reload_config(); + now!(Ok(None)) + } + Command::DeafenSelf(toggle) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let server = state.server().unwrap(); + let action = match (toggle, server.muted(), server.deafened()) { + (Some(false), false, false) => None, + (Some(false), false, true) => Some((false, false)), + (Some(false), true, false) => None, + (Some(false), true, true) => Some((true, false)), + (Some(true), false, false) => Some((false, true)), + (Some(true), false, true) => None, + (Some(true), true, false) => Some((true, true)), + (Some(true), true, true) => None, + (None, false, false) => Some((false, true)), + (None, false, true) => Some((false, false)), + (None, true, false) => Some((true, true)), + (None, true, true) => Some((true, false)), + }; + + let mut new_deaf = None; + if let Some((mute, deafen)) = action { + if server.deafened() != deafen { + state.audio_output.play_effect(if deafen { + NotificationEvents::Deafen + } else { + NotificationEvents::Undeafen + }); + } else if server.muted() != mute { + state.audio_output.play_effect(if mute { + NotificationEvents::Mute + } else { + NotificationEvents::Unmute + }); + } + let mut msg = msgs::UserState::new(); + if server.muted() != mute { + msg.set_self_mute(mute); + } else if !mute && !deafen && server.deafened() { + msg.set_self_mute(false); + } + if server.deafened() != deafen { + msg.set_self_deaf(deafen); + new_deaf = Some(deafen); + } + let server = state.server_mut().unwrap(); + server.set_muted(mute); + server.set_deafened(deafen); + packet_sender.send(msg.into()).unwrap(); + } + + now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) + } + Command::InputVolumeSet(volume) => { + state.audio_input.set_volume(volume); + now!(Ok(None)) + } + Command::MuteOther(string, toggle) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let id = state + .server_mut() + .unwrap() + .users_mut() + .iter_mut() + .find(|(_, user)| user.name() == string); + + let (id, user) = match id { + Some(id) => (*id.0, id.1), + None => return now!(Err(Error::InvalidUsername(string))), + }; + + let action = match toggle { + Some(state) => { + if user.suppressed() != state { + Some(state) + } else { + None + } + } + None => Some(!user.suppressed()), + }; + + if let Some(action) = action { + user.set_suppressed(action); + state.audio_output.set_mute(id, action); + } + + return now!(Ok(None)); + } + Command::MuteSelf(toggle) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let server = state.server().unwrap(); + let action = match (toggle, server.muted(), server.deafened()) { + (Some(false), false, false) => None, + (Some(false), false, true) => Some((false, false)), + (Some(false), true, false) => Some((false, false)), + (Some(false), true, true) => Some((false, false)), + (Some(true), false, false) => Some((true, false)), + (Some(true), false, true) => None, + (Some(true), true, false) => None, + (Some(true), true, true) => None, + (None, false, false) => Some((true, false)), + (None, false, true) => Some((false, false)), + (None, true, false) => Some((false, false)), + (None, true, true) => Some((false, false)), + }; + + let mut new_mute = None; + if let Some((mute, deafen)) = action { + if server.deafened() != deafen { + state.audio_output.play_effect(if deafen { + NotificationEvents::Deafen + } else { + NotificationEvents::Undeafen + }); + } else if server.muted() != mute { + state.audio_output.play_effect(if mute { + NotificationEvents::Mute + } else { + NotificationEvents::Unmute + }); + } + let mut msg = msgs::UserState::new(); + if server.muted() != mute { + msg.set_self_mute(mute); + new_mute = Some(mute) + } else if !mute && !deafen && server.deafened() { + msg.set_self_mute(false); + new_mute = Some(false) + } + if server.deafened() != deafen { + msg.set_self_deaf(deafen); + } + let server = state.server_mut().unwrap(); + server.set_muted(mute); + server.set_deafened(deafen); + packet_sender.send(msg.into()).unwrap(); + } + + now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) + } + Command::OutputVolumeSet(volume) => { + state.audio_output.set_volume(volume); + now!(Ok(None)) + } + Command::Ping => { + now!(Ok(Some(CommandResponse::Pong))) + } + Command::ServerConnect { + host, + port, + username, + password, + accept_invalid_cert, + } => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Disconnected) { + return now!(Err(Error::AlreadyConnected)); + } + let mut server = Server::new(); + *server.username_mut() = Some(username); + *server.password_mut() = password; + *server.host_mut() = Some(format!("{}:{}", host, port)); + state.server = Some(server); + state.phase_watcher + .0 + .send(StatePhase::Connecting) + .unwrap(); + + let socket_addr = match (host.as_ref(), port) + .to_socket_addrs() + .map(|mut e| e.next()) + { + Ok(Some(v)) => v, + _ => { + warn!("Error parsing server addr"); + return now!(Err(Error::InvalidServerAddr(host, port))); + } + }; + connection_info_sender + .send(Some(ConnectionInfo::new( + socket_addr, + host, + accept_invalid_cert, + ))) + .unwrap(); + at!(TcpEvent::Connected, |res| { + //runs the closure when the client is connected + if let TcpEventData::Connected(res) = res { + res.map(|msg| { + Some(CommandResponse::ServerConnect { + welcome_message: if msg.has_welcome_text() { + Some(msg.get_welcome_text().to_string()) + } else { + None + }, + }) + }) + } else { + unreachable!("callback should be provided with a TcpEventData::Connected"); + } + }) + } + Command::ServerDisconnect => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + state.server = None; + + state.phase_watcher + .0 + .send(StatePhase::Disconnected) + .unwrap(); + state.audio_output.play_effect(NotificationEvents::ServerDisconnect); + now!(Ok(None)) + } + Command::ServerStatus { host, port } => ExecutionContext::Ping( + Box::new(move || { + match (host.as_str(), port) + .to_socket_addrs() + .map(|mut e| e.next()) + { + Ok(Some(v)) => Ok(v), + _ => Err(Error::InvalidServerAddr(host, port)), + } + }), + Box::new(move |pong| { + Ok(pong.map(|pong| CommandResponse::ServerStatus { + version: pong.version, + users: pong.users, + max_users: pong.max_users, + bandwidth: pong.bandwidth, + })) + }), + ), + Command::Status => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + let state = state.server.as_ref().unwrap().into(); + now!(Ok(Some(CommandResponse::Status { + server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some + }))) + } + Command::UserVolumeSet(string, volume) => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + let user_id = match state + .server() + .unwrap() + .users() + .iter() + .find(|e| e.1.name() == string) + .map(|e| *e.0) + { + None => return now!(Err(Error::InvalidUsername(string))), + Some(v) => v, + }; + + state.audio_output.set_user_volume(user_id, volume); + now!(Ok(None)) + } + Command::PastMessages { block } => { + if block { + let messages = std::mem::take(&mut state.message_buffer); + let server = match state.server.as_ref() { + Some(s) => s, + None => return now!(Err(Error::Disconnected)), + }; + let messages = messages.into_iter() + .map(|(msg, user)| (msg, server.users().get(&user).unwrap().name().to_string())).collect(); + + now!(Ok(Some(CommandResponse::PastMessages { messages }))) + } else { + let ref_state = Arc::clone(&og_state); + ExecutionContext::TcpEventSubscriber( + TcpEvent::TextMessage, + Box::new(move |data, sender| { + if let TcpEventData::TextMessage(a) = data { + let message = ( + a.get_message().to_owned(), + ref_state.read().unwrap().server().unwrap().users().get(&a.get_actor()).unwrap().name().to_string() + ); + sender.send(Ok(Some(CommandResponse::PastMessage { message }))).is_ok() + } else { + unreachable!("Should only receive a TextMessage data when listening to TextMessage events"); + } + }), + ) + } + } + Command::SendMessage { message, targets } => { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + return now!(Err(Error::Disconnected)); + } + + let mut msg = msgs::TextMessage::new(); + + msg.set_message(message); + + for target in targets { + match target { + MessageTarget::Channel { recursive, name } => { + let channel_id = state + .server() + .unwrap() + .channel_name(&name); + + let channel_id = match channel_id { + Ok(id) => id, + Err(e) => return now!(Err(Error::ChannelIdentifierError(name, e))), + }.0; + + if recursive { + msg.mut_tree_id() + } else { + msg.mut_channel_id() + }.push(channel_id); + } + MessageTarget::User { name } => { + let id = state + .server() + .unwrap() + .users() + .iter() + .find(|(_, user)| user.name() == &name) + .map(|(e, _)| *e); + + let id = match id { + Some(id) => id, + None => return now!(Err(Error::InvalidUsername(name))), + }; + + msg.mut_session().push(id); + } + } + } + + packet_sender.send(msg.into()).unwrap(); + + now!(Ok(None)) + } + } +} |
