aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-29 21:28:12 +0100
committerGustav Sörnäs <gustav@sornas.net>2020-10-29 21:28:12 +0100
commit4010e0f5abb28bae3207c78dba74f0896eedea51 (patch)
tree2e1c215705d04f86e47a1a0a9015dfd6ed6feeaf /mumd/src
parent3e7e375e65760a03b6692106ab0ed806ca65e470 (diff)
downloadmum-4010e0f5abb28bae3207c78dba74f0896eedea51.tar.gz
cargo fmt
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/audio.rs26
-rw-r--r--mumd/src/audio/input.rs8
-rw-r--r--mumd/src/command.rs15
-rw-r--r--mumd/src/network/tcp.rs58
-rw-r--r--mumd/src/notify.rs8
-rw-r--r--mumd/src/state.rs130
6 files changed, 140 insertions, 105 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 7a673ff..ad4a762 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -33,12 +33,13 @@ impl Audio {
let output_supported_config = output_device
.supported_output_configs()
.expect("error querying output configs")
- .find_map(|c|
- if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
- Some(c)
- } else {
- None
- })
+ .find_map(|c| {
+ if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
+ Some(c)
+ } else {
+ None
+ }
+ })
.unwrap()
.with_sample_rate(sample_rate);
let output_supported_sample_format = output_supported_config.sample_format();
@@ -50,12 +51,13 @@ impl Audio {
let input_supported_config = input_device
.supported_input_configs()
.expect("error querying output configs")
- .find_map(|c|
- if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
- Some(c)
- } else {
- None
- })
+ .find_map(|c| {
+ if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
+ Some(c)
+ } else {
+ None
+ }
+ })
.unwrap()
.with_sample_rate(sample_rate);
let input_supported_sample_format = input_supported_config.sample_format();
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index 4e95360..7405fdb 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -28,9 +28,11 @@ pub fn callback<T: Sample>(
move |data: &[T], _info: &InputCallbackInfo| {
let mut buf = buf.lock().unwrap();
let input_volume = *input_volume_receiver.borrow();
- let out: Vec<f32> = data.iter().map(|e| e.to_f32())
- .map(|e| e * input_volume)
- .collect();
+ let out: Vec<f32> = data
+ .iter()
+ .map(|e| e.to_f32())
+ .map(|e| e * input_volume)
+ .collect();
buf.extend(out);
while buf.len() >= opus_frame_size as usize {
let tail = buf.split_off(opus_frame_size as usize);
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 5407ea3..d4b25d0 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,11 +1,11 @@
use crate::state::State;
+use crate::network::tcp::{TcpEvent, TcpEventCallback};
use ipc_channel::ipc::IpcSender;
use log::*;
use mumlib::command::{Command, CommandResponse};
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
-use crate::network::tcp::{TcpEvent, TcpEventCallback};
pub async fn handle(
state: Arc<Mutex<State>>,
@@ -24,11 +24,14 @@ pub async fn handle(
if let Some(event) = event {
let (tx, rx) = oneshot::channel();
//TODO handle this error
- let _ = tcp_event_register_sender.send((event, Box::new(move |e| {
- let response = generator(Some(e));
- response_sender.send(response).unwrap();
- tx.send(()).unwrap();
- })));
+ let _ = tcp_event_register_sender.send((
+ event,
+ Box::new(move |e| {
+ let response = generator(Some(e));
+ response_sender.send(response).unwrap();
+ tx.send(()).unwrap();
+ }),
+ ));
rx.await.unwrap();
} else {
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 630f46a..cd11690 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -7,18 +7,18 @@ use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::{Clientbound, Serverbound};
+use std::cell::RefCell;
+use std::collections::HashMap;
use std::convert::{Into, TryInto};
+use std::future::Future;
use std::net::SocketAddr;
+use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
-use std::collections::HashMap;
-use std::future::Future;
-use std::rc::Rc;
-use std::cell::RefCell;
type TcpSender = SplitSink<
Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
@@ -31,7 +31,7 @@ pub(crate) type TcpEventCallback = Box<dyn FnOnce(&TcpEventData)>;
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum TcpEvent {
- Connected, //fires when the client has connected to a server
+ Connected, //fires when the client has connected to a server
Disconnected, //fires when the client has disconnected from a server
}
@@ -131,13 +131,13 @@ async fn send_pings(
delay_seconds: u64,
phase_watcher: watch::Receiver<StatePhase>,
) {
- let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs(delay_seconds))));
+ let interval = Rc::new(RefCell::new(time::interval(Duration::from_secs(
+ delay_seconds,
+ ))));
let packet_sender = Rc::new(RefCell::new(packet_sender));
run_until_disconnection(
- || async {
- Some(interval.borrow_mut().tick().await)
- },
+ || async { Some(interval.borrow_mut().tick().await) },
|_| async {
trace!("Sending ping");
let msg = msgs::Ping::new();
@@ -145,7 +145,8 @@ async fn send_pings(
},
|| async {},
phase_watcher,
- ).await;
+ )
+ .await;
debug!("Ping sender process killed");
}
@@ -158,9 +159,7 @@ async fn send_packets(
let sink = Rc::new(RefCell::new(sink));
let packet_receiver = Rc::new(RefCell::new(packet_receiver));
run_until_disconnection(
- || async {
- packet_receiver.borrow_mut().recv().await
- },
+ || async { packet_receiver.borrow_mut().recv().await },
|packet| async {
sink.borrow_mut().send(packet).await.unwrap();
},
@@ -171,7 +170,8 @@ async fn send_packets(
sink.borrow_mut().close().await.unwrap();
},
phase_watcher,
- ).await;
+ )
+ .await;
debug!("TCP packet sender killed");
}
@@ -188,9 +188,7 @@ async fn listen(
let stream = Rc::new(RefCell::new(stream));
run_until_disconnection(
- || async {
- stream.borrow_mut().next().await
- },
+ || async { stream.borrow_mut().next().await },
|packet| async {
match packet.unwrap() {
ControlPacket::TextMessage(msg) => {
@@ -289,7 +287,8 @@ async fn listen(
}
},
phase_watcher,
- ).await;
+ )
+ .await;
debug!("Killing TCP listener block");
}
@@ -301,13 +300,19 @@ async fn register_events(
) {
let tcp_event_register_receiver = Rc::new(RefCell::new(tcp_event_register_receiver));
run_until_disconnection(
- || async {
- tcp_event_register_receiver.borrow_mut().recv().await
+ || async { tcp_event_register_receiver.borrow_mut().recv().await },
+ |(event, handler)| async {
+ event_data
+ .lock()
+ .unwrap()
+ .entry(event)
+ .or_default()
+ .push(handler);
},
- |(event, handler)| async { event_data.lock().unwrap().entry(event).or_default().push(handler); },
|| async {},
phase_watcher,
- ).await;
+ )
+ .await;
}
async fn run_until_disconnection<T, F, G, H>(
@@ -315,11 +320,10 @@ async fn run_until_disconnection<T, F, G, H>(
mut handler: impl FnMut(T) -> G,
mut shutdown: impl FnMut() -> H,
mut phase_watcher: watch::Receiver<StatePhase>,
-)
- where
- F: Future<Output = Option<T>>,
- G: Future<Output = ()>,
- H: Future<Output = ()>,
+) where
+ F: Future<Output = Option<T>>,
+ G: Future<Output = ()>,
+ H: Future<Output = ()>,
{
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
diff --git a/mumd/src/notify.rs b/mumd/src/notify.rs
index 5bb1a26..82ec6b6 100644
--- a/mumd/src/notify.rs
+++ b/mumd/src/notify.rs
@@ -5,12 +5,8 @@ pub fn init() {
}
pub fn send(msg: String) -> bool {
- match libnotify::Notification::new(
- "mumd",
- Some(msg.as_str()),
- None,
- ).show() {
- Ok(_) => { true }
+ match libnotify::Notification::new("mumd", Some(msg.as_str()), None).show() {
+ Ok(_) => true,
Err(_) => {
debug!("Unable to send notification");
false
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index ea081fc..81b6c98 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -1,5 +1,5 @@
-pub mod server;
pub mod channel;
+pub mod server;
pub mod user;
use crate::audio::Audio;
@@ -7,6 +7,7 @@ use crate::network::ConnectionInfo;
use crate::notify;
use crate::state::server::Server;
+use crate::network::tcp::{TcpEvent, TcpEventData};
use log::*;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket;
@@ -17,7 +18,6 @@ use mumlib::error::{ChannelIdentifierError, Error};
use mumlib::state::UserDiff;
use std::net::ToSocketAddrs;
use tokio::sync::{mpsc, watch};
-use crate::network::tcp::{TcpEvent, TcpEventData};
macro_rules! at {
($event:expr, $generator:expr) => {
@@ -71,35 +71,53 @@ impl State {
pub fn handle_command(
&mut self,
command: Command,
- ) -> (Option<TcpEvent>, Box<dyn FnOnce(Option<&TcpEventData>) -> mumlib::error::Result<Option<CommandResponse>>>) {
+ ) -> (
+ Option<TcpEvent>,
+ Box<dyn FnOnce(Option<&TcpEventData>) -> mumlib::error::Result<Option<CommandResponse>>>,
+ ) {
match command {
Command::ChannelJoin { channel_identifier } => {
if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) {
return now!(Err(Error::DisconnectedError));
}
- let channels = self.server()
- .unwrap()
- .channels();
+ let channels = self.server().unwrap().channels();
- let matches = channels.iter()
+ let matches = channels
+ .iter()
.map(|e| (e.0, e.1.path(channels)))
.filter(|e| e.1.ends_with(&channel_identifier))
.collect::<Vec<_>>();
let id = match matches.len() {
0 => {
- let soft_matches = channels.iter()
+ let soft_matches = channels
+ .iter()
.map(|e| (e.0, e.1.path(channels).to_lowercase()))
.filter(|e| e.1.ends_with(&channel_identifier.to_lowercase()))
.collect::<Vec<_>>();
match soft_matches.len() {
- 0 => return now!(Err(Error::ChannelIdentifierError(channel_identifier, ChannelIdentifierError::Invalid))),
+ 0 => {
+ return now!(Err(Error::ChannelIdentifierError(
+ channel_identifier,
+ ChannelIdentifierError::Invalid
+ )))
+ }
1 => *soft_matches.get(0).unwrap().0,
- _ => return now!(Err(Error::ChannelIdentifierError(channel_identifier, ChannelIdentifierError::Invalid))),
+ _ => {
+ return now!(Err(Error::ChannelIdentifierError(
+ channel_identifier,
+ ChannelIdentifierError::Invalid
+ )))
+ }
}
- },
+ }
1 => *matches.get(0).unwrap().0,
- _ => return now!(Err(Error::ChannelIdentifierError(channel_identifier, ChannelIdentifierError::Ambiguous))),
+ _ => {
+ return now!(Err(Error::ChannelIdentifierError(
+ channel_identifier,
+ ChannelIdentifierError::Ambiguous
+ )))
+ }
};
let mut msg = msgs::UserState::new();
@@ -116,11 +134,7 @@ impl State {
self.server.as_ref().unwrap().channels(),
self.server.as_ref().unwrap().users(),
);
- now!(
- Ok(Some(CommandResponse::ChannelList {
- channels: list,
- }))
- )
+ now!(Ok(Some(CommandResponse::ChannelList { channels: list })))
}
Command::ServerConnect {
host,
@@ -157,14 +171,15 @@ impl State {
accept_invalid_cert,
)))
.unwrap();
- at!(TcpEvent::Connected, |e| { //runs the closure when the client is connected
+ at!(TcpEvent::Connected, |e| {
+ //runs the closure when the client is connected
if let Some(TcpEventData::Connected(msg)) = e {
Ok(Some(CommandResponse::ServerConnect {
welcome_message: if msg.has_welcome_text() {
Some(msg.get_welcome_text().to_string())
} else {
None
- }
+ },
}))
} else {
unreachable!("callback should be provided with a TcpEventData::Connected");
@@ -176,11 +191,9 @@ impl State {
return now!(Err(Error::DisconnectedError));
}
let state = self.server.as_ref().unwrap().into();
- now!(
- Ok(Some(CommandResponse::Status {
- server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some
- }))
- )
+ now!(Ok(Some(CommandResponse::Status {
+ server_state: state, //guaranteed not to panic because if we are connected, server is guaranteed to be Some
+ })))
}
Command::ServerDisconnect => {
if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) {
@@ -240,15 +253,27 @@ impl State {
0
};
if let Some(channel) = self.server().unwrap().channels().get(&channel_id) {
- notify::send(format!("{} connected and joined {}", &msg.get_name(), channel.name()));
+ notify::send(format!(
+ "{} connected and joined {}",
+ &msg.get_name(),
+ channel.name()
+ ));
}
}
}
- self.server_mut().unwrap().users_mut().insert(session, user::User::new(msg));
+ self.server_mut()
+ .unwrap()
+ .users_mut()
+ .insert(session, user::User::new(msg));
}
fn parse_updated_user_state(&mut self, session: u32, msg: msgs::UserState) -> UserDiff {
- let user = self.server_mut().unwrap().users_mut().get_mut(&session).unwrap();
+ let user = self
+ .server_mut()
+ .unwrap()
+ .users_mut()
+ .get_mut(&session)
+ .unwrap();
let mute = if msg.has_self_mute() && user.self_mute() != msg.get_self_mute() {
Some(msg.get_self_mute())
@@ -270,9 +295,10 @@ impl State {
if let Some(channel_id) = diff.channel_id {
if let Some(channel) = self.server().unwrap().channels().get(&channel_id) {
notify::send(format!(
- "{} moved to channel {}",
- &user.name(),
- channel.name()));
+ "{} moved to channel {}",
+ &user.name(),
+ channel.name()
+ ));
} else {
warn!("{} moved to invalid channel {}", &user.name(), channel_id);
}
@@ -280,28 +306,27 @@ impl State {
// send notification if a user muted/unmuted
//TODO our channel only
- let notif_desc =
- if let Some(deaf) = deaf {
- if deaf {
- Some(format!("{} muted and deafend themselves", &user.name()))
- } else if !deaf {
- Some(format!("{} unmuted and undeafend themselves", &user.name()))
- } else {
- warn!("Invalid user state received");
- None
- }
- } else if let Some(mute) = mute {
- if mute {
- Some(format!("{} muted themselves", &user.name()))
- } else if !mute {
- Some(format!("{} unmuted themselves", &user.name()))
- } else {
- warn!("Invalid user state received");
- None
- }
+ let notif_desc = if let Some(deaf) = deaf {
+ if deaf {
+ Some(format!("{} muted and deafend themselves", &user.name()))
+ } else if !deaf {
+ Some(format!("{} unmuted and undeafend themselves", &user.name()))
} else {
+ warn!("Invalid user state received");
None
- };
+ }
+ } else if let Some(mute) = mute {
+ if mute {
+ Some(format!("{} muted themselves", &user.name()))
+ } else if !mute {
+ Some(format!("{} unmuted themselves", &user.name()))
+ } else {
+ warn!("Invalid user state received");
+ None
+ }
+ } else {
+ None
+ };
if let Some(notif_desc) = notif_desc {
notify::send(notif_desc);
}
@@ -319,7 +344,10 @@ impl State {
}
self.audio().remove_client(msg.get_session());
- self.server_mut().unwrap().users_mut().remove(&msg.get_session());
+ self.server_mut()
+ .unwrap()
+ .users_mut()
+ .remove(&msg.get_session());
info!("User {} disconnected", msg.get_session());
}