diff options
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/Cargo.toml | 4 | ||||
| -rw-r--r-- | mumd/src/audio.rs | 39 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 35 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 134 | ||||
| -rw-r--r-- | mumd/src/audio/transformers.rs | 7 | ||||
| -rw-r--r-- | mumd/src/client.rs | 2 | ||||
| -rw-r--r-- | mumd/src/error.rs | 6 | ||||
| -rw-r--r-- | mumd/src/main.rs | 38 | ||||
| -rw-r--r-- | mumd/src/network.rs | 4 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 41 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 19 | ||||
| -rw-r--r-- | mumd/src/state.rs | 43 | ||||
| -rw-r--r-- | mumd/src/state/channel.rs | 13 | ||||
| -rw-r--r-- | mumd/src/state/server.rs | 2 | ||||
| -rw-r--r-- | mumd/src/state/user.rs | 2 |
15 files changed, 279 insertions, 110 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index 1e8e63f..28ff2ce 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -23,8 +23,6 @@ cpal = "0.13" bytes = "1.0" dasp_interpolate = { version = "0.11", features = ["linear"] } dasp_signal = "0.11" -dasp_frame = "0.11" -dasp_sample = "0.11" dasp_ring_buffer = "0.11" futures-util = { version = "0.3", features = ["sink"]} futures-channel = "0.3" @@ -32,13 +30,11 @@ hound = "3.4" log = "0.4" mumble-protocol = "0.4.1" native-tls = "0.2" -openssl = { version = "0.10" } opus = "0.2" serde = { version = "1.0", features = ["derive"] } strum = "0.20" strum_macros = "0.20" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "net", "time", "fs"] } -tokio-stream = "0.1.0" tokio-native-tls = "0.3" tokio-util = { version = "0.6", features = ["codec", "net"] } bincode = "1.3.2" diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 2e20583..6860741 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,3 +1,7 @@ +//! All things audio. +//! +//! Audio is handled mostly as signals from [dasp_signal]. Input/output is handled by [cpal]. + pub mod input; pub mod output; pub mod transformers; @@ -27,8 +31,11 @@ use strum::IntoEnumIterator; use strum_macros::EnumIter; use tokio::sync::watch; +/// The sample rate used internally. const SAMPLE_RATE: u32 = 48000; +/// All types of notifications that can be shown. Each notification can be bound to its own audio +/// file. #[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, EnumIter)] pub enum NotificationEvents { ServerConnect, @@ -65,9 +72,12 @@ impl TryFrom<&str> for NotificationEvents { } } +/// Input audio state. Input audio is picket up from an [AudioInputDevice] (e.g. +/// a microphone) and sent over the network. pub struct AudioInput { device: DefaultAudioInputDevice, + /// Outgoing voice packets that should be sent over the network. channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, } @@ -112,12 +122,30 @@ impl AudioInput { } } +impl Debug for AudioInput { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AudioInput") + .field("device", &self.device) + .field("channel_receiver", &"receiver") + .finish() + } +} + +#[derive(Debug)] +/// Audio output state. The audio is received from each client over the network, +/// decoded, merged and finally played to an [AudioOutputDevice] (e.g. speaker, +/// headphones, ...). pub struct AudioOutput { device: DefaultAudioOutputDevice, + /// The volume and mute-status of a user ID. user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, + /// The client stream per user ID. A separate stream is kept for UDP and TCP. + /// + /// Shared with [DefaultAudioOutputDevice]. client_streams: Arc<Mutex<ClientStream>>, + /// Which sound effect should be played on an event. sounds: HashMap<NotificationEvents, Vec<f32>>, } @@ -140,6 +168,7 @@ impl AudioOutput { Ok(res) } + /// Loads sound effects, getting unspecified effects from [get_default_sfx]. pub fn load_sound_effects(&mut self, sound_effects: &[SoundEffect]) { let overrides: HashMap<_, _> = sound_effects .iter() @@ -153,6 +182,7 @@ impl AudioOutput { }) .collect(); + // This makes sure that every [NotificationEvent] is present in [self.sounds]. self.sounds = NotificationEvents::iter() .map(|event| { let bytes = overrides @@ -195,6 +225,7 @@ impl AudioOutput { .collect(); } + /// Decodes a voice packet. pub fn decode_packet_payload( &self, stream_type: VoiceStreamType, @@ -207,10 +238,12 @@ impl AudioOutput { .decode_packet((stream_type, session_id), payload); } + /// Sets the volume of the output device. pub fn set_volume(&self, output_volume: f32) { self.device.set_volume(output_volume); } + /// Sets the incoming volume of a user. pub fn set_user_volume(&self, id: u32, volume: f32) { match self.user_volumes.lock().unwrap().entry(id) { Entry::Occupied(mut entry) => { @@ -222,6 +255,7 @@ impl AudioOutput { } } + /// Mutes another user. pub fn set_mute(&self, id: u32, mute: bool) { match self.user_volumes.lock().unwrap().entry(id) { Entry::Occupied(mut entry) => { @@ -233,12 +267,14 @@ impl AudioOutput { } } + /// Queues a sound effect. pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - self.client_streams.lock().unwrap().extend(None, samples); + self.client_streams.lock().unwrap().add_sound_effect(samples); } } +/// Reads a sound effect from disk. // moo fn get_sfx(file: &str) -> Cow<'static, [u8]> { let mut buf: Vec<u8> = Vec::new(); @@ -251,6 +287,7 @@ fn get_sfx(file: &str) -> Cow<'static, [u8]> { } } +/// Gets the default sound effect. fn get_default_sfx() -> Cow<'static, [u8]> { Cow::from(include_bytes!("fallback_sfx.wav").as_ref()) } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index a1227e3..4dfc465 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,6 +1,8 @@ +//! Listens to the microphone and sends it to the networking. use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; use log::*; +use std::fmt::Debug; use tokio::sync::watch; use crate::audio::SAMPLE_RATE; @@ -8,6 +10,7 @@ use crate::audio::transformers::{NoiseGate, Transformer}; use crate::error::{AudioError, AudioStream}; use crate::state::StatePhase; +/// Generates a callback that receives [Sample]s and sends them as floats to a [futures_channel::mpsc::Sender]. pub fn callback<T: Sample>( mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>, mut transformers: Vec<Box<dyn Transformer + Send + 'static>>, @@ -29,8 +32,8 @@ pub fn callback<T: Sample>( buffer.extend(data.by_ref().take(buffer_size - buffer.len())); let encoded = transformers .iter_mut() - .try_fold(&mut buffer[..], |acc, e| e.transform(acc)) - .map(|buf| opus_encoder.encode_vec_float(&*buf, buffer_size).unwrap()); + .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| e.transform(acc)) + .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap()); if let Some(encoded) = encoded { if let Err(e) = input_sender.try_send(encoded) { @@ -43,11 +46,19 @@ pub fn callback<T: Sample>( } } +/// Something that can listen to audio and send it somewhere. +/// +/// One sample is assumed to be an encoded opus frame. See [opus::Encoder]. pub trait AudioInputDevice { + /// Starts the device. fn play(&self) -> Result<(), AudioError>; + /// Stops the device. fn pause(&self) -> Result<(), AudioError>; + /// Sets the input volume of the device. fn set_volume(&self, volume: f32); + /// Returns a receiver to this device's values. fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>; + /// The amount of channels this device has. fn num_channels(&self) -> usize; } @@ -59,6 +70,7 @@ pub struct DefaultAudioInputDevice { } impl DefaultAudioInputDevice { + /// Initializes the default audio input. pub fn new( input_volume: f32, phase_watcher: watch::Receiver<StatePhase>, @@ -160,20 +172,35 @@ impl AudioInputDevice for DefaultAudioInputDevice { fn play(&self) -> Result<(), AudioError> { self.stream .play() - .map_err(|e| AudioError::InputPlayError(e)) + .map_err(AudioError::InputPlayError) } + fn pause(&self) -> Result<(), AudioError> { self.stream .pause() - .map_err(|e| AudioError::InputPauseError(e)) + .map_err(AudioError::InputPauseError) } + fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } + fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> { self.sample_receiver.take() } + fn num_channels(&self) -> usize { self.channels as usize } } + +impl Debug for DefaultAudioInputDevice { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DefaultAudioInputDevice") + .field("sample_receiver", &self.sample_receiver) + .field("channels", &self.channels) + .field("volume_sender", &self.volume_sender) + .field("stream", &"cpal::Stream") + .finish() + } +} diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index a2f6bcc..6cec6fc 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,23 +1,79 @@ +//! Receives audio packets from the networking and plays them. + use crate::audio::SAMPLE_RATE; use crate::error::{AudioError, AudioStream}; use crate::network::VoiceStreamType; +use bytes::Bytes; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; +use dasp_ring_buffer::Bounded; use log::*; use mumble_protocol::voice::VoicePacketPayload; use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; +use std::iter; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; type ClientStreamKey = (VoiceStreamType, u32); +/// State for decoding audio received from another user. +#[derive(Debug)] +pub struct ClientAudioData { + buf: Bounded<Vec<f32>>, + output_channels: opus::Channels, + // We need both since a client can hypothetically send both mono + // and stereo packets, and we can't switch a decoder on the fly + // to reuse it. + mono_decoder: opus::Decoder, + stereo_decoder: opus::Decoder, +} + +impl ClientAudioData { + pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self { + Self { + mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(), + stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(), + output_channels, + buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), //buffer 1 s of audio + } + } + + pub fn store_packet(&mut self, bytes: Bytes) { + let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap(); + let (decoder, channels) = match packet_channels { + opus::Channels::Mono => (&mut self.mono_decoder, 1), + opus::Channels::Stereo => (&mut self.stereo_decoder, 2), + }; + let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode + let parsed = decoder + .decode_float(&bytes, &mut out, false) + .expect("Error decoding"); + out.truncate(parsed); + match (packet_channels, self.output_channels) { + (opus::Channels::Mono, opus::Channels::Mono) | (opus::Channels::Stereo, opus::Channels::Stereo) => for sample in out { + self.buf.push(sample); + }, + (opus::Channels::Mono, opus::Channels::Stereo) => for sample in out { + self.buf.push(sample); + self.buf.push(sample); + }, + (opus::Channels::Stereo, opus::Channels::Mono) => for sample in out.into_iter().step_by(2) { + self.buf.push(sample); + }, + } + } +} + +/// Collected state for client opus decoders and sound effects. +#[derive(Debug)] pub struct ClientStream { - buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer? + buffer_clients: HashMap<ClientStreamKey, ClientAudioData>, buffer_effects: VecDeque<f32>, sample_rate: u32, - channels: opus::Channels, + output_channels: opus::Channels, } impl ClientStream { @@ -31,29 +87,21 @@ impl ClientStream { buffer_clients: HashMap::new(), buffer_effects: VecDeque::new(), sample_rate, - channels, + output_channels: channels, } } - fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) { - let sample_rate = self.sample_rate; - let channels = self.channels; - self.buffer_clients.entry(client).or_insert_with(|| { - let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap(); - (VecDeque::new(), opus_decoder) - }) + fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData { + self.buffer_clients.entry(client).or_insert( + ClientAudioData::new(self.sample_rate, self.output_channels) + ) } + /// Decodes a voice packet. pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode - let (buffer, decoder) = self.get_client(client); - let parsed = decoder - .decode_float(&bytes, &mut out, false) - .expect("Error decoding"); - out.truncate(parsed); - buffer.extend(&out); + self.get_client(client).store_packet(bytes); } _ => { unimplemented!("Payload type not supported"); @@ -61,16 +109,19 @@ impl ClientStream { } } - pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) { - let buffer = match client { - Some(x) => &mut self.get_client(x).0, - None => &mut self.buffer_effects, - }; - buffer.extend(values.iter().copied()); + /// Extends the sound effect buffer queue with some received values. + pub fn add_sound_effect(&mut self, values: &[f32]) { + self.buffer_effects.extend(values.iter().copied()); } } +/// Adds two values in some saturating way. +/// +/// Since we support [f32], [i16] and [u16] we need some way of adding two values +/// without peaking above/below the edge values. This trait ensures that we can +/// use all three primitive types as a generic parameter. pub trait SaturatingAdd { + /// Adds two values in some saturating way. See trait documentation. fn saturating_add(self, rhs: Self) -> Self; } @@ -104,14 +155,20 @@ pub trait AudioOutputDevice { fn client_streams(&self) -> Arc<Mutex<ClientStream>>; } +/// The default audio output device, as determined by [cpal]. pub struct DefaultAudioOutputDevice { config: StreamConfig, stream: cpal::Stream, + /// The client stream per user ID. A separate stream is kept for UDP and TCP. + /// + /// Shared with [super::AudioOutput]. client_streams: Arc<Mutex<ClientStream>>, + /// Output volume configuration. volume_sender: watch::Sender<f32>, } impl DefaultAudioOutputDevice { + /// Initializes the default audio output. pub fn new( output_volume: f32, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, @@ -148,7 +205,7 @@ impl DefaultAudioOutputDevice { let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, - curry_callback::<f32>( + callback::<f32>( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -157,7 +214,7 @@ impl DefaultAudioOutputDevice { ), SampleFormat::I16 => output_device.build_output_stream( &output_config, - curry_callback::<i16>( + callback::<i16>( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -166,7 +223,7 @@ impl DefaultAudioOutputDevice { ), SampleFormat::U16 => output_device.build_output_stream( &output_config, - curry_callback::<u16>( + callback::<u16>( Arc::clone(&client_streams), output_volume_receiver, user_volumes, @@ -189,13 +246,13 @@ impl AudioOutputDevice for DefaultAudioOutputDevice { fn play(&self) -> Result<(), AudioError> { self.stream .play() - .map_err(|e| AudioError::OutputPlayError(e)) + .map_err(AudioError::OutputPlayError) } fn pause(&self) -> Result<(), AudioError> { self.stream .pause() - .map_err(|e| AudioError::OutputPauseError(e)) + .map_err(AudioError::OutputPauseError) } fn set_volume(&self, volume: f32) { @@ -211,7 +268,9 @@ impl AudioOutputDevice for DefaultAudioOutputDevice { } } -pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( +/// Returns a function that fills a buffer with audio from client streams +/// modified according to some audio configuration. +pub fn callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( user_bufs: Arc<Mutex<ClientStream>>, output_volume_receiver: watch::Receiver<f32>, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, @@ -227,10 +286,10 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> let user_volumes = user_volumes.lock().unwrap(); for (k, v) in user_bufs.buffer_clients.iter_mut() { let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); - for sample in data.iter_mut() { - if !muted { + if !muted { + for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) { *sample = sample.saturating_add(Sample::from( - &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume), + &(val * volume * user_volume), )); } } @@ -242,3 +301,14 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> } } } + +impl Debug for DefaultAudioOutputDevice { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DefaultAudioInputDevice") + .field("client_streams", &self.client_streams) + .field("config", &self.config) + .field("volume_sender", &self.volume_sender) + .field("stream", &"cpal::Stream") + .finish() + } +} diff --git a/mumd/src/audio/transformers.rs b/mumd/src/audio/transformers.rs index 25e28b8..21a71b5 100644 --- a/mumd/src/audio/transformers.rs +++ b/mumd/src/audio/transformers.rs @@ -2,10 +2,11 @@ pub trait Transformer { /// Do the transform. Returning `None` is interpreted as "the buffer is unwanted". /// The implementor is free to modify the buffer however it wants to. - fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]>; + fn transform<'a>(&mut self, buf: (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])>; } /// A struct representing a noise gate transform. +#[derive(Debug)] pub struct NoiseGate { alltime_high: f32, open: usize, @@ -25,7 +26,7 @@ impl NoiseGate { } impl Transformer for NoiseGate { - fn transform<'a>(&mut self, buf: &'a mut [f32]) -> Option<&'a mut [f32]> { + fn transform<'a>(&mut self, (channels, buf): (opus::Channels, &'a mut [f32])) -> Option<(opus::Channels, &'a mut [f32])> { const MUTE_PERCENTAGE: f32 = 0.1; let max = buf.iter().map(|e| e.abs()).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(); @@ -43,7 +44,7 @@ impl Transformer for NoiseGate { if self.open == 0 { None } else { - Some(buf) + Some((channels, buf)) } } } diff --git a/mumd/src/client.rs b/mumd/src/client.rs index 9e8ca18..753583f 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -33,7 +33,7 @@ pub async fn handle( packet_sender.clone(), packet_receiver, event_queue.clone(), - ).fuse() => r.map_err(|e| ClientError::TcpError(e)), + ).fuse() => r.map_err(ClientError::TcpError), _ = udp::handle( Arc::clone(&state), connection_info_receiver.clone(), diff --git a/mumd/src/error.rs b/mumd/src/error.rs index da1bdd3..4277d7f 100644 --- a/mumd/src/error.rs +++ b/mumd/src/error.rs @@ -5,6 +5,7 @@ use tokio::sync::mpsc; pub type ServerSendError = mpsc::error::SendError<ControlPacket<Serverbound>>; +#[derive(Debug)] pub enum TcpError { NoConnectionInfoReceived, TlsConnectorBuilderError(native_tls::Error), @@ -40,6 +41,7 @@ impl From<ServerSendError> for TcpError { } } +#[derive(Debug)] pub enum UdpError { NoConnectionInfoReceived, DisconnectBeforeCryptSetup, @@ -53,6 +55,7 @@ impl From<std::io::Error> for UdpError { } } +#[derive(Debug)] pub enum ClientError { TcpError(TcpError), } @@ -65,6 +68,7 @@ impl fmt::Display for ClientError { } } +#[derive(Debug)] pub enum AudioStream { Input, Output, @@ -79,6 +83,7 @@ impl fmt::Display for AudioStream { } } +#[derive(Debug)] pub enum AudioError { NoDevice(AudioStream), NoConfigs(AudioStream, cpal::SupportedStreamConfigsError), @@ -105,6 +110,7 @@ impl fmt::Display for AudioError { } } +#[derive(Debug)] pub enum StateError { AudioError(AudioError), ConfigError(ConfigError), diff --git a/mumd/src/main.rs b/mumd/src/main.rs index a96944c..e7ac033 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -1,10 +1,25 @@ -mod audio; -mod client; -mod command; -mod error; -mod network; -mod notifications; -mod state; +#![warn(elided_lifetimes_in_paths)] +#![warn(meta_variable_misuse)] +#![warn(missing_debug_implementations)] +#![warn(single_use_lifetimes)] +#![warn(unreachable_pub)] +#![warn(unused_crate_dependencies)] +#![warn(unused_import_braces)] +#![warn(unused_lifetimes)] +#![warn(unused_qualifications)] +#![deny(macro_use_extern_crate)] +#![deny(missing_abi)] +#![deny(future_incompatible)] +#![forbid(unsafe_code)] +#![forbid(non_ascii_idents)] + +pub mod audio; +pub mod client; +pub mod command; +pub mod error; +pub mod network; +pub mod notifications; +pub mod state; use crate::state::State; @@ -76,12 +91,9 @@ async fn main() { _ = receive_commands(command_sender).fuse() => Ok(()), }; - match run { - Err(e) => { - error!("mumd: {}", e); - std::process::exit(1); - } - _ => {} + if let Err(e) = run { + error!("mumd: {}", e); + std::process::exit(1); } } diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 2b803c0..1066fef 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -30,8 +30,8 @@ impl ConnectionInfo { #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] pub enum VoiceStreamType { - TCP, - UDP, + Tcp, + Udp, } async fn run_until<F, R>( diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index f620a32..0fdc4c5 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -14,6 +14,7 @@ use mumble_protocol::{Clientbound, Serverbound}; use mumlib::command::MumbleEventKind; use std::collections::HashMap; use std::convert::{Into, TryInto}; +use std::fmt::Debug; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use tokio::net::TcpStream; @@ -31,8 +32,8 @@ type TcpSender = SplitSink< type TcpReceiver = SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>; -pub(crate) type TcpEventCallback = Box<dyn FnOnce(TcpEventData)>; -pub(crate) type TcpEventSubscriber = Box<dyn FnMut(TcpEventData) -> bool>; //the bool indicates if it should be kept or not +pub(crate) type TcpEventCallback = Box<dyn FnOnce(TcpEventData<'_>)>; +pub(crate) type TcpEventSubscriber = Box<dyn FnMut(TcpEventData<'_>) -> bool>; //the bool indicates if it should be kept or not /// Why the TCP was disconnected. #[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] @@ -57,15 +58,15 @@ pub enum TcpEvent { /// Having two different types might feel a bit confusing. Essentially, a /// callback _registers_ to a [TcpEvent] but _takes_ a [TcpEventData] as /// parameter. -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum TcpEventData<'a> { Connected(Result<&'a msgs::ServerSync, mumlib::Error>), Disconnected(DisconnectedReason), TextMessage(&'a msgs::TextMessage), } -impl<'a> From<&TcpEventData<'a>> for TcpEvent { - fn from(t: &TcpEventData) -> Self { +impl From<&TcpEventData<'_>> for TcpEvent { + fn from(t: &TcpEventData<'_>) -> Self { match t { TcpEventData::Connected(_) => TcpEvent::Connected, TcpEventData::Disconnected(reason) => TcpEvent::Disconnected(*reason), @@ -74,7 +75,7 @@ impl<'a> From<&TcpEventData<'a>> for TcpEvent { } } -#[derive(Clone)] +#[derive(Clone, Default)] pub struct TcpEventQueue { callbacks: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventCallback>>>>, subscribers: Arc<RwLock<HashMap<TcpEvent, Vec<TcpEventSubscriber>>>>, @@ -111,7 +112,7 @@ impl TcpEventQueue { /// Fires all callbacks related to a specific TCP event and removes them from the event queue. /// Also calls all event subscribers, but keeps them in the queue - pub fn resolve<'a>(&self, data: TcpEventData<'a>) { + pub fn resolve(&self, data: TcpEventData<'_>) { if let Some(vec) = self .callbacks .write() @@ -139,6 +140,13 @@ impl TcpEventQueue { } } +impl Debug for TcpEventQueue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TcpEventQueue") + .finish() + } +} + pub async fn handle( state: Arc<RwLock<State>>, mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, @@ -148,13 +156,14 @@ pub async fn handle( event_queue: TcpEventQueue, ) -> Result<(), TcpError> { loop { - let connection_info = 'data: loop { - while connection_info_receiver.changed().await.is_ok() { + let connection_info = loop { + if connection_info_receiver.changed().await.is_ok() { if let Some(data) = connection_info_receiver.borrow().clone() { - break 'data data; + break data; } + } else { + return Err(TcpError::NoConnectionInfoReceived); } - return Err(TcpError::NoConnectionInfoReceived); }; let connect_result = connect( connection_info.socket_addr, @@ -242,12 +251,12 @@ async fn connect( builder.danger_accept_invalid_certs(accept_invalid_cert); let connector: TlsConnector = builder .build() - .map_err(|e| TcpError::TlsConnectorBuilderError(e))? + .map_err(TcpError::TlsConnectorBuilderError)? .into(); let tls_stream = connector .connect(&server_host, stream) .await - .map_err(|e| TcpError::TlsConnectError(e))?; + .map_err(TcpError::TlsConnectError)?; debug!("TLS connected"); // Wrap the TLS stream with Mumble's client-side control-channel codec @@ -304,13 +313,13 @@ async fn send_voice( inner_phase_watcher.changed().await.unwrap(); if matches!( *inner_phase_watcher.borrow(), - StatePhase::Connected(VoiceStreamType::TCP) + StatePhase::Connected(VoiceStreamType::Tcp) ) { break; } } run_until( - |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::TCP)), + |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::Tcp)), async { loop { packet_sender.send( @@ -465,7 +474,7 @@ async fn listen( .. } => { state.read().unwrap().audio_output().decode_packet_payload( - VoiceStreamType::TCP, + VoiceStreamType::Tcp, session_id, payload, ); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 95dcf33..0f78638 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -37,13 +37,14 @@ pub async fn handle( let receiver = state.read().unwrap().audio_input().receiver(); loop { - let connection_info = 'data: loop { - while connection_info_receiver.changed().await.is_ok() { + let connection_info = loop { + if connection_info_receiver.changed().await.is_ok() { if let Some(data) = connection_info_receiver.borrow().clone() { - break 'data data; + break data; } + } else { + return Err(UdpError::NoConnectionInfoReceived); } - return Err(UdpError::NoConnectionInfoReceived); }; let (sink, source) = connect(&mut crypt_state_receiver).await?; @@ -136,7 +137,7 @@ async fn listen( state .read() .unwrap() - .broadcast_phase(StatePhase::Connected(VoiceStreamType::UDP)); + .broadcast_phase(StatePhase::Connected(VoiceStreamType::Udp)); last_ping_recv.store(timestamp, Ordering::Relaxed); } VoicePacket::Audio { @@ -147,7 +148,7 @@ async fn listen( .. } => { state.read().unwrap().audio_output().decode_packet_payload( - VoiceStreamType::UDP, + VoiceStreamType::Udp, session_id, payload, ); @@ -173,7 +174,7 @@ async fn send_pings( state .read() .unwrap() - .broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); + .broadcast_phase(StatePhase::Connected(VoiceStreamType::Tcp)); } match sink .lock() @@ -208,13 +209,13 @@ async fn send_voice( inner_phase_watcher.changed().await.unwrap(); if matches!( *inner_phase_watcher.borrow(), - StatePhase::Connected(VoiceStreamType::UDP) + StatePhase::Connected(VoiceStreamType::Udp) ) { break; } } run_until( - |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::UDP)), + |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::Udp)), async { let mut receiver = receiver.lock().await; loop { diff --git a/mumd/src/state.rs b/mumd/src/state.rs index d2d77b1..1992884 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -20,6 +20,7 @@ use mumlib::command::{ChannelTarget, Command, CommandResponse, MessageTarget, Mu use mumlib::config::Config; use mumlib::Error; use std::{ + fmt::Debug, iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}, @@ -29,7 +30,7 @@ use tokio::sync::{mpsc, watch}; macro_rules! at { ( $( $event:expr => $generator:expr ),+ $(,)? ) => { ExecutionContext::TcpEventCallback(vec![ - $( ($event, Box::new($generator)), )* + $( ($event, Box::new($generator)), )+ ]) }; } @@ -42,18 +43,18 @@ macro_rules! now { type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandResponse>>>>; +type TcpEventCallback = Box<dyn FnOnce(TcpEventData<'_>) -> Responses>; +type TcpEventSubscriberCallback = Box< + dyn FnMut( + TcpEventData<'_>, + &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>, + ) -> bool +>; + //TODO give me a better name pub enum ExecutionContext { - TcpEventCallback(Vec<(TcpEvent, Box<dyn FnOnce(TcpEventData) -> Responses>)>), - TcpEventSubscriber( - TcpEvent, - Box< - dyn FnMut( - TcpEventData, - &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>, - ) -> bool, - >, - ), + TcpEventCallback(Vec<(TcpEvent, TcpEventCallback)>), + TcpEventSubscriber(TcpEvent, TcpEventSubscriberCallback), Now(Box<dyn FnOnce() -> Responses>), Ping( Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>, @@ -63,6 +64,17 @@ pub enum ExecutionContext { ), } +impl Debug for ExecutionContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple(match self { + ExecutionContext::TcpEventCallback(_) => "TcpEventCallback", + ExecutionContext::TcpEventSubscriber(_, _) => "TcpEventSubscriber", + ExecutionContext::Now(_) => "Now", + ExecutionContext::Ping(_, _) => "Ping", + }).finish() + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum StatePhase { Disconnected, @@ -70,6 +82,7 @@ pub enum StatePhase { Connected(VoiceStreamType), } +#[derive(Debug)] pub struct State { config: Config, server: Option<Server>, @@ -90,9 +103,9 @@ impl State { config.audio.input_volume.unwrap_or(1.0), phase_watcher.1.clone(), ) - .map_err(|e| StateError::AudioError(e))?; + .map_err(StateError::AudioError)?; let audio_output = AudioOutput::new(config.audio.output_volume.unwrap_or(1.0)) - .map_err(|e| StateError::AudioError(e))?; + .map_err(StateError::AudioError)?; let mut state = Self { config, server: None, @@ -308,7 +321,7 @@ impl State { } pub fn initialized(&self) { - self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); + self.broadcast_phase(StatePhase::Connected(VoiceStreamType::Tcp)); self.audio_output .play_effect(NotificationEvents::ServerConnect); } @@ -773,7 +786,7 @@ pub fn handle_command( .unwrap() .users() .iter() - .find(|(_, user)| user.name() == &name) + .find(|(_, user)| user.name() == name) .map(|(e, _)| *e); let id = match id { diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs index 6995e1e..2ed05c5 100644 --- a/mumd/src/state/channel.rs +++ b/mumd/src/state/channel.rs @@ -169,13 +169,10 @@ pub fn into_channel( impl From<&Channel> for mumlib::state::Channel { fn from(channel: &Channel) -> Self { - mumlib::state::Channel { - description: channel.description.clone(), - links: Vec::new(), - max_users: channel.max_users, - name: channel.name.clone(), - children: Vec::new(), - users: Vec::new(), - } + mumlib::state::Channel::new( + channel.name.clone(), + channel.description.clone(), + channel.max_users, + ) } } diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs index 4abde49..5d49457 100644 --- a/mumd/src/state/server.rs +++ b/mumd/src/state/server.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use std::collections::hash_map::Entry; use std::collections::HashMap; -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Server { channels: HashMap<u32, Channel>, users: HashMap<u32, User>, diff --git a/mumd/src/state/user.rs b/mumd/src/state/user.rs index 5770bca..0ffde90 100644 --- a/mumd/src/state/user.rs +++ b/mumd/src/state/user.rs @@ -78,7 +78,7 @@ impl User { } } - pub fn apply_user_diff(&mut self, diff: &crate::state::user::UserDiff) { + pub fn apply_user_diff(&mut self, diff: &UserDiff) { if let Some(comment) = diff.comment.clone() { self.comment = Some(comment); } |
