diff options
Diffstat (limited to 'mumd/src/network/tcp.rs')
| -rw-r--r-- | mumd/src/network/tcp.rs | 93 |
1 files changed, 66 insertions, 27 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index b513797..5cc2bf7 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,11 +1,14 @@ -use crate::{error::{ServerSendError, TcpError}, notifications}; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; +use crate::{ + error::{ServerSendError, TcpError}, + notifications, +}; use log::*; -use futures_util::{FutureExt, SinkExt, StreamExt}; use futures_util::select; use futures_util::stream::{SplitSink, SplitStream, Stream}; +use futures_util::{FutureExt, SinkExt, StreamExt}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; @@ -73,24 +76,44 @@ impl TcpEventQueue { /// Registers a new callback to be triggered when an event is fired. pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { - self.callbacks.write().unwrap().entry(at).or_default().push(callback); + self.callbacks + .write() + .unwrap() + .entry(at) + .or_default() + .push(callback); } /// Registers a new callback to be triggered when an event is fired. pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { - self.subscribers.write().unwrap().entry(at).or_default().push(callback); + self.subscribers + .write() + .unwrap() + .entry(at) + .or_default() + .push(callback); } /// Fires all callbacks related to a specific TCP event and removes them from the event queue. /// Also calls all event subscribers, but keeps them in the queue pub fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self + .callbacks + .write() + .unwrap() + .get_mut(&TcpEvent::from(&data)) + { let old = std::mem::take(vec); for handler in old { handler(data.clone()); } } - if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self + .subscribers + .write() + .unwrap() + .get_mut(&TcpEvent::from(&data)) + { let old = std::mem::take(vec); for mut e in old { if e(data.clone()) { @@ -128,14 +151,18 @@ pub async fn handle( // Handshake (omitting `Version` message for brevity) let (username, password) = { let state_lock = state.read().unwrap(); - (state_lock.username().unwrap().to_string(), - state_lock.password().map(|x| x.to_string())) + ( + state_lock.username().unwrap().to_string(), + state_lock.password().map(|x| x.to_string()), + ) }; authenticate(&mut sink, username, password).await?; let (phase_watcher, input_receiver) = { let state_lock = state.read().unwrap(); - (state_lock.phase_receiver(), - state_lock.audio_input().receiver()) + ( + state_lock.phase_receiver(), + state_lock.audio_input().receiver(), + ) }; info!("Logging in..."); @@ -162,7 +189,9 @@ pub async fn handle( } }, phase_watcher, - ).await.unwrap_or(Ok(()))?; + ) + .await + .unwrap_or(Ok(()))?; event_queue.resolve(TcpEventData::Disconnected); @@ -197,7 +226,7 @@ async fn connect( async fn authenticate( sink: &mut TcpSender, username: String, - password: Option<String> + password: Option<String>, ) -> Result<(), TcpError> { let mut msg = msgs::Authenticate::new(); msg.set_username(username); @@ -242,7 +271,10 @@ async fn send_voice( let mut inner_phase_watcher = phase_watcher.clone(); loop { inner_phase_watcher.changed().await.unwrap(); - if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::TCP)) { + if matches!( + *inner_phase_watcher.borrow(), + StatePhase::Connected(VoiceStreamType::TCP) + ) { break; } } @@ -257,11 +289,14 @@ async fn send_voice( .next() .await .expect("No audio stream") - .into())?; + .into(), + )?; } }, inner_phase_watcher.clone(), - ).await.unwrap_or(Ok::<(), ServerSendError>(()))?; + ) + .await + .unwrap_or(Ok::<(), ServerSendError>(()))?; } } @@ -285,18 +320,23 @@ async fn listen( // We end up here if the login was rejected. We probably want // to exit before that. warn!("TCP stream gone"); - state.read().unwrap().broadcast_phase(StatePhase::Disconnected); + state + .read() + .unwrap() + .broadcast_phase(StatePhase::Disconnected); break; } }; match packet { ControlPacket::TextMessage(msg) => { let mut state = state.write().unwrap(); - let user = state.server() + let user = state + .server() .and_then(|server| server.users().get(&msg.get_actor())) .map(|user| user.name()); if let Some(user) = user { - notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this + notifications::send(format!("{}: {}", user, msg.get_message())); + //TODO: probably want a config flag for this } state.register_message((msg.get_message().to_owned(), msg.get_actor())); drop(state); @@ -345,7 +385,9 @@ async fn listen( debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { - event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))); + event_queue.resolve(TcpEventData::Connected(Err( + mumlib::Error::InvalidServerPassword, + ))); } ty => { warn!("Unhandled reject type: {:?}", ty); @@ -385,14 +427,11 @@ async fn listen( // position_info, .. } => { - state - .read() - .unwrap() - .audio_output() - .decode_packet_payload( - VoiceStreamType::TCP, - session_id, - payload); + state.read().unwrap().audio_output().decode_packet_payload( + VoiceStreamType::TCP, + session_id, + payload, + ); } } } |
