aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
authorKapten Z∅∅m <55669224+default-username-852@users.noreply.github.com>2021-01-04 22:46:58 +0100
committerGitHub <noreply@github.com>2021-01-04 22:46:58 +0100
commit50b322f4ef974765a2948dfb08b1c9e8128b1bed (patch)
tree88d41a434b3a0c242ac7b35c6afefff0f75ee656 /mumd
parent1af9b90133a8d6102a09102bbd6f726f598c24fc (diff)
parentbe7748be2f1e9d1e88ebd093da9eec16d1ad4049 (diff)
downloadmum-50b322f4ef974765a2948dfb08b1c9e8128b1bed.tar.gz
Merge branch 'main' into noise-gate
Diffstat (limited to 'mumd')
-rw-r--r--mumd/src/audio.rs2
-rw-r--r--mumd/src/client.rs55
-rw-r--r--mumd/src/command.rs17
-rw-r--r--mumd/src/main.rs43
-rw-r--r--mumd/src/network/tcp.rs4
-rw-r--r--mumd/src/network/udp.rs29
-rw-r--r--mumd/src/state.rs32
-rw-r--r--mumd/src/state/channel.rs10
-rw-r--r--mumd/src/state/server.rs2
9 files changed, 104 insertions, 90 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index bf3f7f4..83818d5 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -25,7 +25,7 @@ use tokio::sync::watch;
use mumble_protocol::Serverbound;
//TODO? move to mumlib
-pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[
+pub const EVENT_SOUNDS: &[(&[u8], NotificationEvents)] = &[
(include_bytes!("resources/connect.wav"), NotificationEvents::ServerConnect),
(
include_bytes!("resources/disconnect.wav"),
diff --git a/mumd/src/client.rs b/mumd/src/client.rs
new file mode 100644
index 0000000..3613061
--- /dev/null
+++ b/mumd/src/client.rs
@@ -0,0 +1,55 @@
+use crate::command;
+use crate::network::{tcp, udp, ConnectionInfo};
+use crate::state::State;
+
+use futures_util::join;
+use ipc_channel::ipc::IpcSender;
+use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState};
+use mumlib::command::{Command, CommandResponse};
+use std::sync::{Arc, Mutex};
+use tokio::sync::{mpsc, watch};
+
+pub async fn handle(
+ command_receiver: mpsc::UnboundedReceiver<(
+ Command,
+ IpcSender<mumlib::error::Result<Option<CommandResponse>>>,
+ )>,
+) {
+ let (connection_info_sender, connection_info_receiver) =
+ watch::channel::<Option<ConnectionInfo>>(None);
+ let (crypt_state_sender, crypt_state_receiver) =
+ mpsc::channel::<ClientCryptState>(1);
+ let (packet_sender, packet_receiver) =
+ mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
+ let (ping_request_sender, ping_request_receiver) =
+ mpsc::unbounded_channel();
+ let (response_sender, response_receiver) =
+ mpsc::unbounded_channel();
+
+ let state = State::new();
+ let state = Arc::new(Mutex::new(state));
+ join!(
+ tcp::handle(
+ Arc::clone(&state),
+ connection_info_receiver.clone(),
+ crypt_state_sender,
+ packet_sender.clone(),
+ packet_receiver,
+ response_receiver,
+ ),
+ udp::handle(
+ Arc::clone(&state),
+ connection_info_receiver.clone(),
+ crypt_state_receiver,
+ ),
+ command::handle(
+ state,
+ command_receiver,
+ response_sender,
+ ping_request_sender,
+ packet_sender,
+ connection_info_sender,
+ ),
+ udp::handle_pings(ping_request_receiver),
+ );
+}
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 330e3fc..e77b34b 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,13 +1,16 @@
+use crate::network::{
+ ConnectionInfo,
+ tcp::{TcpEvent, TcpEventCallback},
+ udp::PingRequest
+};
use crate::state::{ExecutionContext, State};
-use crate::network::tcp::{TcpEvent, TcpEventCallback};
use ipc_channel::ipc::IpcSender;
use log::*;
-use mumble_protocol::ping::PongPacket;
+use mumble_protocol::{Serverbound, control::ControlPacket};
use mumlib::command::{Command, CommandResponse};
-use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, watch};
pub async fn handle(
state: Arc<Mutex<State>>,
@@ -16,13 +19,15 @@ pub async fn handle(
IpcSender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>,
- ping_request_sender: mpsc::UnboundedSender<(u64, SocketAddr, Box<dyn FnOnce(PongPacket)>)>,
+ ping_request_sender: mpsc::UnboundedSender<PingRequest>,
+ mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
+ mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
) {
debug!("Begin listening for commands");
while let Some((command, response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
let mut state = state.lock().unwrap();
- let event = state.handle_command(command);
+ let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender);
drop(state);
match event {
ExecutionContext::TcpEvent(event, generator) => {
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index db6d2ef..67481f9 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -1,23 +1,17 @@
mod audio;
+mod client;
mod command;
mod network;
mod notify;
mod state;
-use crate::network::ConnectionInfo;
-use crate::state::State;
-
use futures::join;
use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender};
use log::*;
-use mumble_protocol::control::ControlPacket;
-use mumble_protocol::crypt::ClientCryptState;
-use mumble_protocol::voice::Serverbound;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
use std::fs;
-use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
#[tokio::main]
@@ -45,46 +39,17 @@ async fn main() {
}
}
- // Oneshot channel for setting UDP CryptState from control task
- // For simplicity we don't deal with re-syncing, real applications would have to.
- let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::<ClientCryptState>(1); // crypt state should always be consumed before sending a new one
- let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
let (command_sender, command_receiver) = mpsc::unbounded_channel::<(
Command,
IpcSender<mumlib::error::Result<Option<CommandResponse>>>,
)>();
- let (connection_info_sender, connection_info_receiver) =
- watch::channel::<Option<ConnectionInfo>>(None);
- let (response_sender, response_receiver) = mpsc::unbounded_channel();
- let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel();
-
- let state = State::new(packet_sender, connection_info_sender);
- let state = Arc::new(Mutex::new(state));
- let (_, _, _, e, _) = join!(
- network::tcp::handle(
- Arc::clone(&state),
- connection_info_receiver.clone(),
- crypt_state_sender,
- packet_receiver,
- response_receiver,
- ),
- network::udp::handle(
- Arc::clone(&state),
- connection_info_receiver.clone(),
- crypt_state_receiver,
- ),
- command::handle(
- state,
- command_receiver,
- response_sender,
- ping_request_sender,
- ),
+ let (_, e) = join!(
+ client::handle(command_receiver),
spawn_blocking(move || {
// IpcSender is blocking
receive_oneshot_commands(command_sender);
}),
- network::udp::handle_pings(ping_request_receiver),
);
e.unwrap();
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 3c96ee1..47ea311 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -44,6 +44,7 @@ pub async fn handle(
state: Arc<Mutex<State>>,
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
crypt_state_sender: mpsc::Sender<ClientCryptState>,
+ packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>,
) {
@@ -67,14 +68,13 @@ pub async fn handle(
let state_lock = state.lock().unwrap();
authenticate(&mut sink, state_lock.username().unwrap().to_string()).await;
let phase_watcher = state_lock.phase_receiver();
- let packet_sender = state_lock.packet_sender();
drop(state_lock);
let event_queue = Arc::new(Mutex::new(HashMap::new()));
info!("Logging in...");
join!(
- send_pings(packet_sender, 10, phase_watcher.clone()),
+ send_pings(packet_sender.clone(), 10, phase_watcher.clone()),
listen(
Arc::clone(&state),
stream,
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 4dde268..0c00029 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -1,10 +1,10 @@
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
-use log::*;
use bytes::Bytes;
use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream};
use futures_util::stream::{SplitSink, SplitStream};
+use log::*;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::ping::{PingPacket, PongPacket};
use mumble_protocol::voice::{VoicePacket, VoicePacketPayload};
@@ -18,6 +18,8 @@ use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::udp::UdpFramed;
+pub type PingRequest = (u64, SocketAddr, Box<dyn FnOnce(PongPacket)>);
+
type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>;
@@ -90,17 +92,14 @@ async fn new_crypt_state(
source: Arc<Mutex<UdpReceiver>>,
) {
loop {
- match crypt_state.recv().await {
- Some(crypt_state) => {
- info!("Received new crypt state");
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
- .await
- .expect("Failed to bind UDP socket");
- let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split();
- *sink.lock().unwrap() = new_sink;
- *source.lock().unwrap() = new_source;
- },
- None => {},
+ if let Some(crypt_state) = crypt_state.recv().await {
+ info!("Received new crypt state");
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
+ .await
+ .expect("Failed to bind UDP socket");
+ let (new_sink, new_source) = UdpFramed::new(udp_socket, crypt_state).split();
+ *sink.lock().unwrap() = new_sink;
+ *source.lock().unwrap() = new_source;
}
}
}
@@ -248,11 +247,7 @@ async fn send_voice(
}
pub async fn handle_pings(
- mut ping_request_receiver: mpsc::UnboundedReceiver<(
- u64,
- SocketAddr,
- Box<dyn FnOnce(PongPacket)>,
- )>,
+ mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>,
) {
let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
.await
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 85e5449..d1f64a9 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -57,17 +57,11 @@ pub struct State {
server: Option<Server>,
audio: Audio,
- packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
- connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
-
phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>),
}
impl State {
- pub fn new(
- packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
- connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
- ) -> Self {
+ pub fn new() -> Self {
let config = mumlib::config::read_default_cfg();
let audio = Audio::new(
config.audio.input_volume.unwrap_or(1.0),
@@ -77,15 +71,18 @@ impl State {
config,
server: None,
audio,
- packet_sender,
- connection_info_sender,
phase_watcher: watch::channel(StatePhase::Disconnected),
};
state.reload_config();
state
}
- pub fn handle_command(&mut self, command: Command) -> ExecutionContext {
+ pub fn handle_command(
+ &mut self,
+ command: Command,
+ packet_sender: &mut mpsc::UnboundedSender<ControlPacket<Serverbound>>,
+ connection_info_sender: &mut watch::Sender<Option<ConnectionInfo>>,
+ ) -> ExecutionContext {
match command {
Command::ChannelJoin { channel_identifier } => {
if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) {
@@ -134,7 +131,7 @@ impl State {
let mut msg = msgs::UserState::new();
msg.set_session(self.server.as_ref().unwrap().session_id().unwrap());
msg.set_channel_id(id);
- self.packet_sender.send(msg.into()).unwrap();
+ packet_sender.send(msg.into()).unwrap();
now!(Ok(None))
}
Command::ChannelList => {
@@ -200,7 +197,7 @@ impl State {
let server = self.server_mut().unwrap();
server.set_muted(mute);
server.set_deafened(deafen);
- self.packet_sender.send(msg.into()).unwrap();
+ packet_sender.send(msg.into()).unwrap();
}
now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b })))
@@ -219,7 +216,7 @@ impl State {
.unwrap()
.users_mut()
.iter_mut()
- .find(|(_, user)| user.name() == &string);
+ .find(|(_, user)| user.name() == string);
let (id, user) = match id {
Some(id) => (*id.0, id.1),
@@ -294,7 +291,7 @@ impl State {
let server = self.server_mut().unwrap();
server.set_muted(mute);
server.set_deafened(deafen);
- self.packet_sender.send(msg.into()).unwrap();
+ packet_sender.send(msg.into()).unwrap();
}
now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b })))
@@ -334,7 +331,7 @@ impl State {
return now!(Err(Error::InvalidServerAddrError(host, port)));
}
};
- self.connection_info_sender
+ connection_info_sender
.send(Some(ConnectionInfo::new(
socket_addr,
host,
@@ -408,7 +405,7 @@ impl State {
.unwrap()
.users()
.iter()
- .find(|e| e.1.name() == &string)
+ .find(|e| e.1.name() == string)
.map(|e| *e.0)
{
None => return now!(Err(Error::InvalidUsernameError(string))),
@@ -592,9 +589,6 @@ impl State {
pub fn audio_mut(&mut self) -> &mut Audio {
&mut self.audio
}
- pub fn packet_sender(&self) -> mpsc::UnboundedSender<ControlPacket<Serverbound>> {
- self.packet_sender.clone()
- }
pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> {
self.phase_watcher.1.clone()
}
diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs
index 8bbf919..5b6d669 100644
--- a/mumd/src/state/channel.rs
+++ b/mumd/src/state/channel.rs
@@ -88,7 +88,7 @@ impl<'a> ProtoTree<'a> {
users: Vec::new(),
});
pt.channel = Some(channel);
- pt.users = users.get(&node).map(|e| e.clone()).unwrap_or(Vec::new());
+ pt.users = users.get(&node).cloned().unwrap_or_default();
}
longer => {
self.children
@@ -135,7 +135,7 @@ pub fn into_channel(
for user in users.values() {
channel_lookup
.entry(user.channel())
- .or_insert(Vec::new())
+ .or_insert_with(Vec::new)
.push(user);
}
@@ -148,7 +148,7 @@ pub fn into_channel(
}
walk.reverse();
- if walk.len() > 0 {
+ if !walk.is_empty() {
walks.push((walk, channel));
}
}
@@ -159,8 +159,8 @@ pub fn into_channel(
children: HashMap::new(),
users: channel_lookup
.get(&0)
- .map(|e| e.clone())
- .unwrap_or(Vec::new()),
+ .cloned()
+ .unwrap_or_default(),
};
for (walk, channel) in walks {
diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs
index a065df0..8a256b6 100644
--- a/mumd/src/state/server.rs
+++ b/mumd/src/state/server.rs
@@ -107,7 +107,7 @@ impl Server {
}
pub fn username(&self) -> Option<&str> {
- self.username.as_ref().map(|e| e.as_str())
+ self.username.as_deref()
}
pub fn username_mut(&mut self) -> &mut Option<String> {