aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/audio.rs39
-rw-r--r--mumd/src/audio/input.rs35
-rw-r--r--mumd/src/audio/output.rs134
-rw-r--r--mumd/src/audio/transformers.rs7
-rw-r--r--mumd/src/client.rs2
-rw-r--r--mumd/src/error.rs6
-rw-r--r--mumd/src/main.rs38
-rw-r--r--mumd/src/network.rs4
-rw-r--r--mumd/src/network/tcp.rs41
-rw-r--r--mumd/src/network/udp.rs19
-rw-r--r--mumd/src/state.rs43
-rw-r--r--mumd/src/state/channel.rs13
-rw-r--r--mumd/src/state/server.rs2
-rw-r--r--mumd/src/state/user.rs2
14 files changed, 279 insertions, 106 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 2e20583..6860741 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,3 +1,7 @@
+//! All things audio.
+//!
+//! Audio is handled mostly as signals from [dasp_signal]. Input/output is handled by [cpal].
+
pub mod input;
pub mod output;
pub mod transformers;
@@ -27,8 +31,11 @@ use strum::IntoEnumIterator;
use strum_macros::EnumIter;
use tokio::sync::watch;
+/// The sample rate used internally.
const SAMPLE_RATE: u32 = 48000;
+/// All types of notifications that can be shown. Each notification can be bound to its own audio
+/// file.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, EnumIter)]
pub enum NotificationEvents {
ServerConnect,
@@ -65,9 +72,12 @@ impl TryFrom<&str> for NotificationEvents {
}
}
+/// Input audio state. Input audio is picket up from an [AudioInputDevice] (e.g.
+/// a microphone) and sent over the network.
pub struct AudioInput {
device: DefaultAudioInputDevice,
+ /// Outgoing voice packets that should be sent over the network.
channel_receiver:
Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
}
@@ -112,12 +122,30 @@ impl AudioInput {
}
}
+impl Debug for AudioInput {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("AudioInput")
+ .field("device", &self.device)
+ .field("channel_receiver", &"receiver")
+ .finish()
+ }
+}
+
+#[derive(Debug)]
+/// Audio output state. The audio is received from each client over the network,
+/// decoded, merged and finally played to an [AudioOutputDevice] (e.g. speaker,
+/// headphones, ...).
pub struct AudioOutput {
device: DefaultAudioOutputDevice,
+ /// The volume and mute-status of a user ID.
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
+ /// The client stream per user ID. A separate stream is kept for UDP and TCP.
+ ///
+ /// Shared with [DefaultAudioOutputDevice].
client_streams: Arc<Mutex<ClientStream>>,
+ /// Which sound effect should be played on an event.
sounds: HashMap<NotificationEvents, Vec<f32>>,
}
@@ -140,6 +168,7 @@ impl AudioOutput {
Ok(res)
}
+ /// Loads sound effects, getting unspecified effects from [get_default_sfx].
pub fn load_sound_effects(&mut self, sound_effects: &[SoundEffect]) {
let overrides: HashMap<_, _> = sound_effects
.iter()
@@ -153,6 +182,7 @@ impl AudioOutput {
})
.collect();
+ // This makes sure that every [NotificationEvent] is present in [self.sounds].
self.sounds = NotificationEvents::iter()
.map(|event| {
let bytes = overrides
@@ -195,6 +225,7 @@ impl AudioOutput {
.collect();
}
+ /// Decodes a voice packet.
pub fn decode_packet_payload(
&self,
stream_type: VoiceStreamType,
@@ -207,10 +238,12 @@ impl AudioOutput {
.decode_packet((stream_type, session_id), payload);
}
+ /// Sets the volume of the output device.
pub fn set_volume(&self, output_volume: f32) {
self.device.set_volume(output_volume);
}
+ /// Sets the incoming volume of a user.
pub fn set_user_volume(&self, id: u32, volume: f32) {
match self.user_volumes.lock().unwrap().entry(id) {
Entry::Occupied(mut entry) => {
@@ -222,6 +255,7 @@ impl AudioOutput {
}
}
+ /// Mutes another user.
pub fn set_mute(&self, id: u32, mute: bool) {
match self.user_volumes.lock().unwrap().entry(id) {
Entry::Occupied(mut entry) => {
@@ -233,12 +267,14 @@ impl AudioOutput {
}
}
+ /// Queues a sound effect.
pub fn play_effect(&self, effect: NotificationEvents) {
let samples = self.sounds.get(&effect).unwrap();
- self.client_streams.lock().unwrap().extend(None, samples);
+ self.client_streams.lock().unwrap().add_sound_effect(samples);
}
}
+/// Reads a sound effect from disk.
// moo
fn get_sfx(file: &str) -> Cow<'static, [u8]> {
let mut buf: Vec<u8> = Vec::new();
@@ -251,6 +287,7 @@ fn get_sfx(file: &str) -> Cow<'static, [u8]> {
}
}
+/// Gets the default sound effect.
fn get_default_sfx() -> Cow<'static, [u8]> {
Cow::from(include_bytes!("fallback_sfx.wav").as_ref())
}
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index a1227e3..4dfc465 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -1,6 +1,8 @@
+//! Listens to the microphone and sends it to the networking.
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
use log::*;
+use std::fmt::Debug;
use tokio::sync::watch;
use crate::audio::SAMPLE_RATE;
@@ -8,6 +10,7 @@ use crate::audio::transformers::{NoiseGate, Transformer};
use crate::error::{AudioError, AudioStream};
use crate::state::StatePhase;
+/// Generates a callback that receives [Sample]s and sends them as floats to a [futures_channel::mpsc::Sender].
pub fn callback<T: Sample>(
mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>,
mut transformers: Vec<Box<dyn Transformer + Send + 'static>>,
@@ -29,8 +32,8 @@ pub fn callback<T: Sample>(
buffer.extend(data.by_ref().take(buffer_size - buffer.len()));
let encoded = transformers
.iter_mut()
- .try_fold(&mut buffer[..], |acc, e| e.transform(acc))
- .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap());
+ .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| e.transform(acc))
+ .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap());
if let Some(encoded) = encoded {
if let Err(e) = input_sender.try_send(encoded) {
@@ -43,11 +46,19 @@ pub fn callback<T: Sample>(
}
}
+/// Something that can listen to audio and send it somewhere.
+///
+/// One sample is assumed to be an encoded opus frame. See [opus::Encoder].
pub trait AudioInputDevice {
+ /// Starts the device.
fn play(&self) -> Result<(), AudioError>;
+ /// Stops the device.
fn pause(&self) -> Result<(), AudioError>;
+ /// Sets the input volume of the device.
fn set_volume(&self, volume: f32);
+ /// Returns a receiver to this device's values.
fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>;
+ /// The amount of channels this device has.
fn num_channels(&self) -> usize;
}
@@ -59,6 +70,7 @@ pub struct DefaultAudioInputDevice {
}
impl DefaultAudioInputDevice {
+ /// Initializes the default audio input.
pub fn new(
input_volume: f32,
phase_watcher: watch::Receiver<StatePhase>,
@@ -160,20 +172,35 @@ impl AudioInputDevice for DefaultAudioInputDevice {
fn play(&self) -> Result<(), AudioError> {
self.stream
.play()
- .map_err(|e| AudioError::InputPlayError(e))
+ .map_err(AudioError::InputPlayError)
}
+
fn pause(&self) -> Result<(), AudioError> {
self.stream
.pause()
- .map_err(|e| AudioError::InputPauseError(e))
+ .map_err(AudioError::InputPauseError)
}
+
fn set_volume(&self, volume: f32) {
self.volume_sender.send(volume).unwrap();
}
+
fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> {
self.sample_receiver.take()
}
+
fn num_channels(&self) -> usize {
self.channels as usize
}
}
+
+impl Debug for DefaultAudioInputDevice {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("DefaultAudioInputDevice")
+ .field("sample_receiver", &self.sample_receiver)
+ .field("channels", &self.channels)
+ .field("volume_sender", &self.volume_sender)
+ .field("stream", &"cpal::Stream")
+ .finish()
+ }
+}
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index a2f6bcc..6cec6fc 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,23 +1,79 @@
+//! Receives audio packets from the networking and plays them.
+
use crate::audio::SAMPLE_RATE;
use crate::error::{AudioError, AudioStream};
use crate::network::VoiceStreamType;
+use bytes::Bytes;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
+use dasp_ring_buffer::Bounded;
use log::*;
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::{HashMap, VecDeque};
+use std::fmt::Debug;
+use std::iter;
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
type ClientStreamKey = (VoiceStreamType, u32);
+/// State for decoding audio received from another user.
+#[derive(Debug)]
+pub struct ClientAudioData {
+ buf: Bounded<Vec<f32>>,
+ output_channels: opus::Channels,
+ // We need both since a client can hypothetically send both mono
+ // and stereo packets, and we can't switch a decoder on the fly
+ // to reuse it.
+ mono_decoder: opus::Decoder,
+ stereo_decoder: opus::Decoder,
+}
+
+impl ClientAudioData {
+ pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self {
+ Self {
+ mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(),
+ stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(),
+ output_channels,
+ buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), //buffer 1 s of audio
+ }
+ }
+
+ pub fn store_packet(&mut self, bytes: Bytes) {
+ let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap();
+ let (decoder, channels) = match packet_channels {
+ opus::Channels::Mono => (&mut self.mono_decoder, 1),
+ opus::Channels::Stereo => (&mut self.stereo_decoder, 2),
+ };
+ let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
+ let parsed = decoder
+ .decode_float(&bytes, &mut out, false)
+ .expect("Error decoding");
+ out.truncate(parsed);
+ match (packet_channels, self.output_channels) {
+ (opus::Channels::Mono, opus::Channels::Mono) | (opus::Channels::Stereo, opus::Channels::Stereo) => for sample in out {
+ self.buf.push(sample);
+ },
+ (opus::Channels::Mono, opus::Channels::Stereo) => for sample in out {
+ self.buf.push(sample);
+ self.buf.push(sample);
+ },
+ (opus::Channels::Stereo, opus::Channels::Mono) => for sample in out.into_iter().step_by(2) {
+ self.buf.push(sample);
+ },
+ }
+ }
+}
+
+/// Collected state for client opus decoders and sound effects.
+#[derive(Debug)]
pub struct ClientStream {
- buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer?
+ buffer_clients: HashMap<ClientStreamKey, ClientAudioData>,
buffer_effects: VecDeque<f32>,
sample_rate: u32,
- channels: opus::Channels,
+ output_channels: opus::Channels,
}
impl ClientStream {
@@ -31,29 +87,21 @@ impl ClientStream {
buffer_clients: HashMap::new(),
buffer_effects: VecDeque::new(),
sample_rate,
- channels,
+ output_channels: channels,
}
}
- fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) {
- let sample_rate = self.sample_rate;
- let channels = self.channels;
- self.buffer_clients.entry(client).or_insert_with(|| {
- let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap();
- (VecDeque::new(), opus_decoder)
- })
+ fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData {
+ self.buffer_clients.entry(client).or_insert(
+ ClientAudioData::new(self.sample_rate, self.output_channels)
+ )
}
+ /// Decodes a voice packet.
pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode
- let (buffer, decoder) = self.get_client(client);
- let parsed = decoder
- .decode_float(&bytes, &mut out, false)
- .expect("Error decoding");
- out.truncate(parsed);
- buffer.extend(&out);
+ self.get_client(client).store_packet(bytes);
}
_ => {
unimplemented!("Payload type not supported");
@@ -61,16 +109,19 @@ impl ClientStream {
}
}
- pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) {
- let buffer = match client {
- Some(x) => &mut self.get_client(x).0,
- None => &mut self.buffer_effects,
- };
- buffer.extend(values.iter().copied());
+ /// Extends the sound effect buffer queue with some received values.
+ pub fn add_sound_effect(&mut self, values: &[f32]) {
+ self.buffer_effects.extend(values.iter().copied());
}
}
+/// Adds two values in some saturating way.
+///
+/// Since we support [f32], [i16] and [u16] we need some way of adding two values
+/// without peaking above/below the edge values. This trait ensures that we can
+/// use all three primitive types as a generic parameter.
pub trait SaturatingAdd {
+ /// Adds two values in some saturating way. See trait documentation.
fn saturating_add(self, rhs: Self) -> Self;
}
@@ -104,14 +155,20 @@ pub trait AudioOutputDevice {
fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
}
+/// The default audio output device, as determined by [cpal].
pub struct DefaultAudioOutputDevice {
config: StreamConfig,
stream: cpal::Stream,
+ /// The client stream per user ID. A separate stream is kept for UDP and TCP.
+ ///
+ /// Shared with [super::AudioOutput].
client_streams: Arc<Mutex<ClientStream>>,
+ /// Output volume configuration.
volume_sender: watch::Sender<f32>,
}
impl DefaultAudioOutputDevice {
+ /// Initializes the default audio output.
pub fn new(
output_volume: f32,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
@@ -148,7 +205,7 @@ impl DefaultAudioOutputDevice {
let output_stream = match output_supported_sample_format {
SampleFormat::F32 => output_device.build_output_stream(
&output_config,
- curry_callback::<f32>(
+ callback::<f32>(
Arc::clone(&client_streams),
output_volume_receiver,
user_volumes,
@@ -157,7 +214,7 @@ impl DefaultAudioOutputDevice {
),
SampleFormat::I16 => output_device.build_output_stream(
&output_config,
- curry_callback::<i16>(
+ callback::<i16>(
Arc::clone(&client_streams),
output_volume_receiver,
user_volumes,
@@ -166,7 +223,7 @@ impl DefaultAudioOutputDevice {
),
SampleFormat::U16 => output_device.build_output_stream(
&output_config,
- curry_callback::<u16>(
+ callback::<u16>(
Arc::clone(&client_streams),
output_volume_receiver,
user_volumes,
@@ -189,13 +246,13 @@ impl AudioOutputDevice for DefaultAudioOutputDevice {
fn play(&self) -> Result<(), AudioError> {
self.stream
.play()
- .map_err(|e| AudioError::OutputPlayError(e))
+ .map_err(AudioError::OutputPlayError)
}
fn pause(&self) -> Result<(), AudioError> {
self.stream
.pause()
- .map_err(|e| AudioError::OutputPauseError(e))
+ .map_err(AudioError::OutputPauseError)
}
fn set_volume(&self, volume: f32) {
@@ -211,7 +268,9 @@ impl AudioOutputDevice for DefaultAudioOutputDevice {
}
}
-pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
+/// Returns a function that fills a buffer with audio from client streams
+/// modified according to some audio configuration.
+pub fn callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
user_bufs: Arc<Mutex<ClientStream>>,
output_volume_receiver: watch::Receiver<f32>,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
@@ -227,10 +286,10 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
let user_volumes = user_volumes.lock().unwrap();
for (k, v) in user_bufs.buffer_clients.iter_mut() {
let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
- for sample in data.iter_mut() {
- if !muted {
+ if !muted {
+ for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) {
*sample = sample.saturating_add(Sample::from(
- &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume),
+ &(val * volume * user_volume),
));
}
}
@@ -242,3 +301,14 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
}
}
}
+
+impl Debug for DefaultAudioOutputDevice {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("DefaultAudioInputDevice")
+ .field("client_streams", &self.client_streams)
+ .field("config", &self.config)
+ .field("volume_sender", &self.volume_sender)
+ .field("stream", &"cpal::Stream")
+ .finish()
+ }
+}
diff --git a/mumd/src/audio/transformers.rs b/mumd/src/audio/transformers.rs
index 25e28b8..21a71b5 100644
--- a/mumd/src/audio/transformers.rs
+++ b/mumd/src/audio/transformers.rs
@@ -2,10 +2,11 @@
pub trait Transformer {
/// Do the transform. Returning `None` is interpreted as "the buffer is unwanted".
/// The implementor is free to modify the buffer however it wants to.
- fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]>;
+ fn transform<'a>(&mut self, buf: (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])>;
}
/// A struct representing a noise gate transform.
+#[derive(Debug)]
pub struct NoiseGate {
alltime_high: f32,
open: usize,
@@ -25,7 +26,7 @@ impl NoiseGate {
}
impl Transformer for NoiseGate {
- fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]> {
+ fn transform<'a>(&mut self, (channels, buf): (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])> {
const MUTE_PERCENTAGE: f32 = 0.1;
let max = buf.iter().map(|e| e.abs()).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap();
@@ -43,7 +44,7 @@ impl Transformer for NoiseGate {
if self.open == 0 {
None
} else {
- Some(buf)
+ Some((channels, buf))
}
}
}
diff --git a/mumd/src/client.rs b/mumd/src/client.rs
index 9e8ca18..753583f 100644
--- a/mumd/src/client.rs
+++ b/mumd/src/client.rs
@@ -33,7 +33,7 @@ pub async fn handle(
packet_sender.clone(),
packet_receiver,
event_queue.clone(),
- ).fuse() => r.map_err(|e| ClientError::TcpError(e)),
+ ).fuse() => r.map_err(ClientError::TcpError),
_ = udp::handle(
Arc::clone(&state),
connection_info_receiver.clone(),
diff --git a/mumd/src/error.rs b/mumd/src/error.rs
index da1bdd3..4277d7f 100644
--- a/mumd/src/error.rs
+++ b/mumd/src/error.rs
@@ -5,6 +5,7 @@ use tokio::sync::mpsc;
pub type ServerSendError = mpsc::error::SendError<ControlPacket<Serverbound>>;
+#[derive(Debug)]
pub enum TcpError {
NoConnectionInfoReceived,
TlsConnectorBuilderError(native_tls::Error),
@@ -40,6 +41,7 @@ impl From<ServerSendError> for TcpError {
}
}
+#[derive(Debug)]
pub enum UdpError {
NoConnectionInfoReceived,
DisconnectBeforeCryptSetup,
@@ -53,6 +55,7 @@ impl From<std::io::Error> for UdpError {
}
}
+#[derive(Debug)]
pub enum ClientError {
TcpError(TcpError),
}
@@ -65,6 +68,7 @@ impl fmt::Display for ClientError {
}
}
+#[derive(Debug)]
pub enum AudioStream {
Input,
Output,
@@ -79,6 +83,7 @@ impl fmt::Display for AudioStream {
}
}
+#[derive(Debug)]
pub enum AudioError {
NoDevice(AudioStream),
NoConfigs(AudioStream, cpal::SupportedStreamConfigsError),
@@ -105,6 +110,7 @@ impl fmt::Display for AudioError {
}
}
+#[derive(Debug)]
pub enum StateError {
AudioError(AudioError),
ConfigError(ConfigError),
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index a96944c..e7ac033 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -1,10 +1,25 @@
-mod audio;
-mod client;
-mod command;
-mod error;
-mod network;
-mod notifications;
-mod state;
+#![warn(elided_lifetimes_in_paths)]
+#![warn(meta_variable_misuse)]
+#![warn(missing_debug_implementations)]
+#![warn(single_use_lifetimes)]
+#![warn(unreachable_pub)]
+#![warn(unused_crate_dependencies)]
+#![warn(unused_import_braces)]
+#![warn(unused_lifetimes)]
+#![warn(unused_qualifications)]
+#![deny(macro_use_extern_crate)]
+#![deny(missing_abi)]
+#![deny(future_incompatible)]
+#![forbid(unsafe_code)]
+#![forbid(non_ascii_idents)]
+
+pub mod audio;
+pub mod client;
+pub mod command;
+pub mod error;
+pub mod network;
+pub mod notifications;
+pub mod state;
use crate::state::State;
@@ -76,12 +91,9 @@ async fn main() {
_ = receive_commands(command_sender).fuse() => Ok(()),
};
- match run {
- Err(e) => {
- error!("mumd: {}", e);
- std::process::exit(1);
- }
- _ => {}
+ if let Err(e) = run {
+ error!("mumd: {}", e);
+ std::process::exit(1);
}
}
diff --git a/mumd/src/network.rs b/mumd/src/network.rs
index 2b803c0..1066fef 100644
--- a/mumd/src/network.rs
+++ b/mumd/src/network.rs
@@ -30,8 +30,8 @@ impl ConnectionInfo {
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum VoiceStreamType {
- TCP,
- UDP,
+ Tcp,
+ Udp,
}
async fn run_until<F, R>(
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index f620a32..0fdc4c5 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -14,6 +14,7 @@ use mumble_protocol::{Clientbound, Serverbound};
use mumlib::command::MumbleEventKind;
use std::collections::HashMap;
use std::convert::{Into, TryInto};
+use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use tokio::net::TcpStream;
@@ -31,8 +32,8 @@ type TcpSender = SplitSink<
type TcpReceiver =
SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>;
-pub(crate) type TcpEventCallback = Box<dyn FnOnce(TcpEventData)>;
-pub(crate) type TcpEventSubscriber = Box<dyn FnMut(TcpEventData) -> bool>; //the bool indicates if it should be kept or not
+pub(crate) type TcpEventCallback = Box<dyn FnOnce(TcpEventData<'_>)>;
+pub(crate) type TcpEventSubscriber = Box<dyn FnMut(TcpEventData<'_>) -> bool>; //the bool indicates if it should be kept or not
/// Why the TCP was disconnected.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
@@ -57,15 +58,15 @@ pub enum TcpEvent {
/// Having two different types might feel a bit confusing. Essentially, a
/// callback _registers_ to a [TcpEvent] but _takes_ a [TcpEventData] as
/// parameter.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub enum TcpEventData<'a> {
Connected(Result<&'a msgs::ServerSync, mumlib::Error>),
Disconnected(DisconnectedReason),
TextMessage(&'a msgs::TextMessage),
}
-impl<'a> From<&TcpEventData<'a>> for TcpEvent {
- fn from(t: &TcpEventData) -> Self {
+impl From<&TcpEventData<'_>> for TcpEvent {
+ fn from(t: &TcpEventData<'_>) -> Self {
match t {
TcpEventData::Connected(_) => TcpEvent::Connected,
TcpEventData::Disconnected(reason) => TcpEvent::Disconnected(*reason),
@@ -74,7 +75,7 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent {
}
}
-#[derive(Clone)]
+#[derive(Clone, Default)]
pub struct TcpEventQueue {
callbacks: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventCallback>>>>,
subscribers: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventSubscriber>>>>,
@@ -111,7 +112,7 @@ impl TcpEventQueue {
/// Fires all callbacks related to a specific TCP event and removes them from the event queue.
/// Also calls all event subscribers, but keeps them in the queue
- pub fn resolve<'a>(&self, data: TcpEventData<'a>) {
+ pub fn resolve(&self, data: TcpEventData<'_>) {
if let Some(vec) = self
.callbacks
.write()
@@ -139,6 +140,13 @@ impl TcpEventQueue {
}
}
+impl Debug for TcpEventQueue {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TcpEventQueue")
+ .finish()
+ }
+}
+
pub async fn handle(
state: Arc<RwLock<State>>,
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
@@ -148,13 +156,14 @@ pub async fn handle(
event_queue: TcpEventQueue,
) -> Result<(), TcpError> {
loop {
- let connection_info = 'data: loop {
- while connection_info_receiver.changed().await.is_ok() {
+ let connection_info = loop {
+ if connection_info_receiver.changed().await.is_ok() {
if let Some(data) = connection_info_receiver.borrow().clone() {
- break 'data data;
+ break data;
}
+ } else {
+ return Err(TcpError::NoConnectionInfoReceived);
}
- return Err(TcpError::NoConnectionInfoReceived);
};
let connect_result = connect(
connection_info.socket_addr,
@@ -242,12 +251,12 @@ async fn connect(
builder.danger_accept_invalid_certs(accept_invalid_cert);
let connector: TlsConnector = builder
.build()
- .map_err(|e| TcpError::TlsConnectorBuilderError(e))?
+ .map_err(TcpError::TlsConnectorBuilderError)?
.into();
let tls_stream = connector
.connect(&server_host, stream)
.await
- .map_err(|e| TcpError::TlsConnectError(e))?;
+ .map_err(TcpError::TlsConnectError)?;
debug!("TLS connected");
// Wrap the TLS stream with Mumble's client-side control-channel codec
@@ -304,13 +313,13 @@ async fn send_voice(
inner_phase_watcher.changed().await.unwrap();
if matches!(
*inner_phase_watcher.borrow(),
- StatePhase::Connected(VoiceStreamType::TCP)
+ StatePhase::Connected(VoiceStreamType::Tcp)
) {
break;
}
}
run_until(
- |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::TCP)),
+ |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::Tcp)),
async {
loop {
packet_sender.send(
@@ -465,7 +474,7 @@ async fn listen(
..
} => {
state.read().unwrap().audio_output().decode_packet_payload(
- VoiceStreamType::TCP,
+ VoiceStreamType::Tcp,
session_id,
payload,
);
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 95dcf33..0f78638 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -37,13 +37,14 @@ pub async fn handle(
let receiver = state.read().unwrap().audio_input().receiver();
loop {
- let connection_info = 'data: loop {
- while connection_info_receiver.changed().await.is_ok() {
+ let connection_info = loop {
+ if connection_info_receiver.changed().await.is_ok() {
if let Some(data) = connection_info_receiver.borrow().clone() {
- break 'data data;
+ break data;
}
+ } else {
+ return Err(UdpError::NoConnectionInfoReceived);
}
- return Err(UdpError::NoConnectionInfoReceived);
};
let (sink, source) = connect(&mut crypt_state_receiver).await?;
@@ -136,7 +137,7 @@ async fn listen(
state
.read()
.unwrap()
- .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP));
+ .broadcast_phase(StatePhase::Connected(VoiceStreamType::Udp));
last_ping_recv.store(timestamp, Ordering::Relaxed);
}
VoicePacket::Audio {
@@ -147,7 +148,7 @@ async fn listen(
..
} => {
state.read().unwrap().audio_output().decode_packet_payload(
- VoiceStreamType::UDP,
+ VoiceStreamType::Udp,
session_id,
payload,
);
@@ -173,7 +174,7 @@ async fn send_pings(
state
.read()
.unwrap()
- .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
+ .broadcast_phase(StatePhase::Connected(VoiceStreamType::Tcp));
}
match sink
.lock()
@@ -208,13 +209,13 @@ async fn send_voice(
inner_phase_watcher.changed().await.unwrap();
if matches!(
*inner_phase_watcher.borrow(),
- StatePhase::Connected(VoiceStreamType::UDP)
+ StatePhase::Connected(VoiceStreamType::Udp)
) {
break;
}
}
run_until(
- |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)),
+ |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::Udp)),
async {
let mut receiver = receiver.lock().await;
loop {
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index d2d77b1..1992884 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -20,6 +20,7 @@ use mumlib::command::{ChannelTarget, Command, CommandResponse, MessageTarget, Mu
use mumlib::config::Config;
use mumlib::Error;
use std::{
+ fmt::Debug,
iter,
net::{SocketAddr, ToSocketAddrs},
sync::{Arc, RwLock},
@@ -29,7 +30,7 @@ use tokio::sync::{mpsc, watch};
macro_rules! at {
( $( $event:expr => $generator:expr ),+ $(,)? ) => {
ExecutionContext::TcpEventCallback(vec![
- $( ($event, Box::new($generator)), )*
+ $( ($event, Box::new($generator)), )+
])
};
}
@@ -42,18 +43,18 @@ macro_rules! now {
type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>>;
+type TcpEventCallback = Box<dyn FnOnce(TcpEventData<'_>) -> Responses>;
+type TcpEventSubscriberCallback = Box<
+ dyn FnMut(
+ TcpEventData<'_>,
+ &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
+ ) -> bool
+>;
+
//TODO give me a better name
pub enum ExecutionContext {
- TcpEventCallback(Vec<(TcpEvent, Box<dyn FnOnce(TcpEventData) -> Responses>)>),
- TcpEventSubscriber(
- TcpEvent,
- Box<
- dyn FnMut(
- TcpEventData,
- &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
- ) -> bool,
- >,
- ),
+ TcpEventCallback(Vec<(TcpEvent, TcpEventCallback)>),
+ TcpEventSubscriber(TcpEvent, TcpEventSubscriberCallback),
Now(Box<dyn FnOnce() -> Responses>),
Ping(
Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>,
@@ -63,6 +64,17 @@ pub enum ExecutionContext {
),
}
+impl Debug for ExecutionContext {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_tuple(match self {
+ ExecutionContext::TcpEventCallback(_) => "TcpEventCallback",
+ ExecutionContext::TcpEventSubscriber(_, _) => "TcpEventSubscriber",
+ ExecutionContext::Now(_) => "Now",
+ ExecutionContext::Ping(_, _) => "Ping",
+ }).finish()
+ }
+}
+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StatePhase {
Disconnected,
@@ -70,6 +82,7 @@ pub enum StatePhase {
Connected(VoiceStreamType),
}
+#[derive(Debug)]
pub struct State {
config: Config,
server: Option<Server>,
@@ -90,9 +103,9 @@ impl State {
config.audio.input_volume.unwrap_or(1.0),
phase_watcher.1.clone(),
)
- .map_err(|e| StateError::AudioError(e))?;
+ .map_err(StateError::AudioError)?;
let audio_output = AudioOutput::new(config.audio.output_volume.unwrap_or(1.0))
- .map_err(|e| StateError::AudioError(e))?;
+ .map_err(StateError::AudioError)?;
let mut state = Self {
config,
server: None,
@@ -308,7 +321,7 @@ impl State {
}
pub fn initialized(&self) {
- self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
+ self.broadcast_phase(StatePhase::Connected(VoiceStreamType::Tcp));
self.audio_output
.play_effect(NotificationEvents::ServerConnect);
}
@@ -773,7 +786,7 @@ pub fn handle_command(
.unwrap()
.users()
.iter()
- .find(|(_, user)| user.name() == &name)
+ .find(|(_, user)| user.name() == name)
.map(|(e, _)| *e);
let id = match id {
diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs
index 6995e1e..2ed05c5 100644
--- a/mumd/src/state/channel.rs
+++ b/mumd/src/state/channel.rs
@@ -169,13 +169,10 @@ pub fn into_channel(
impl From<&Channel> for mumlib::state::Channel {
fn from(channel: &Channel) -> Self {
- mumlib::state::Channel {
- description: channel.description.clone(),
- links: Vec::new(),
- max_users: channel.max_users,
- name: channel.name.clone(),
- children: Vec::new(),
- users: Vec::new(),
- }
+ mumlib::state::Channel::new(
+ channel.name.clone(),
+ channel.description.clone(),
+ channel.max_users,
+ )
}
}
diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs
index 4abde49..5d49457 100644
--- a/mumd/src/state/server.rs
+++ b/mumd/src/state/server.rs
@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
-#[derive(Clone, Debug, Deserialize, Serialize)]
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Server {
channels: HashMap<u32, Channel>,
users: HashMap<u32, User>,
diff --git a/mumd/src/state/user.rs b/mumd/src/state/user.rs
index 5770bca..0ffde90 100644
--- a/mumd/src/state/user.rs
+++ b/mumd/src/state/user.rs
@@ -78,7 +78,7 @@ impl User {
}
}
- pub fn apply_user_diff(&mut self, diff: &crate::state::user::UserDiff) {
+ pub fn apply_user_diff(&mut self, diff: &UserDiff) {
if let Some(comment) = diff.comment.clone() {
self.comment = Some(comment);
}