diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2020-10-29 21:28:12 +0100 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2020-10-29 21:28:12 +0100 |
| commit | 4010e0f5abb28bae3207c78dba74f0896eedea51 (patch) | |
| tree | 2e1c215705d04f86e47a1a0a9015dfd6ed6feeaf /mumd | |
| parent | 3e7e375e65760a03b6692106ab0ed806ca65e470 (diff) | |
| download | mum-4010e0f5abb28bae3207c78dba74f0896eedea51.tar.gz | |
cargo fmt
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/src/audio.rs | 26 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 8 | ||||
| -rw-r--r-- | mumd/src/command.rs | 15 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 58 | ||||
| -rw-r--r-- | mumd/src/notify.rs | 8 | ||||
| -rw-r--r-- | mumd/src/state.rs | 130 |
6 files changed, 140 insertions, 105 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 7a673ff..ad4a762 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -33,12 +33,13 @@ impl Audio { let output_supported_config = output_device .supported_output_configs() .expect("error querying output configs") - .find_map(|c| - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - }) + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) .unwrap() .with_sample_rate(sample_rate); let output_supported_sample_format = output_supported_config.sample_format(); @@ -50,12 +51,13 @@ impl Audio { let input_supported_config = input_device .supported_input_configs() .expect("error querying output configs") - .find_map(|c| - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - }) + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) .unwrap() .with_sample_rate(sample_rate); let input_supported_sample_format = input_supported_config.sample_format(); diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 4e95360..7405fdb 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -28,9 +28,11 @@ pub fn callback<T: Sample>( move |data: &[T], _info: &InputCallbackInfo| { let mut buf = buf.lock().unwrap(); let input_volume = *input_volume_receiver.borrow(); - let out: Vec<f32> = data.iter().map(|e| e.to_f32()) - .map(|e| e * input_volume) - .collect(); + let out: Vec<f32> = data + .iter() + .map(|e| e.to_f32()) + .map(|e| e * input_volume) + .collect(); buf.extend(out); while buf.len() >= opus_frame_size as usize { let tail = buf.split_off(opus_frame_size as usize); diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 5407ea3..d4b25d0 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,11 +1,11 @@ use crate::state::State; +use crate::network::tcp::{TcpEvent, TcpEventCallback}; use ipc_channel::ipc::IpcSender; use log::*; use mumlib::command::{Command, CommandResponse}; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, oneshot}; -use crate::network::tcp::{TcpEvent, TcpEventCallback}; pub async fn handle( state: Arc<Mutex<State>>, @@ -24,11 +24,14 @@ pub async fn handle( if let Some(event) = event { let (tx, rx) = oneshot::channel(); //TODO handle this error - let _ = tcp_event_register_sender.send((event, Box::new(move |e| { - let response = generator(Some(e)); - response_sender.send(response).unwrap(); - tx.send(()).unwrap(); - }))); + let _ = tcp_event_register_sender.send(( + event, + Box::new(move |e| { + let response = generator(Some(e)); + response_sender.send(response).unwrap(); + tx.send(()).unwrap(); + }), + )); rx.await.unwrap(); } else { diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 630f46a..cd11690 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -7,18 +7,18 @@ use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::{Clientbound, Serverbound}; +use std::cell::RefCell; +use std::collections::HashMap; use std::convert::{Into, TryInto}; +use std::future::Future; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; -use std::collections::HashMap; -use std::future::Future; -use std::rc::Rc; -use std::cell::RefCell; type TcpSender = SplitSink< Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>, @@ -31,7 +31,7 @@ pub(crate) type TcpEventCallback = Box<dyn FnOnce(&TcpEventData)>; #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum TcpEvent { - Connected, //fires when the client has connected to a server + Connected, //fires when the client has connected to a server Disconnected, //fires when the client has disconnected from a server } @@ -131,13 +131,13 @@ async fn send_pings( delay_seconds: u64, phase_watcher: watch::Receiver<StatePhase>, ) { - let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs(delay_seconds)))); + let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs( + delay_seconds, + )))); let packet_sender = Rc::new(RefCell::new(packet_sender)); run_until_disconnection( - || async { - Some(interval.borrow_mut().tick().await) - }, + || async { Some(interval.borrow_mut().tick().await) }, |_| async { trace!("Sending ping"); let msg = msgs::Ping::new(); @@ -145,7 +145,8 @@ async fn send_pings( }, || async {}, phase_watcher, - ).await; + ) + .await; debug!("Ping sender process killed"); } @@ -158,9 +159,7 @@ async fn send_packets( let sink = Rc::new(RefCell::new(sink)); let packet_receiver = Rc::new(RefCell::new(packet_receiver)); run_until_disconnection( - || async { - packet_receiver.borrow_mut().recv().await - }, + || async { packet_receiver.borrow_mut().recv().await }, |packet| async { sink.borrow_mut().send(packet).await.unwrap(); }, @@ -171,7 +170,8 @@ async fn send_packets( sink.borrow_mut().close().await.unwrap(); }, phase_watcher, - ).await; + ) + .await; debug!("TCP packet sender killed"); } @@ -188,9 +188,7 @@ async fn listen( let stream = Rc::new(RefCell::new(stream)); run_until_disconnection( - || async { - stream.borrow_mut().next().await - }, + || async { stream.borrow_mut().next().await }, |packet| async { match packet.unwrap() { ControlPacket::TextMessage(msg) => { @@ -289,7 +287,8 @@ async fn listen( } }, phase_watcher, - ).await; + ) + .await; debug!("Killing TCP listener block"); } @@ -301,13 +300,19 @@ async fn register_events( ) { let tcp_event_register_receiver = Rc::new(RefCell::new(tcp_event_register_receiver)); run_until_disconnection( - || async { - tcp_event_register_receiver.borrow_mut().recv().await + || async { tcp_event_register_receiver.borrow_mut().recv().await }, + |(event, handler)| async { + event_data + .lock() + .unwrap() + .entry(event) + .or_default() + .push(handler); }, - |(event, handler)| async { event_data.lock().unwrap().entry(event).or_default().push(handler); }, || async {}, phase_watcher, - ).await; + ) + .await; } async fn run_until_disconnection<T, F, G, H>( @@ -315,11 +320,10 @@ async fn run_until_disconnection<T, F, G, H>( mut handler: impl FnMut(T) -> G, mut shutdown: impl FnMut() -> H, mut phase_watcher: watch::Receiver<StatePhase>, -) - where - F: Future<Output = Option<T>>, - G: Future<Output = ()>, - H: Future<Output = ()>, +) where + F: Future<Output = Option<T>>, + G: Future<Output = ()>, + H: Future<Output = ()>, { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { diff --git a/mumd/src/notify.rs b/mumd/src/notify.rs index 5bb1a26..82ec6b6 100644 --- a/mumd/src/notify.rs +++ b/mumd/src/notify.rs @@ -5,12 +5,8 @@ pub fn init() { } pub fn send(msg: String) -> bool { - match libnotify::Notification::new( - "mumd", - Some(msg.as_str()), - None, - ).show() { - Ok(_) => { true } + match libnotify::Notification::new("mumd", Some(msg.as_str()), None).show() { + Ok(_) => true, Err(_) => { debug!("Unable to send notification"); false diff --git a/mumd/src/state.rs b/mumd/src/state.rs index ea081fc..81b6c98 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -1,5 +1,5 @@ -pub mod server; pub mod channel; +pub mod server; pub mod user; use crate::audio::Audio; @@ -7,6 +7,7 @@ use crate::network::ConnectionInfo; use crate::notify; use crate::state::server::Server; +use crate::network::tcp::{TcpEvent, TcpEventData}; use log::*; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; @@ -17,7 +18,6 @@ use mumlib::error::{ChannelIdentifierError, Error}; use mumlib::state::UserDiff; use std::net::ToSocketAddrs; use tokio::sync::{mpsc, watch}; -use crate::network::tcp::{TcpEvent, TcpEventData}; macro_rules! at { ($event:expr, $generator:expr) => { @@ -71,35 +71,53 @@ impl State { pub fn handle_command( &mut self, command: Command, - ) -> (Option<TcpEvent>, Box<dyn FnOnce(Option<&TcpEventData>) -> mumlib::error::Result<Option<CommandResponse>>>) { + ) -> ( + Option<TcpEvent>, + Box<dyn FnOnce(Option<&TcpEventData>) -> mumlib::error::Result<Option<CommandResponse>>>, + ) { match command { Command::ChannelJoin { channel_identifier } => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { return now!(Err(Error::DisconnectedError)); } - let channels = self.server() - .unwrap() - .channels(); + let channels = self.server().unwrap().channels(); - let matches = channels.iter() + let matches = channels + .iter() .map(|e| (e.0, e.1.path(channels))) .filter(|e| e.1.ends_with(&channel_identifier)) .collect::<Vec<_>>(); let id = match matches.len() { 0 => { - let soft_matches = channels.iter() + let soft_matches = channels + .iter() .map(|e| (e.0, e.1.path(channels).to_lowercase())) .filter(|e| e.1.ends_with(&channel_identifier.to_lowercase())) .collect::<Vec<_>>(); match soft_matches.len() { - 0 => return now!(Err(Error::ChannelIdentifierError(channel_identifier, ChannelIdentifierError::Invalid))), + 0 => { + return now!(Err(Error::ChannelIdentifierError( + channel_identifier, + ChannelIdentifierError::Invalid + ))) + } 1 => *soft_matches.get(0).unwrap().0, - _ => return now!(Err(Error::ChannelIdentifierError(channel_identifier, ChannelIdentifierError::Invalid))), + _ => { + return now!(Err(Error::ChannelIdentifierError( + channel_identifier, + ChannelIdentifierError::Invalid + ))) + } } - }, + } 1 => *matches.get(0).unwrap().0, - _ => return now!(Err(Error::ChannelIdentifierError(channel_identifier, ChannelIdentifierError::Ambiguous))), + _ => { + return now!(Err(Error::ChannelIdentifierError( + channel_identifier, + ChannelIdentifierError::Ambiguous + ))) + } }; let mut msg = msgs::UserState::new(); @@ -116,11 +134,7 @@ impl State { self.server.as_ref().unwrap().channels(), self.server.as_ref().unwrap().users(), ); - now!( - Ok(Some(CommandResponse::ChannelList { - channels: list, - })) - ) + now!(Ok(Some(CommandResponse::ChannelList { channels: list }))) } Command::ServerConnect { host, @@ -157,14 +171,15 @@ impl State { accept_invalid_cert, ))) .unwrap(); - at!(TcpEvent::Connected, |e| { //runs the closure when the client is connected + at!(TcpEvent::Connected, |e| { + //runs the closure when the client is connected if let Some(TcpEventData::Connected(msg)) = e { Ok(Some(CommandResponse::ServerConnect { welcome_message: if msg.has_welcome_text() { Some(msg.get_welcome_text().to_string()) } else { None - } + }, })) } else { unreachable!("callback should be provided with a TcpEventData::Connected"); @@ -176,11 +191,9 @@ impl State { return now!(Err(Error::DisconnectedError)); } let state = self.server.as_ref().unwrap().into(); - now!( - Ok(Some(CommandResponse::Status { - server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some - })) - ) + now!(Ok(Some(CommandResponse::Status { + server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some + }))) } Command::ServerDisconnect => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { @@ -240,15 +253,27 @@ impl State { 0 }; if let Some(channel) = self.server().unwrap().channels().get(&channel_id) { - notify::send(format!("{} connected and joined {}", &msg.get_name(), channel.name())); + notify::send(format!( + "{} connected and joined {}", + &msg.get_name(), + channel.name() + )); } } } - self.server_mut().unwrap().users_mut().insert(session, user::User::new(msg)); + self.server_mut() + .unwrap() + .users_mut() + .insert(session, user::User::new(msg)); } fn parse_updated_user_state(&mut self, session: u32, msg: msgs::UserState) -> UserDiff { - let user = self.server_mut().unwrap().users_mut().get_mut(&session).unwrap(); + let user = self + .server_mut() + .unwrap() + .users_mut() + .get_mut(&session) + .unwrap(); let mute = if msg.has_self_mute() && user.self_mute() != msg.get_self_mute() { Some(msg.get_self_mute()) @@ -270,9 +295,10 @@ impl State { if let Some(channel_id) = diff.channel_id { if let Some(channel) = self.server().unwrap().channels().get(&channel_id) { notify::send(format!( - "{} moved to channel {}", - &user.name(), - channel.name())); + "{} moved to channel {}", + &user.name(), + channel.name() + )); } else { warn!("{} moved to invalid channel {}", &user.name(), channel_id); } @@ -280,28 +306,27 @@ impl State { // send notification if a user muted/unmuted //TODO our channel only - let notif_desc = - if let Some(deaf) = deaf { - if deaf { - Some(format!("{} muted and deafend themselves", &user.name())) - } else if !deaf { - Some(format!("{} unmuted and undeafend themselves", &user.name())) - } else { - warn!("Invalid user state received"); - None - } - } else if let Some(mute) = mute { - if mute { - Some(format!("{} muted themselves", &user.name())) - } else if !mute { - Some(format!("{} unmuted themselves", &user.name())) - } else { - warn!("Invalid user state received"); - None - } + let notif_desc = if let Some(deaf) = deaf { + if deaf { + Some(format!("{} muted and deafend themselves", &user.name())) + } else if !deaf { + Some(format!("{} unmuted and undeafend themselves", &user.name())) } else { + warn!("Invalid user state received"); None - }; + } + } else if let Some(mute) = mute { + if mute { + Some(format!("{} muted themselves", &user.name())) + } else if !mute { + Some(format!("{} unmuted themselves", &user.name())) + } else { + warn!("Invalid user state received"); + None + } + } else { + None + }; if let Some(notif_desc) = notif_desc { notify::send(notif_desc); } @@ -319,7 +344,10 @@ impl State { } self.audio().remove_client(msg.get_session()); - self.server_mut().unwrap().users_mut().remove(&msg.get_session()); + self.server_mut() + .unwrap() + .users_mut() + .remove(&msg.get_session()); info!("User {} disconnected", msg.get_session()); } |
