aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-06-06 23:26:24 +0200
committerGustav Sörnäs <gustav@sornas.net>2021-06-06 23:26:24 +0200
commitbe76c2aa51733a0cf495e92659fbcbe527f41149 (patch)
tree617fb1caa999c076a45233b4bedea6a78192db25 /mumd/src
parent7fc5a1a36404ee4cbc09d20c955e6edd3d2ac523 (diff)
downloadmum-be76c2aa51733a0cf495e92659fbcbe527f41149.tar.gz
cargo fmt
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/audio.rs91
-rw-r--r--mumd/src/audio/input.rs42
-rw-r--r--mumd/src/audio/noise_gate.rs133
-rw-r--r--mumd/src/audio/output.rs24
-rw-r--r--mumd/src/client.rs13
-rw-r--r--mumd/src/command.rs31
-rw-r--r--mumd/src/error.rs15
-rw-r--r--mumd/src/main.rs22
-rw-r--r--mumd/src/network.rs8
-rw-r--r--mumd/src/network/tcp.rs93
-rw-r--r--mumd/src/network/udp.rs83
-rw-r--r--mumd/src/state.rs146
-rw-r--r--mumd/src/state/channel.rs5
-rw-r--r--mumd/src/state/server.rs11
14 files changed, 412 insertions, 305 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index df7af70..63adcc6 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,10 +1,12 @@
pub mod input;
-pub mod output;
mod noise_gate;
+pub mod output;
-use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice};
-use crate::audio::output::{DefaultAudioOutputDevice, AudioOutputDevice, ClientStream};
-use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt};
+use crate::audio::input::{AudioInputDevice, DefaultAudioInputDevice};
+use crate::audio::noise_gate::{
+ from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt,
+};
+use crate::audio::output::{AudioOutputDevice, ClientStream, DefaultAudioOutputDevice};
use crate::error::AudioError;
use crate::network::VoiceStreamType;
use crate::state::StatePhase;
@@ -15,8 +17,8 @@ use dasp_signal::{self as signal, Signal};
use futures_util::stream::Stream;
use futures_util::StreamExt;
use log::*;
+use mumble_protocol::voice::{VoicePacket, VoicePacketPayload};
use mumble_protocol::Serverbound;
-use mumble_protocol::voice::{VoicePacketPayload, VoicePacket};
use mumlib::config::SoundEffect;
use std::borrow::Cow;
use std::collections::{hash_map::Entry, HashMap};
@@ -70,34 +72,36 @@ impl TryFrom<&str> for NotificationEvents {
pub struct AudioInput {
device: DefaultAudioInputDevice,
- channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
+ channel_receiver:
+ Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
}
impl AudioInput {
- pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> {
+ pub fn new(
+ input_volume: f32,
+ phase_watcher: watch::Receiver<StatePhase>,
+ ) -> Result<Self, AudioError> {
let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher)?;
let sample_rate = SampleRate(SAMPLE_RATE);
let opus_stream = OpusEncoder::new(
- 4,
- sample_rate.0,
- default.num_channels(),
- StreamingSignalExt::into_interleaved_samples(
- StreamingNoiseGate::new(
- from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly
- 10_000
- )
- )
- ).enumerate()
- .map(|(i, e)| VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: i as u64,
- payload: VoicePacketPayload::Opus(e.into(), false),
- position_info: None,
- }
- );
+ 4,
+ sample_rate.0,
+ default.num_channels(),
+ StreamingSignalExt::into_interleaved_samples(StreamingNoiseGate::new(
+ from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly
+ 10_000,
+ )),
+ )
+ .enumerate()
+ .map(|(i, e)| VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: i as u64,
+ payload: VoicePacketPayload::Opus(e.into(), false),
+ position_info: None,
+ });
default.play()?;
@@ -108,7 +112,9 @@ impl AudioInput {
Ok(res)
}
- pub fn receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
+ pub fn receiver(
+ &self,
+ ) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
Arc::clone(&self.channel_receiver)
}
@@ -130,10 +136,7 @@ impl AudioOutput {
pub fn new(output_volume: f32) -> Result<Self, AudioError> {
let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new()));
- let default = DefaultAudioOutputDevice::new(
- output_volume,
- Arc::clone(&user_volumes),
- )?;
+ let default = DefaultAudioOutputDevice::new(output_volume, Arc::clone(&user_volumes))?;
default.play()?;
let client_streams = default.client_streams();
@@ -163,7 +166,8 @@ impl AudioOutput {
self.sounds = NotificationEvents::iter()
.map(|event| {
- let bytes = overrides.get(&event)
+ let bytes = overrides
+ .get(&event)
.map(|file| get_sfx(file))
.unwrap_or_else(get_default_sfx);
let reader = hound::WavReader::new(bytes.as_ref()).unwrap();
@@ -181,7 +185,7 @@ impl AudioOutput {
let iter: Box<dyn Iterator<Item = f32>> = match spec.channels {
1 => Box::new(samples.into_iter().flat_map(|e| vec![e, e])),
2 => Box::new(samples.into_iter()),
- _ => unimplemented!("Only mono and stereo sound is supported. See #80.")
+ _ => unimplemented!("Only mono and stereo sound is supported. See #80."),
};
let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter);
let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal));
@@ -189,24 +193,29 @@ impl AudioOutput {
.from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64)
.until_exhausted()
// if the source audio is stereo and is being played as mono, discard the right audio
- .flat_map(
- |e| if self.device.num_channels() == 1 {
+ .flat_map(|e| {
+ if self.device.num_channels() == 1 {
vec![e[0]]
} else {
e.to_vec()
}
- )
+ })
.collect::<Vec<f32>>();
(event, samples)
})
.collect();
}
- pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) {
- self.client_streams.lock().unwrap().decode_packet(
- (stream_type, session_id),
- payload,
- );
+ pub fn decode_packet_payload(
+ &self,
+ stream_type: VoiceStreamType,
+ session_id: u32,
+ payload: VoicePacketPayload,
+ ) {
+ self.client_streams
+ .lock()
+ .unwrap()
+ .decode_packet((stream_type, session_id), payload);
}
pub fn set_volume(&self, output_volume: f32) {
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index 4a1ed3d..e45ff27 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -1,11 +1,11 @@
-use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
-use tokio::sync::watch;
+use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
use log::*;
+use tokio::sync::watch;
-use crate::state::StatePhase;
use crate::audio::SAMPLE_RATE;
use crate::error::{AudioError, AudioStream};
+use crate::state::StatePhase;
pub fn callback<T: Sample>(
mut input_sender: futures_channel::mpsc::Sender<f32>,
@@ -17,10 +17,7 @@ pub fn callback<T: Sample>(
return;
}
let input_volume = *input_volume_receiver.borrow();
- for sample in data
- .iter()
- .map(|e| e.to_f32())
- .map(|e| e * input_volume) {
+ for sample in data.iter().map(|e| e.to_f32()).map(|e| e * input_volume) {
if let Err(_e) = input_sender.try_send(sample) {
warn!("Error sending audio: {}", _e);
}
@@ -44,7 +41,10 @@ pub struct DefaultAudioInputDevice {
}
impl DefaultAudioInputDevice {
- pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> {
+ pub fn new(
+ input_volume: f32,
+ phase_watcher: watch::Receiver<StatePhase>,
+ ) -> Result<Self, AudioError> {
let sample_rate = SampleRate(SAMPLE_RATE);
let host = cpal::default_host();
@@ -76,29 +76,17 @@ impl DefaultAudioInputDevice {
let input_stream = match input_supported_sample_format {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
- callback::<f32>(
- sample_sender,
- input_volume_receiver,
- phase_watcher,
- ),
+ callback::<f32>(sample_sender, input_volume_receiver, phase_watcher),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
- callback::<i16>(
- sample_sender,
- input_volume_receiver,
- phase_watcher,
- ),
+ callback::<i16>(sample_sender, input_volume_receiver, phase_watcher),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
- callback::<u16>(
- sample_sender,
- input_volume_receiver,
- phase_watcher,
- ),
+ callback::<u16>(sample_sender, input_volume_receiver, phase_watcher),
err_fn,
),
}
@@ -116,10 +104,14 @@ impl DefaultAudioInputDevice {
impl AudioInputDevice for DefaultAudioInputDevice {
fn play(&self) -> Result<(), AudioError> {
- self.stream.play().map_err(|e| AudioError::InputPlayError(e))
+ self.stream
+ .play()
+ .map_err(|e| AudioError::InputPlayError(e))
}
fn pause(&self) -> Result<(), AudioError> {
- self.stream.pause().map_err(|e| AudioError::InputPauseError(e))
+ self.stream
+ .pause()
+ .map_err(|e| AudioError::InputPauseError(e))
}
fn set_volume(&self, volume: f32) {
self.volume_sender.send(volume).unwrap();
diff --git a/mumd/src/audio/noise_gate.rs b/mumd/src/audio/noise_gate.rs
index 824ffe0..bd1a262 100644
--- a/mumd/src/audio/noise_gate.rs
+++ b/mumd/src/audio/noise_gate.rs
@@ -1,5 +1,5 @@
use dasp_frame::Frame;
-use dasp_sample::{SignedSample, ToSample, Sample};
+use dasp_sample::{Sample, SignedSample, ToSample};
use dasp_signal::Signal;
use futures_util::stream::Stream;
use opus::Channels;
@@ -17,10 +17,7 @@ pub struct StreamingNoiseGate<S: StreamingSignal> {
}
impl<S: StreamingSignal> StreamingNoiseGate<S> {
- pub fn new(
- signal: S,
- deactivation_delay: usize,
- ) -> StreamingNoiseGate<S> {
+ pub fn new(signal: S, deactivation_delay: usize) -> StreamingNoiseGate<S> {
Self {
open: 0,
signal,
@@ -31,10 +28,11 @@ impl<S: StreamingSignal> StreamingNoiseGate<S> {
}
impl<S> StreamingSignal for StreamingNoiseGate<S>
- where
- S: StreamingSignal + Unpin,
- FloatSample<S::Frame>: Unpin,
- <S as StreamingSignal>::Frame: Unpin {
+where
+ S: StreamingSignal + Unpin,
+ FloatSample<S::Frame>: Unpin,
+ <S as StreamingSignal>::Frame: Unpin,
+{
type Frame = S::Frame;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
@@ -47,18 +45,30 @@ impl<S> StreamingSignal for StreamingNoiseGate<S>
Poll::Pending => return Poll::Pending,
};
- if let Some(highest) = frame.to_float_frame().channels().find(|e| abs(e.clone()) > s.alltime_high) {
+ if let Some(highest) = frame
+ .to_float_frame()
+ .channels()
+ .find(|e| abs(e.clone()) > s.alltime_high)
+ {
s.alltime_high = highest;
}
match s.open {
0 => {
- if frame.to_float_frame().channels().any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) {
+ if frame
+ .to_float_frame()
+ .channels()
+ .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample()))
+ {
s.open = s.deactivation_delay;
}
}
_ => {
- if frame.to_float_frame().channels().any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) {
+ if frame
+ .to_float_frame()
+ .channels()
+ .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample()))
+ {
s.open = s.deactivation_delay;
} else {
s.open -= 1;
@@ -98,8 +108,9 @@ pub trait StreamingSignal {
}
impl<S> StreamingSignal for S
- where
- S: Signal + Unpin {
+where
+ S: Signal + Unpin,
+{
type Frame = S::Frame;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> {
@@ -109,23 +120,24 @@ impl<S> StreamingSignal for S
pub trait StreamingSignalExt: StreamingSignal {
fn next(&mut self) -> Next<'_, Self> {
- Next {
- stream: self
- }
+ Next { stream: self }
}
fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self>
- where
- Self: Sized {
- IntoInterleavedSamples { signal: self, current_frame: None }
+ where
+ Self: Sized,
+ {
+ IntoInterleavedSamples {
+ signal: self,
+ current_frame: None,
+ }
}
}
-impl<S> StreamingSignalExt for S
- where S: StreamingSignal {}
+impl<S> StreamingSignalExt for S where S: StreamingSignal {}
pub struct Next<'a, S: ?Sized> {
- stream: &'a mut S
+ stream: &'a mut S,
}
impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> {
@@ -133,10 +145,8 @@ impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match S::poll_next(Pin::new(self.stream), cx) {
- Poll::Ready(val) => {
- Poll::Ready(val)
- }
- Poll::Pending => Poll::Pending
+ Poll::Ready(val) => Poll::Ready(val),
+ Poll::Pending => Poll::Pending,
}
}
}
@@ -147,9 +157,10 @@ pub struct IntoInterleavedSamples<S: StreamingSignal> {
}
impl<S> Stream for IntoInterleavedSamples<S>
- where
- S: StreamingSignal + Unpin,
- <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin {
+where
+ S: StreamingSignal + Unpin,
+ <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin,
+{
type Item = <S::Frame as Frame>::Sample;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -176,9 +187,10 @@ struct FromStream<S> {
}
impl<S> StreamingSignal for FromStream<S>
- where
- S: Stream + Unpin,
- S::Item: Frame + Unpin {
+where
+ S: Stream + Unpin,
+ S::Item: Frame + Unpin,
+{
type Frame = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
@@ -187,9 +199,7 @@ impl<S> StreamingSignal for FromStream<S>
return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
}
match S::poll_next(Pin::new(&mut s.stream), cx) {
- Poll::Ready(Some(val)) => {
- Poll::Ready(val)
- }
+ Poll::Ready(Some(val)) => Poll::Ready(val),
Poll::Ready(None) => {
s.underlying_exhausted = true;
return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
@@ -203,20 +213,21 @@ impl<S> StreamingSignal for FromStream<S>
}
}
-
pub struct FromInterleavedSamplesStream<S, F>
- where
- F: Frame {
+where
+ F: Frame,
+{
stream: S,
next_buf: Vec<F::Sample>,
underlying_exhausted: bool,
}
pub fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F>
- where
- S: Stream + Unpin,
- S::Item: Sample,
- F: Frame<Sample = S::Item> {
+where
+ S: Stream + Unpin,
+ S::Item: Sample,
+ F: Frame<Sample = S::Item>,
+{
FromInterleavedSamplesStream {
stream,
next_buf: Vec::new(),
@@ -225,10 +236,11 @@ pub fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSample
}
impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F>
- where
- S: Stream + Unpin,
- S::Item: Sample + Unpin,
- F: Frame<Sample = S::Item> + Unpin {
+where
+ S: Stream + Unpin,
+ S::Item: Sample + Unpin,
+ F: Frame<Sample = S::Item> + Unpin,
+{
type Frame = F;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
@@ -270,22 +282,21 @@ pub struct OpusEncoder<S> {
}
impl<S, I> OpusEncoder<S>
- where
- S: Stream<Item = I>,
- I: ToSample<f32> {
+where
+ S: Stream<Item = I>,
+ I: ToSample<f32>,
+{
pub fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self {
let encoder = opus::Encoder::new(
sample_rate,
match channels {
1 => Channels::Mono,
2 => Channels::Stereo,
- _ => unimplemented!(
- "Only 1 or 2 channels supported, got {})",
- channels
- ),
+ _ => unimplemented!("Only 1 or 2 channels supported, got {})", channels),
},
opus::Application::Voip,
- ).unwrap();
+ )
+ .unwrap();
Self {
encoder,
frame_size,
@@ -298,9 +309,10 @@ impl<S, I> OpusEncoder<S>
}
impl<S, I> Stream for OpusEncoder<S>
- where
- S: Stream<Item = I> + Unpin,
- I: Sample + ToSample<f32> {
+where
+ S: Stream<Item = I> + Unpin,
+ I: Sample + ToSample<f32>,
+{
type Item = Vec<u8>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -328,7 +340,10 @@ impl<S, I> Stream for OpusEncoder<S>
s.input_buffer.clear();
}
- let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap();
+ let encoded = s
+ .encoder
+ .encode_vec_float(&s.input_buffer, opus_frame_size)
+ .unwrap();
s.input_buffer.clear();
Poll::Ready(Some(encoded))
}
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index 658c1c8..a2f6bcc 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,10 +1,10 @@
-use crate::network::VoiceStreamType;
use crate::audio::SAMPLE_RATE;
use crate::error::{AudioError, AudioStream};
+use crate::network::VoiceStreamType;
-use log::*;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
-use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample};
+use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
+use log::*;
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::{HashMap, VecDeque};
use std::ops::AddAssign;
@@ -39,10 +39,7 @@ impl ClientStream {
let sample_rate = self.sample_rate;
let channels = self.channels;
self.buffer_clients.entry(client).or_insert_with(|| {
- let opus_decoder = opus::Decoder::new(
- sample_rate,
- channels
- ).unwrap();
+ let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap();
(VecDeque::new(), opus_decoder)
})
}
@@ -139,7 +136,10 @@ impl DefaultAudioOutputDevice {
.with_sample_rate(sample_rate);
let output_supported_sample_format = output_supported_config.sample_format();
let output_config: StreamConfig = output_supported_config.into();
- let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels)));
+ let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(
+ sample_rate.0,
+ output_config.channels,
+ )));
let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
@@ -187,11 +187,15 @@ impl DefaultAudioOutputDevice {
impl AudioOutputDevice for DefaultAudioOutputDevice {
fn play(&self) -> Result<(), AudioError> {
- self.stream.play().map_err(|e| AudioError::OutputPlayError(e))
+ self.stream
+ .play()
+ .map_err(|e| AudioError::OutputPlayError(e))
}
fn pause(&self) -> Result<(), AudioError> {
- self.stream.pause().map_err(|e| AudioError::OutputPauseError(e))
+ self.stream
+ .pause()
+ .map_err(|e| AudioError::OutputPauseError(e))
}
fn set_volume(&self, volume: f32) {
diff --git a/mumd/src/client.rs b/mumd/src/client.rs
index ba9cad4..9e8ca18 100644
--- a/mumd/src/client.rs
+++ b/mumd/src/client.rs
@@ -1,10 +1,10 @@
-use crate::{command, network::tcp::TcpEventQueue};
use crate::error::ClientError;
use crate::network::{tcp, udp, ConnectionInfo};
use crate::state::State;
+use crate::{command, network::tcp::TcpEventQueue};
use futures_util::{select, FutureExt};
-use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState};
+use mumble_protocol::{control::ControlPacket, crypt::ClientCryptState, Serverbound};
use mumlib::command::{Command, CommandResponse};
use std::sync::{Arc, RwLock};
use tokio::sync::{mpsc, watch};
@@ -18,12 +18,9 @@ pub async fn handle(
) -> Result<(), ClientError> {
let (connection_info_sender, connection_info_receiver) =
watch::channel::<Option<ConnectionInfo>>(None);
- let (crypt_state_sender, crypt_state_receiver) =
- mpsc::channel::<ClientCryptState>(1);
- let (packet_sender, packet_receiver) =
- mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
- let (ping_request_sender, ping_request_receiver) =
- mpsc::unbounded_channel();
+ let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::<ClientCryptState>(1);
+ let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
+ let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel();
let event_queue = TcpEventQueue::new();
let state = Arc::new(RwLock::new(state));
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 5255afa..410751a 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,10 +1,13 @@
-use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest};
+use crate::network::{tcp::TcpEventQueue, udp::PingRequest, ConnectionInfo};
use crate::state::{ExecutionContext, State};
use log::*;
-use mumble_protocol::{Serverbound, control::ControlPacket};
+use mumble_protocol::{control::ControlPacket, Serverbound};
use mumlib::command::{Command, CommandResponse};
-use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc, RwLock,
+};
use tokio::sync::{mpsc, watch};
pub async fn handle(
@@ -22,11 +25,16 @@ pub async fn handle(
let ping_count = AtomicU64::new(0);
while let Some((command, mut response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
- let event = crate::state::handle_command(Arc::clone(&state), command, &mut packet_sender, &mut connection_info_sender);
+ let event = crate::state::handle_command(
+ Arc::clone(&state),
+ command,
+ &mut packet_sender,
+ &mut connection_info_sender,
+ );
match event {
ExecutionContext::TcpEventCallback(event, generator) => {
tcp_event_queue.register_callback(
- event,
+ event,
Box::new(move |e| {
let response = generator(e);
for response in response {
@@ -35,17 +43,14 @@ pub async fn handle(
}),
);
}
- ExecutionContext::TcpEventSubscriber(event, mut handler) => {
- tcp_event_queue.register_subscriber(
+ ExecutionContext::TcpEventSubscriber(event, mut handler) => tcp_event_queue
+ .register_subscriber(
event,
- Box::new(move |event| {
- handler(event, &mut response_sender)
- }),
- )
- }
+ Box::new(move |event| handler(event, &mut response_sender)),
+ ),
ExecutionContext::Now(generator) => {
for response in generator() {
- response_sender.send(response).unwrap();
+ response_sender.send(response).unwrap();
}
drop(response_sender);
}
diff --git a/mumd/src/error.rs b/mumd/src/error.rs
index eb63df8..da1bdd3 100644
--- a/mumd/src/error.rs
+++ b/mumd/src/error.rs
@@ -1,4 +1,4 @@
-use mumble_protocol::{Serverbound, control::ControlPacket};
+use mumble_protocol::{control::ControlPacket, Serverbound};
use mumlib::error::ConfigError;
use std::fmt;
use tokio::sync::mpsc;
@@ -17,12 +17,11 @@ pub enum TcpError {
impl fmt::Display for TcpError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
- TcpError::NoConnectionInfoReceived
- => write!(f, "No connection info received"),
- TcpError::TlsConnectorBuilderError(e)
- => write!(f, "Error building TLS connector: {}", e),
- TcpError::TlsConnectError(e)
- => write!(f, "TLS error when connecting: {}", e),
+ TcpError::NoConnectionInfoReceived => write!(f, "No connection info received"),
+ TcpError::TlsConnectorBuilderError(e) => {
+ write!(f, "Error building TLS connector: {}", e)
+ }
+ TcpError::TlsConnectError(e) => write!(f, "TLS error when connecting: {}", e),
TcpError::SendError(e) => write!(f, "Couldn't send packet: {}", e),
TcpError::IOError(e) => write!(f, "IO error: {}", e),
}
@@ -44,7 +43,7 @@ impl From<ServerSendError> for TcpError {
pub enum UdpError {
NoConnectionInfoReceived,
DisconnectBeforeCryptSetup,
-
+
IOError(std::io::Error),
}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 0c175c2..bc72779 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -14,12 +14,18 @@ use log::*;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
use std::io::ErrorKind;
-use tokio::{net::{UnixListener, UnixStream}, sync::mpsc};
+use tokio::{
+ net::{UnixListener, UnixStream},
+ sync::mpsc,
+};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
#[tokio::main]
async fn main() {
- if std::env::args().find(|s| s.as_str() == "--version").is_some() {
+ if std::env::args()
+ .find(|s| s.as_str() == "--version")
+ .is_some()
+ {
println!("mumd {}", env!("VERSION"));
return;
}
@@ -38,7 +44,10 @@ async fn main() {
bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap();
if let Ok(()) = writer.send(command.freeze()).await {
if let Some(Ok(buf)) = reader.next().await {
- if let Ok(Ok::<Option<CommandResponse>, mumlib::Error>(Some(CommandResponse::Pong))) = bincode::deserialize(&buf) {
+ if let Ok(Ok::<Option<CommandResponse>, mumlib::Error>(Some(
+ CommandResponse::Pong,
+ ))) = bincode::deserialize(&buf)
+ {
error!("Another instance of mumd is already running");
return;
}
@@ -94,7 +103,7 @@ async fn receive_commands(
let (reader, writer) = incoming.into_split();
let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
-
+
while let Some(next) = reader.next().await {
let buf = match next {
Ok(buf) => buf,
@@ -115,8 +124,9 @@ async fn receive_commands(
bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
if let Err(e) = writer.send(serialized.freeze()).await {
- if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error
- //we just assume that they just don't want any more packets
+ if e.kind() != ErrorKind::BrokenPipe {
+ //if the client closed the connection, ignore logging the error
+ //we just assume that they just don't want any more packets
error!("Error sending response: {:?}", e);
}
break;
diff --git a/mumd/src/network.rs b/mumd/src/network.rs
index 4eca90d..2b803c0 100644
--- a/mumd/src/network.rs
+++ b/mumd/src/network.rs
@@ -4,7 +4,10 @@ pub mod udp;
use futures_util::FutureExt;
use log::*;
use std::{future::Future, net::SocketAddr};
-use tokio::{select, sync::{oneshot, watch}};
+use tokio::{
+ select,
+ sync::{oneshot, watch},
+};
use crate::state::StatePhase;
@@ -36,7 +39,8 @@ async fn run_until<F, R>(
fut: F,
mut phase_watcher: watch::Receiver<StatePhase>,
) -> Option<R>
- where F: Future<Output = R>,
+where
+ F: Future<Output = R>,
{
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index b513797..5cc2bf7 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -1,11 +1,14 @@
-use crate::{error::{ServerSendError, TcpError}, notifications};
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
+use crate::{
+ error::{ServerSendError, TcpError},
+ notifications,
+};
use log::*;
-use futures_util::{FutureExt, SinkExt, StreamExt};
use futures_util::select;
use futures_util::stream::{SplitSink, SplitStream, Stream};
+use futures_util::{FutureExt, SinkExt, StreamExt};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::VoicePacket;
@@ -73,24 +76,44 @@ impl TcpEventQueue {
/// Registers a new callback to be triggered when an event is fired.
pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) {
- self.callbacks.write().unwrap().entry(at).or_default().push(callback);
+ self.callbacks
+ .write()
+ .unwrap()
+ .entry(at)
+ .or_default()
+ .push(callback);
}
/// Registers a new callback to be triggered when an event is fired.
pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) {
- self.subscribers.write().unwrap().entry(at).or_default().push(callback);
+ self.subscribers
+ .write()
+ .unwrap()
+ .entry(at)
+ .or_default()
+ .push(callback);
}
/// Fires all callbacks related to a specific TCP event and removes them from the event queue.
/// Also calls all event subscribers, but keeps them in the queue
pub fn resolve<'a>(&self, data: TcpEventData<'a>) {
- if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) {
+ if let Some(vec) = self
+ .callbacks
+ .write()
+ .unwrap()
+ .get_mut(&TcpEvent::from(&data))
+ {
let old = std::mem::take(vec);
for handler in old {
handler(data.clone());
}
}
- if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) {
+ if let Some(vec) = self
+ .subscribers
+ .write()
+ .unwrap()
+ .get_mut(&TcpEvent::from(&data))
+ {
let old = std::mem::take(vec);
for mut e in old {
if e(data.clone()) {
@@ -128,14 +151,18 @@ pub async fn handle(
// Handshake (omitting `Version` message for brevity)
let (username, password) = {
let state_lock = state.read().unwrap();
- (state_lock.username().unwrap().to_string(),
- state_lock.password().map(|x| x.to_string()))
+ (
+ state_lock.username().unwrap().to_string(),
+ state_lock.password().map(|x| x.to_string()),
+ )
};
authenticate(&mut sink, username, password).await?;
let (phase_watcher, input_receiver) = {
let state_lock = state.read().unwrap();
- (state_lock.phase_receiver(),
- state_lock.audio_input().receiver())
+ (
+ state_lock.phase_receiver(),
+ state_lock.audio_input().receiver(),
+ )
};
info!("Logging in...");
@@ -162,7 +189,9 @@ pub async fn handle(
}
},
phase_watcher,
- ).await.unwrap_or(Ok(()))?;
+ )
+ .await
+ .unwrap_or(Ok(()))?;
event_queue.resolve(TcpEventData::Disconnected);
@@ -197,7 +226,7 @@ async fn connect(
async fn authenticate(
sink: &mut TcpSender,
username: String,
- password: Option<String>
+ password: Option<String>,
) -> Result<(), TcpError> {
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
@@ -242,7 +271,10 @@ async fn send_voice(
let mut inner_phase_watcher = phase_watcher.clone();
loop {
inner_phase_watcher.changed().await.unwrap();
- if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::TCP)) {
+ if matches!(
+ *inner_phase_watcher.borrow(),
+ StatePhase::Connected(VoiceStreamType::TCP)
+ ) {
break;
}
}
@@ -257,11 +289,14 @@ async fn send_voice(
.next()
.await
.expect("No audio stream")
- .into())?;
+ .into(),
+ )?;
}
},
inner_phase_watcher.clone(),
- ).await.unwrap_or(Ok::<(), ServerSendError>(()))?;
+ )
+ .await
+ .unwrap_or(Ok::<(), ServerSendError>(()))?;
}
}
@@ -285,18 +320,23 @@ async fn listen(
// We end up here if the login was rejected. We probably want
// to exit before that.
warn!("TCP stream gone");
- state.read().unwrap().broadcast_phase(StatePhase::Disconnected);
+ state
+ .read()
+ .unwrap()
+ .broadcast_phase(StatePhase::Disconnected);
break;
}
};
match packet {
ControlPacket::TextMessage(msg) => {
let mut state = state.write().unwrap();
- let user = state.server()
+ let user = state
+ .server()
.and_then(|server| server.users().get(&msg.get_actor()))
.map(|user| user.name());
if let Some(user) = user {
- notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this
+ notifications::send(format!("{}: {}", user, msg.get_message()));
+ //TODO: probably want a config flag for this
}
state.register_message((msg.get_message().to_owned(), msg.get_actor()));
drop(state);
@@ -345,7 +385,9 @@ async fn listen(
debug!("Login rejected: {:?}", msg);
match msg.get_field_type() {
msgs::Reject_RejectType::WrongServerPW => {
- event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword)));
+ event_queue.resolve(TcpEventData::Connected(Err(
+ mumlib::Error::InvalidServerPassword,
+ )));
}
ty => {
warn!("Unhandled reject type: {:?}", ty);
@@ -385,14 +427,11 @@ async fn listen(
// position_info,
..
} => {
- state
- .read()
- .unwrap()
- .audio_output()
- .decode_packet_payload(
- VoiceStreamType::TCP,
- session_id,
- payload);
+ state.read().unwrap().audio_output().decode_packet_payload(
+ VoiceStreamType::TCP,
+ session_id,
+ payload,
+ );
}
}
}
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 0958912..95dcf33 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -2,9 +2,9 @@ use crate::error::UdpError;
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
-use futures_util::{FutureExt, SinkExt, StreamExt};
use futures_util::future::join4;
use futures_util::stream::{SplitSink, SplitStream, Stream};
+use futures_util::{FutureExt, SinkExt, StreamExt};
use log::*;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::ping::{PingPacket, PongPacket};
@@ -13,10 +13,13 @@ use mumble_protocol::Serverbound;
use std::collections::{hash_map::Entry, HashMap};
use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
-use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
-use tokio::{join, net::UdpSocket};
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc, RwLock,
+};
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use tokio::time::{interval, timeout, Duration};
+use tokio::{join, net::UdpSocket};
use tokio_util::udp::UdpFramed;
use super::{run_until, VoiceStreamType};
@@ -53,11 +56,7 @@ pub async fn handle(
run_until(
|phase| matches!(phase, StatePhase::Disconnected),
join4(
- listen(
- Arc::clone(&state),
- Arc::clone(&source),
- &last_ping_recv,
- ),
+ listen(Arc::clone(&state), Arc::clone(&source), &last_ping_recv),
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
@@ -71,9 +70,11 @@ pub async fn handle(
&last_ping_recv,
),
new_crypt_state(&mut crypt_state_receiver, sink, source),
- ).map(|_| ()),
+ )
+ .map(|_| ()),
phase_watcher,
- ).await;
+ )
+ .await;
debug!("Fully disconnected UDP stream, waiting for new connection info");
}
@@ -83,8 +84,7 @@ async fn connect(
crypt_state: &mut mpsc::Receiver<ClientCryptState>,
) -> Result<(UdpSender, UdpReceiver), UdpError> {
// Bind UDP socket
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
- .await?;
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)).await?;
// Wait for initial CryptState
let crypt_state = match crypt_state.recv().await {
@@ -146,11 +146,11 @@ async fn listen(
// position_info,
..
} => {
- state
- .read()
- .unwrap()
- .audio_output()
- .decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
+ state.read().unwrap().audio_output().decode_packet_payload(
+ VoiceStreamType::UDP,
+ session_id,
+ payload,
+ );
}
}
}
@@ -178,12 +178,17 @@ async fn send_pings(
match sink
.lock()
.await
- .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr))
+ .send((
+ VoicePacket::Ping {
+ timestamp: last_recv + 1,
+ },
+ server_addr,
+ ))
.await
{
Ok(_) => {
last_send = Some(last_recv + 1);
- },
+ }
Err(e) => {
debug!("Error sending UDP ping: {}", e);
}
@@ -201,7 +206,10 @@ async fn send_voice(
let mut inner_phase_watcher = phase_watcher.clone();
loop {
inner_phase_watcher.changed().await.unwrap();
- if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) {
+ if matches!(
+ *inner_phase_watcher.borrow(),
+ StatePhase::Connected(VoiceStreamType::UDP)
+ ) {
break;
}
}
@@ -215,13 +223,12 @@ async fn send_voice(
}
},
phase_watcher.clone(),
- ).await;
+ )
+ .await;
}
}
-pub async fn handle_pings(
- mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>,
-) {
+pub async fn handle_pings(mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>) {
let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
.await
.expect("Failed to bind UDP socket");
@@ -246,19 +253,23 @@ pub async fn handle_pings(
}
tokio::spawn(async move {
- handle(
- match timeout(Duration::from_secs(1), rx).await {
- Ok(Ok(r)) => Some(r),
- Ok(Err(_)) => {
- warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id);
- None
- }
- Err(_) => {
- debug!("Server {} timed out when sending ping id {}", socket_addr, id);
- None
- }
+ handle(match timeout(Duration::from_secs(1), rx).await {
+ Ok(Ok(r)) => Some(r),
+ Ok(Err(_)) => {
+ warn!(
+ "Ping response sender for server {}, ping id {} dropped",
+ socket_addr, id
+ );
+ None
}
- );
+ Err(_) => {
+ debug!(
+ "Server {} timed out when sending ping id {}",
+ socket_addr, id
+ );
+ None
+ }
+ });
});
}
};
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index a553e18..84583e0 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -4,11 +4,12 @@ pub mod user;
use crate::audio::{AudioInput, AudioOutput, NotificationEvents};
use crate::error::StateError;
-use crate::network::{ConnectionInfo, VoiceStreamType};
use crate::network::tcp::{TcpEvent, TcpEventData};
+use crate::network::{ConnectionInfo, VoiceStreamType};
use crate::notifications;
use crate::state::server::Server;
+use crate::state::user::UserDiff;
use log::*;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket;
@@ -17,8 +18,11 @@ use mumble_protocol::voice::Serverbound;
use mumlib::command::{Command, CommandResponse, MessageTarget};
use mumlib::config::Config;
use mumlib::Error;
-use crate::state::user::UserDiff;
-use std::{iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}};
+use std::{
+ iter,
+ net::{SocketAddr, ToSocketAddrs},
+ sync::{Arc, RwLock},
+};
use tokio::sync::{mpsc, watch};
macro_rules! at {
@@ -37,18 +41,22 @@ type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandRes
//TODO give me a better name
pub enum ExecutionContext {
- TcpEventCallback(
- TcpEvent,
- Box<dyn FnOnce(TcpEventData) -> Responses>,
- ),
+ TcpEventCallback(TcpEvent, Box<dyn FnOnce(TcpEventData) -> Responses>),
TcpEventSubscriber(
TcpEvent,
- Box<dyn FnMut(TcpEventData, &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>) -> bool>,
+ Box<
+ dyn FnMut(
+ TcpEventData,
+ &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
+ ) -> bool,
+ >,
),
Now(Box<dyn FnOnce() -> Responses>),
Ping(
Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>,
- Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>,
+ Box<
+ dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send,
+ >,
),
}
@@ -76,10 +84,10 @@ impl State {
let audio_input = AudioInput::new(
config.audio.input_volume.unwrap_or(1.0),
phase_watcher.1.clone(),
- ).map_err(|e| StateError::AudioError(e))?;
- let audio_output = AudioOutput::new(
- config.audio.output_volume.unwrap_or(1.0),
- ).map_err(|e| StateError::AudioError(e))?;
+ )
+ .map_err(|e| StateError::AudioError(e))?;
+ let audio_output = AudioOutput::new(config.audio.output_volume.unwrap_or(1.0))
+ .map_err(|e| StateError::AudioError(e))?;
let mut state = Self {
config,
server: None,
@@ -92,7 +100,6 @@ impl State {
Ok(state)
}
-
pub fn parse_user_state(&mut self, msg: msgs::UserState) {
if !msg.has_session() {
warn!("Can't parse user state without session");
@@ -135,7 +142,8 @@ impl State {
));
}
- self.audio_output.play_effect(NotificationEvents::UserConnected);
+ self.audio_output
+ .play_effect(NotificationEvents::UserConnected);
}
}
}
@@ -189,11 +197,12 @@ impl State {
} else {
warn!("{} moved to invalid channel {}", user.name(), to_channel);
}
- self.audio_output.play_effect(if from_channel == this_channel {
- NotificationEvents::UserJoinedChannel
- } else {
- NotificationEvents::UserLeftChannel
- });
+ self.audio_output
+ .play_effect(if from_channel == this_channel {
+ NotificationEvents::UserJoinedChannel
+ } else {
+ NotificationEvents::UserLeftChannel
+ });
}
}
@@ -224,7 +233,8 @@ impl State {
let this_channel = self.get_users_channel(self.server().unwrap().session_id().unwrap());
let other_channel = self.get_users_channel(msg.get_session());
if this_channel == other_channel {
- self.audio_output.play_effect(NotificationEvents::UserDisconnected);
+ self.audio_output
+ .play_effect(NotificationEvents::UserDisconnected);
if let Some(user) = self.server().unwrap().users().get(&msg.get_session()) {
notifications::send(format!("{} disconnected", &user.name()));
}
@@ -254,21 +264,19 @@ impl State {
self.audio_output.load_sound_effects(sound_effects);
}
}
-
+
pub fn register_message(&mut self, msg: (String, u32)) {
self.message_buffer.push(msg);
}
pub fn broadcast_phase(&self, phase: StatePhase) {
- self.phase_watcher
- .0
- .send(phase)
- .unwrap();
+ self.phase_watcher.0.send(phase).unwrap();
}
pub fn initialized(&self) {
self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
- self.audio_output.play_effect(NotificationEvents::ServerConnect);
+ self.audio_output
+ .play_effect(NotificationEvents::ServerConnect);
}
pub fn audio_input(&self) -> &AudioInput {
@@ -307,10 +315,12 @@ impl State {
/// If we are connected to the server but the user with the id doesn't exist, the string "Unknown user {id}"
/// is returned instead. If we aren't connected to a server, None is returned instead.
fn get_user_name(&self, user: u32) -> Option<String> {
- self.server()
- .map(|e| e.users()
- .get(&user).map(|e| e.name().to_string())
- .unwrap_or(format!("Unknown user {}", user)))
+ self.server().map(|e| {
+ e.users()
+ .get(&user)
+ .map(|e| e.name().to_string())
+ .unwrap_or(format!("Unknown user {}", user))
+ })
}
}
@@ -404,7 +414,9 @@ pub fn handle_command(
packet_sender.send(msg.into()).unwrap();
}
- now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b })))
+ now!(Ok(
+ new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b })
+ ))
}
Command::InputVolumeSet(volume) => {
state.audio_input.set_volume(volume);
@@ -498,7 +510,9 @@ pub fn handle_command(
packet_sender.send(msg.into()).unwrap();
}
- now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b })))
+ now!(Ok(
+ new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b })
+ ))
}
Command::OutputVolumeSet(volume) => {
state.audio_output.set_volume(volume);
@@ -522,10 +536,7 @@ pub fn handle_command(
*server.password_mut() = password;
*server.host_mut() = Some(format!("{}:{}", host, port));
state.server = Some(server);
- state.phase_watcher
- .0
- .send(StatePhase::Connecting)
- .unwrap();
+ state.phase_watcher.0.send(StatePhase::Connecting).unwrap();
let socket_addr = match (host.as_ref(), port)
.to_socket_addrs()
@@ -568,11 +579,14 @@ pub fn handle_command(
state.server = None;
- state.phase_watcher
+ state
+ .phase_watcher
.0
.send(StatePhase::Disconnected)
.unwrap();
- state.audio_output.play_effect(NotificationEvents::ServerDisconnect);
+ state
+ .audio_output
+ .play_effect(NotificationEvents::ServerDisconnect);
now!(Ok(None))
}
Command::ServerStatus { host, port } => ExecutionContext::Ping(
@@ -586,12 +600,14 @@ pub fn handle_command(
}
}),
Box::new(move |pong| {
- Ok(pong.map(|pong| (CommandResponse::ServerStatus {
- version: pong.version,
- users: pong.users,
- max_users: pong.max_users,
- bandwidth: pong.bandwidth,
- })))
+ Ok(pong.map(|pong| {
+ (CommandResponse::ServerStatus {
+ version: pong.version,
+ users: pong.users,
+ max_users: pong.max_users,
+ bandwidth: pong.bandwidth,
+ })
+ }))
}),
),
Command::Status => {
@@ -624,7 +640,7 @@ pub fn handle_command(
}
Command::PastMessages { block } => {
//does it make sense to wait for messages while not connected?
- if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) {
+ if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) {
return now!(Err(Error::Disconnected));
}
if block {
@@ -634,10 +650,16 @@ pub fn handle_command(
Box::new(move |data, sender| {
if let TcpEventData::TextMessage(a) = data {
let message = (
- a.get_message().to_owned(),
- ref_state.read().unwrap().get_user_name(a.get_actor()).unwrap()
+ a.get_message().to_owned(),
+ ref_state
+ .read()
+ .unwrap()
+ .get_user_name(a.get_actor())
+ .unwrap(),
);
- sender.send(Ok(Some(CommandResponse::PastMessage { message }))).is_ok()
+ sender
+ .send(Ok(Some(CommandResponse::PastMessage { message })))
+ .is_ok()
} else {
unreachable!("Should only receive a TextMessage data when listening to TextMessage events");
}
@@ -645,21 +667,20 @@ pub fn handle_command(
)
} else {
let messages = std::mem::take(&mut state.message_buffer);
- let messages: Vec<_> = messages.into_iter()
+ let messages: Vec<_> = messages
+ .into_iter()
.map(|(msg, user)| (msg, state.get_user_name(user).unwrap()))
.map(|e| Ok(Some(CommandResponse::PastMessage { message: e })))
.collect();
-
- ExecutionContext::Now(Box::new(move || {
- Box::new(messages.into_iter())
- }))
+
+ ExecutionContext::Now(Box::new(move || Box::new(messages.into_iter())))
}
}
Command::SendMessage { message, targets } => {
if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) {
return now!(Err(Error::Disconnected));
}
-
+
let mut msg = msgs::TextMessage::new();
msg.set_message(message);
@@ -667,21 +688,20 @@ pub fn handle_command(
for target in targets {
match target {
MessageTarget::Channel { recursive, name } => {
- let channel_id = state
- .server()
- .unwrap()
- .channel_name(&name);
-
+ let channel_id = state.server().unwrap().channel_name(&name);
+
let channel_id = match channel_id {
Ok(id) => id,
Err(e) => return now!(Err(Error::ChannelIdentifierError(name, e))),
- }.0;
-
+ }
+ .0;
+
if recursive {
msg.mut_tree_id()
} else {
msg.mut_channel_id()
- }.push(channel_id);
+ }
+ .push(channel_id);
}
MessageTarget::User { name } => {
let id = state
diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs
index f58ed15..6995e1e 100644
--- a/mumd/src/state/channel.rs
+++ b/mumd/src/state/channel.rs
@@ -157,10 +157,7 @@ pub fn into_channel(
let mut proto_tree = ProtoTree {
channel: Some(channels.get(&0).unwrap()),
children: HashMap::new(),
- users: channel_lookup
- .get(&0)
- .cloned()
- .unwrap_or_default(),
+ users: channel_lookup.get(&0).cloned().unwrap_or_default(),
};
for (walk, channel) in walks {
diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs
index 78a10b9..869940a 100644
--- a/mumd/src/state/server.rs
+++ b/mumd/src/state/server.rs
@@ -103,15 +103,20 @@ impl Server {
/// server.channels.insert(0, channel.clone);
/// assert_eq!(server.channel_name("Foobar"), Ok((0, &channel)));
/// ```
- pub fn channel_name(&self, channel_name: &str) -> Result<(u32, &Channel), ChannelIdentifierError> {
- let matches = self.channels
+ pub fn channel_name(
+ &self,
+ channel_name: &str,
+ ) -> Result<(u32, &Channel), ChannelIdentifierError> {
+ let matches = self
+ .channels
.iter()
.map(|e| ((*e.0, e.1), e.1.path(&self.channels)))
.filter(|e| e.1.ends_with(channel_name))
.collect::<Vec<_>>();
Ok(match matches.len() {
0 => {
- let soft_matches = self.channels
+ let soft_matches = self
+ .channels
.iter()
.map(|e| ((*e.0, e.1), e.1.path(&self.channels).to_lowercase()))
.filter(|e| e.1.ends_with(&channel_name.to_lowercase()))