diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2021-06-06 23:26:24 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2021-06-06 23:26:24 +0200 |
| commit | be76c2aa51733a0cf495e92659fbcbe527f41149 (patch) | |
| tree | 617fb1caa999c076a45233b4bedea6a78192db25 /mumd | |
| parent | 7fc5a1a36404ee4cbc09d20c955e6edd3d2ac523 (diff) | |
| download | mum-be76c2aa51733a0cf495e92659fbcbe527f41149.tar.gz | |
cargo fmt
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/build.rs | 12 | ||||
| -rw-r--r-- | mumd/src/audio.rs | 91 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 42 | ||||
| -rw-r--r-- | mumd/src/audio/noise_gate.rs | 133 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 24 | ||||
| -rw-r--r-- | mumd/src/client.rs | 13 | ||||
| -rw-r--r-- | mumd/src/command.rs | 31 | ||||
| -rw-r--r-- | mumd/src/error.rs | 15 | ||||
| -rw-r--r-- | mumd/src/main.rs | 22 | ||||
| -rw-r--r-- | mumd/src/network.rs | 8 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 93 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 83 | ||||
| -rw-r--r-- | mumd/src/state.rs | 146 | ||||
| -rw-r--r-- | mumd/src/state/channel.rs | 5 | ||||
| -rw-r--r-- | mumd/src/state/server.rs | 11 |
15 files changed, 419 insertions, 310 deletions
diff --git a/mumd/build.rs b/mumd/build.rs index 0a4f506..c11146a 100644 --- a/mumd/build.rs +++ b/mumd/build.rs @@ -11,9 +11,11 @@ fn main() { fn commit_hash() -> Option<String> { let output = Command::new("git") - .arg("describe") - .arg("--tags") - .current_dir(env!("CARGO_MANIFEST_DIR")) - .output(); - output.ok().map(|o| String::from_utf8_lossy(&o.stdout).to_string()) + .arg("describe") + .arg("--tags") + .current_dir(env!("CARGO_MANIFEST_DIR")) + .output(); + output + .ok() + .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) } diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index df7af70..63adcc6 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,10 +1,12 @@ pub mod input; -pub mod output; mod noise_gate; +pub mod output; -use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; -use crate::audio::output::{DefaultAudioOutputDevice, AudioOutputDevice, ClientStream}; -use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; +use crate::audio::input::{AudioInputDevice, DefaultAudioInputDevice}; +use crate::audio::noise_gate::{ + from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt, +}; +use crate::audio::output::{AudioOutputDevice, ClientStream, DefaultAudioOutputDevice}; use crate::error::AudioError; use crate::network::VoiceStreamType; use crate::state::StatePhase; @@ -15,8 +17,8 @@ use dasp_signal::{self as signal, Signal}; use futures_util::stream::Stream; use futures_util::StreamExt; use log::*; +use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; use mumble_protocol::Serverbound; -use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use mumlib::config::SoundEffect; use std::borrow::Cow; use std::collections::{hash_map::Entry, HashMap}; @@ -70,34 +72,36 @@ impl TryFrom<&str> for NotificationEvents { pub struct AudioInput { device: DefaultAudioInputDevice, - channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, + channel_receiver: + Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, } impl AudioInput { - pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { + pub fn new( + input_volume: f32, + phase_watcher: watch::Receiver<StatePhase>, + ) -> Result<Self, AudioError> { let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher)?; let sample_rate = SampleRate(SAMPLE_RATE); let opus_stream = OpusEncoder::new( - 4, - sample_rate.0, - default.num_channels(), - StreamingSignalExt::into_interleaved_samples( - StreamingNoiseGate::new( - from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly - 10_000 - ) - ) - ).enumerate() - .map(|(i, e)| VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: i as u64, - payload: VoicePacketPayload::Opus(e.into(), false), - position_info: None, - } - ); + 4, + sample_rate.0, + default.num_channels(), + StreamingSignalExt::into_interleaved_samples(StreamingNoiseGate::new( + from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly + 10_000, + )), + ) + .enumerate() + .map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + }); default.play()?; @@ -108,7 +112,9 @@ impl AudioInput { Ok(res) } - pub fn receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { + pub fn receiver( + &self, + ) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { Arc::clone(&self.channel_receiver) } @@ -130,10 +136,7 @@ impl AudioOutput { pub fn new(output_volume: f32) -> Result<Self, AudioError> { let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); - let default = DefaultAudioOutputDevice::new( - output_volume, - Arc::clone(&user_volumes), - )?; + let default = DefaultAudioOutputDevice::new(output_volume, Arc::clone(&user_volumes))?; default.play()?; let client_streams = default.client_streams(); @@ -163,7 +166,8 @@ impl AudioOutput { self.sounds = NotificationEvents::iter() .map(|event| { - let bytes = overrides.get(&event) + let bytes = overrides + .get(&event) .map(|file| get_sfx(file)) .unwrap_or_else(get_default_sfx); let reader = hound::WavReader::new(bytes.as_ref()).unwrap(); @@ -181,7 +185,7 @@ impl AudioOutput { let iter: Box<dyn Iterator<Item = f32>> = match spec.channels { 1 => Box::new(samples.into_iter().flat_map(|e| vec![e, e])), 2 => Box::new(samples.into_iter()), - _ => unimplemented!("Only mono and stereo sound is supported. See #80.") + _ => unimplemented!("Only mono and stereo sound is supported. See #80."), }; let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter); let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal)); @@ -189,24 +193,29 @@ impl AudioOutput { .from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64) .until_exhausted() // if the source audio is stereo and is being played as mono, discard the right audio - .flat_map( - |e| if self.device.num_channels() == 1 { + .flat_map(|e| { + if self.device.num_channels() == 1 { vec![e[0]] } else { e.to_vec() } - ) + }) .collect::<Vec<f32>>(); (event, samples) }) .collect(); } - pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) { - self.client_streams.lock().unwrap().decode_packet( - (stream_type, session_id), - payload, - ); + pub fn decode_packet_payload( + &self, + stream_type: VoiceStreamType, + session_id: u32, + payload: VoicePacketPayload, + ) { + self.client_streams + .lock() + .unwrap() + .decode_packet((stream_type, session_id), payload); } pub fn set_volume(&self, output_volume: f32) { diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 4a1ed3d..e45ff27 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,11 +1,11 @@ -use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use tokio::sync::watch; +use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; use log::*; +use tokio::sync::watch; -use crate::state::StatePhase; use crate::audio::SAMPLE_RATE; use crate::error::{AudioError, AudioStream}; +use crate::state::StatePhase; pub fn callback<T: Sample>( mut input_sender: futures_channel::mpsc::Sender<f32>, @@ -17,10 +17,7 @@ pub fn callback<T: Sample>( return; } let input_volume = *input_volume_receiver.borrow(); - for sample in data - .iter() - .map(|e| e.to_f32()) - .map(|e| e * input_volume) { + for sample in data.iter().map(|e| e.to_f32()).map(|e| e * input_volume) { if let Err(_e) = input_sender.try_send(sample) { warn!("Error sending audio: {}", _e); } @@ -44,7 +41,10 @@ pub struct DefaultAudioInputDevice { } impl DefaultAudioInputDevice { - pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { + pub fn new( + input_volume: f32, + phase_watcher: watch::Receiver<StatePhase>, + ) -> Result<Self, AudioError> { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); @@ -76,29 +76,17 @@ impl DefaultAudioInputDevice { let input_stream = match input_supported_sample_format { SampleFormat::F32 => input_device.build_input_stream( &input_config, - callback::<f32>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), + callback::<f32>(sample_sender, input_volume_receiver, phase_watcher), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, - callback::<i16>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), + callback::<i16>(sample_sender, input_volume_receiver, phase_watcher), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, - callback::<u16>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), + callback::<u16>(sample_sender, input_volume_receiver, phase_watcher), err_fn, ), } @@ -116,10 +104,14 @@ impl DefaultAudioInputDevice { impl AudioInputDevice for DefaultAudioInputDevice { fn play(&self) -> Result<(), AudioError> { - self.stream.play().map_err(|e| AudioError::InputPlayError(e)) + self.stream + .play() + .map_err(|e| AudioError::InputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self.stream.pause().map_err(|e| AudioError::InputPauseError(e)) + self.stream + .pause() + .map_err(|e| AudioError::InputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); diff --git a/mumd/src/audio/noise_gate.rs b/mumd/src/audio/noise_gate.rs index 824ffe0..bd1a262 100644 --- a/mumd/src/audio/noise_gate.rs +++ b/mumd/src/audio/noise_gate.rs @@ -1,5 +1,5 @@ use dasp_frame::Frame; -use dasp_sample::{SignedSample, ToSample, Sample}; +use dasp_sample::{Sample, SignedSample, ToSample}; use dasp_signal::Signal; use futures_util::stream::Stream; use opus::Channels; @@ -17,10 +17,7 @@ pub struct StreamingNoiseGate<S: StreamingSignal> { } impl<S: StreamingSignal> StreamingNoiseGate<S> { - pub fn new( - signal: S, - deactivation_delay: usize, - ) -> StreamingNoiseGate<S> { + pub fn new(signal: S, deactivation_delay: usize) -> StreamingNoiseGate<S> { Self { open: 0, signal, @@ -31,10 +28,11 @@ impl<S: StreamingSignal> StreamingNoiseGate<S> { } impl<S> StreamingSignal for StreamingNoiseGate<S> - where - S: StreamingSignal + Unpin, - FloatSample<S::Frame>: Unpin, - <S as StreamingSignal>::Frame: Unpin { +where + S: StreamingSignal + Unpin, + FloatSample<S::Frame>: Unpin, + <S as StreamingSignal>::Frame: Unpin, +{ type Frame = S::Frame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { @@ -47,18 +45,30 @@ impl<S> StreamingSignal for StreamingNoiseGate<S> Poll::Pending => return Poll::Pending, }; - if let Some(highest) = frame.to_float_frame().channels().find(|e| abs(e.clone()) > s.alltime_high) { + if let Some(highest) = frame + .to_float_frame() + .channels() + .find(|e| abs(e.clone()) > s.alltime_high) + { s.alltime_high = highest; } match s.open { 0 => { - if frame.to_float_frame().channels().any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) { + if frame + .to_float_frame() + .channels() + .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) + { s.open = s.deactivation_delay; } } _ => { - if frame.to_float_frame().channels().any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) { + if frame + .to_float_frame() + .channels() + .any(|e| abs(e) >= s.alltime_high.mul_amp(MUTE_PERCENTAGE.to_sample())) + { s.open = s.deactivation_delay; } else { s.open -= 1; @@ -98,8 +108,9 @@ pub trait StreamingSignal { } impl<S> StreamingSignal for S - where - S: Signal + Unpin { +where + S: Signal + Unpin, +{ type Frame = S::Frame; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> { @@ -109,23 +120,24 @@ impl<S> StreamingSignal for S pub trait StreamingSignalExt: StreamingSignal { fn next(&mut self) -> Next<'_, Self> { - Next { - stream: self - } + Next { stream: self } } fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self> - where - Self: Sized { - IntoInterleavedSamples { signal: self, current_frame: None } + where + Self: Sized, + { + IntoInterleavedSamples { + signal: self, + current_frame: None, + } } } -impl<S> StreamingSignalExt for S - where S: StreamingSignal {} +impl<S> StreamingSignalExt for S where S: StreamingSignal {} pub struct Next<'a, S: ?Sized> { - stream: &'a mut S + stream: &'a mut S, } impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { @@ -133,10 +145,8 @@ impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match S::poll_next(Pin::new(self.stream), cx) { - Poll::Ready(val) => { - Poll::Ready(val) - } - Poll::Pending => Poll::Pending + Poll::Ready(val) => Poll::Ready(val), + Poll::Pending => Poll::Pending, } } } @@ -147,9 +157,10 @@ pub struct IntoInterleavedSamples<S: StreamingSignal> { } impl<S> Stream for IntoInterleavedSamples<S> - where - S: StreamingSignal + Unpin, - <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin { +where + S: StreamingSignal + Unpin, + <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin, +{ type Item = <S::Frame as Frame>::Sample; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { @@ -176,9 +187,10 @@ struct FromStream<S> { } impl<S> StreamingSignal for FromStream<S> - where - S: Stream + Unpin, - S::Item: Frame + Unpin { +where + S: Stream + Unpin, + S::Item: Frame + Unpin, +{ type Frame = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { @@ -187,9 +199,7 @@ impl<S> StreamingSignal for FromStream<S> return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM); } match S::poll_next(Pin::new(&mut s.stream), cx) { - Poll::Ready(Some(val)) => { - Poll::Ready(val) - } + Poll::Ready(Some(val)) => Poll::Ready(val), Poll::Ready(None) => { s.underlying_exhausted = true; return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM); @@ -203,20 +213,21 @@ impl<S> StreamingSignal for FromStream<S> } } - pub struct FromInterleavedSamplesStream<S, F> - where - F: Frame { +where + F: Frame, +{ stream: S, next_buf: Vec<F::Sample>, underlying_exhausted: bool, } pub fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F> - where - S: Stream + Unpin, - S::Item: Sample, - F: Frame<Sample = S::Item> { +where + S: Stream + Unpin, + S::Item: Sample, + F: Frame<Sample = S::Item>, +{ FromInterleavedSamplesStream { stream, next_buf: Vec::new(), @@ -225,10 +236,11 @@ pub fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSample } impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F> - where - S: Stream + Unpin, - S::Item: Sample + Unpin, - F: Frame<Sample = S::Item> + Unpin { +where + S: Stream + Unpin, + S::Item: Sample + Unpin, + F: Frame<Sample = S::Item> + Unpin, +{ type Frame = F; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> { @@ -270,22 +282,21 @@ pub struct OpusEncoder<S> { } impl<S, I> OpusEncoder<S> - where - S: Stream<Item = I>, - I: ToSample<f32> { +where + S: Stream<Item = I>, + I: ToSample<f32>, +{ pub fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { let encoder = opus::Encoder::new( sample_rate, match channels { 1 => Channels::Mono, 2 => Channels::Stereo, - _ => unimplemented!( - "Only 1 or 2 channels supported, got {})", - channels - ), + _ => unimplemented!("Only 1 or 2 channels supported, got {})", channels), }, opus::Application::Voip, - ).unwrap(); + ) + .unwrap(); Self { encoder, frame_size, @@ -298,9 +309,10 @@ impl<S, I> OpusEncoder<S> } impl<S, I> Stream for OpusEncoder<S> - where - S: Stream<Item = I> + Unpin, - I: Sample + ToSample<f32> { +where + S: Stream<Item = I> + Unpin, + I: Sample + ToSample<f32>, +{ type Item = Vec<u8>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { @@ -328,7 +340,10 @@ impl<S, I> Stream for OpusEncoder<S> s.input_buffer.clear(); } - let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); + let encoded = s + .encoder + .encode_vec_float(&s.input_buffer, opus_frame_size) + .unwrap(); s.input_buffer.clear(); Poll::Ready(Some(encoded)) } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 658c1c8..a2f6bcc 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,10 +1,10 @@ -use crate::network::VoiceStreamType; use crate::audio::SAMPLE_RATE; use crate::error::{AudioError, AudioStream}; +use crate::network::VoiceStreamType; -use log::*; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample}; +use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; +use log::*; use mumble_protocol::voice::VoicePacketPayload; use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; @@ -39,10 +39,7 @@ impl ClientStream { let sample_rate = self.sample_rate; let channels = self.channels; self.buffer_clients.entry(client).or_insert_with(|| { - let opus_decoder = opus::Decoder::new( - sample_rate, - channels - ).unwrap(); + let opus_decoder = opus::Decoder::new(sample_rate, channels).unwrap(); (VecDeque::new(), opus_decoder) }) } @@ -139,7 +136,10 @@ impl DefaultAudioOutputDevice { .with_sample_rate(sample_rate); let output_supported_sample_format = output_supported_config.sample_format(); let output_config: StreamConfig = output_supported_config.into(); - let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); + let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new( + sample_rate.0, + output_config.channels, + ))); let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); @@ -187,11 +187,15 @@ impl DefaultAudioOutputDevice { impl AudioOutputDevice for DefaultAudioOutputDevice { fn play(&self) -> Result<(), AudioError> { - self.stream.play().map_err(|e| AudioError::OutputPlayError(e)) + self.stream + .play() + .map_err(|e| AudioError::OutputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { - self.stream.pause().map_err(|e| AudioError::OutputPauseError(e)) + self.stream + .pause() + .map_err(|e| AudioError::OutputPauseError(e)) } fn set_volume(&self, volume: f32) { diff --git a/mumd/src/client.rs b/mumd/src/client.rs index ba9cad4..9e8ca18 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -1,10 +1,10 @@ -use crate::{command, network::tcp::TcpEventQueue}; use crate::error::ClientError; use crate::network::{tcp, udp, ConnectionInfo}; use crate::state::State; +use crate::{command, network::tcp::TcpEventQueue}; use futures_util::{select, FutureExt}; -use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState}; +use mumble_protocol::{control::ControlPacket, crypt::ClientCryptState, Serverbound}; use mumlib::command::{Command, CommandResponse}; use std::sync::{Arc, RwLock}; use tokio::sync::{mpsc, watch}; @@ -18,12 +18,9 @@ pub async fn handle( ) -> Result<(), ClientError> { let (connection_info_sender, connection_info_receiver) = watch::channel::<Option<ConnectionInfo>>(None); - let (crypt_state_sender, crypt_state_receiver) = - mpsc::channel::<ClientCryptState>(1); - let (packet_sender, packet_receiver) = - mpsc::unbounded_channel::<ControlPacket<Serverbound>>(); - let (ping_request_sender, ping_request_receiver) = - mpsc::unbounded_channel(); + let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::<ClientCryptState>(1); + let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>(); + let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); let event_queue = TcpEventQueue::new(); let state = Arc::new(RwLock::new(state)); diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 5255afa..410751a 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,10 +1,13 @@ -use crate::network::{ConnectionInfo, tcp::TcpEventQueue, udp::PingRequest}; +use crate::network::{tcp::TcpEventQueue, udp::PingRequest, ConnectionInfo}; use crate::state::{ExecutionContext, State}; use log::*; -use mumble_protocol::{Serverbound, control::ControlPacket}; +use mumble_protocol::{control::ControlPacket, Serverbound}; use mumlib::command::{Command, CommandResponse}; -use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, +}; use tokio::sync::{mpsc, watch}; pub async fn handle( @@ -22,11 +25,16 @@ pub async fn handle( let ping_count = AtomicU64::new(0); while let Some((command, mut response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); - let event = crate::state::handle_command(Arc::clone(&state), command, &mut packet_sender, &mut connection_info_sender); + let event = crate::state::handle_command( + Arc::clone(&state), + command, + &mut packet_sender, + &mut connection_info_sender, + ); match event { ExecutionContext::TcpEventCallback(event, generator) => { tcp_event_queue.register_callback( - event, + event, Box::new(move |e| { let response = generator(e); for response in response { @@ -35,17 +43,14 @@ pub async fn handle( }), ); } - ExecutionContext::TcpEventSubscriber(event, mut handler) => { - tcp_event_queue.register_subscriber( + ExecutionContext::TcpEventSubscriber(event, mut handler) => tcp_event_queue + .register_subscriber( event, - Box::new(move |event| { - handler(event, &mut response_sender) - }), - ) - } + Box::new(move |event| handler(event, &mut response_sender)), + ), ExecutionContext::Now(generator) => { for response in generator() { - response_sender.send(response).unwrap(); + response_sender.send(response).unwrap(); } drop(response_sender); } diff --git a/mumd/src/error.rs b/mumd/src/error.rs index eb63df8..da1bdd3 100644 --- a/mumd/src/error.rs +++ b/mumd/src/error.rs @@ -1,4 +1,4 @@ -use mumble_protocol::{Serverbound, control::ControlPacket}; +use mumble_protocol::{control::ControlPacket, Serverbound}; use mumlib::error::ConfigError; use std::fmt; use tokio::sync::mpsc; @@ -17,12 +17,11 @@ pub enum TcpError { impl fmt::Display for TcpError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TcpError::NoConnectionInfoReceived - => write!(f, "No connection info received"), - TcpError::TlsConnectorBuilderError(e) - => write!(f, "Error building TLS connector: {}", e), - TcpError::TlsConnectError(e) - => write!(f, "TLS error when connecting: {}", e), + TcpError::NoConnectionInfoReceived => write!(f, "No connection info received"), + TcpError::TlsConnectorBuilderError(e) => { + write!(f, "Error building TLS connector: {}", e) + } + TcpError::TlsConnectError(e) => write!(f, "TLS error when connecting: {}", e), TcpError::SendError(e) => write!(f, "Couldn't send packet: {}", e), TcpError::IOError(e) => write!(f, "IO error: {}", e), } @@ -44,7 +43,7 @@ impl From<ServerSendError> for TcpError { pub enum UdpError { NoConnectionInfoReceived, DisconnectBeforeCryptSetup, - + IOError(std::io::Error), } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 0c175c2..bc72779 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -14,12 +14,18 @@ use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; use std::io::ErrorKind; -use tokio::{net::{UnixListener, UnixStream}, sync::mpsc}; +use tokio::{ + net::{UnixListener, UnixStream}, + sync::mpsc, +}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; #[tokio::main] async fn main() { - if std::env::args().find(|s| s.as_str() == "--version").is_some() { + if std::env::args() + .find(|s| s.as_str() == "--version") + .is_some() + { println!("mumd {}", env!("VERSION")); return; } @@ -38,7 +44,10 @@ async fn main() { bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap(); if let Ok(()) = writer.send(command.freeze()).await { if let Some(Ok(buf)) = reader.next().await { - if let Ok(Ok::<Option<CommandResponse>, mumlib::Error>(Some(CommandResponse::Pong))) = bincode::deserialize(&buf) { + if let Ok(Ok::<Option<CommandResponse>, mumlib::Error>(Some( + CommandResponse::Pong, + ))) = bincode::deserialize(&buf) + { error!("Another instance of mumd is already running"); return; } @@ -94,7 +103,7 @@ async fn receive_commands( let (reader, writer) = incoming.into_split(); let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new()); let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); - + while let Some(next) = reader.next().await { let buf = match next { Ok(buf) => buf, @@ -115,8 +124,9 @@ async fn receive_commands( bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); if let Err(e) = writer.send(serialized.freeze()).await { - if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error - //we just assume that they just don't want any more packets + if e.kind() != ErrorKind::BrokenPipe { + //if the client closed the connection, ignore logging the error + //we just assume that they just don't want any more packets error!("Error sending response: {:?}", e); } break; diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 4eca90d..2b803c0 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -4,7 +4,10 @@ pub mod udp; use futures_util::FutureExt; use log::*; use std::{future::Future, net::SocketAddr}; -use tokio::{select, sync::{oneshot, watch}}; +use tokio::{ + select, + sync::{oneshot, watch}, +}; use crate::state::StatePhase; @@ -36,7 +39,8 @@ async fn run_until<F, R>( fut: F, mut phase_watcher: watch::Receiver<StatePhase>, ) -> Option<R> - where F: Future<Output = R>, +where + F: Future<Output = R>, { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index b513797..5cc2bf7 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,11 +1,14 @@ -use crate::{error::{ServerSendError, TcpError}, notifications}; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; +use crate::{ + error::{ServerSendError, TcpError}, + notifications, +}; use log::*; -use futures_util::{FutureExt, SinkExt, StreamExt}; use futures_util::select; use futures_util::stream::{SplitSink, SplitStream, Stream}; +use futures_util::{FutureExt, SinkExt, StreamExt}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; @@ -73,24 +76,44 @@ impl TcpEventQueue { /// Registers a new callback to be triggered when an event is fired. pub fn register_callback(&self, at: TcpEvent, callback: TcpEventCallback) { - self.callbacks.write().unwrap().entry(at).or_default().push(callback); + self.callbacks + .write() + .unwrap() + .entry(at) + .or_default() + .push(callback); } /// Registers a new callback to be triggered when an event is fired. pub fn register_subscriber(&self, at: TcpEvent, callback: TcpEventSubscriber) { - self.subscribers.write().unwrap().entry(at).or_default().push(callback); + self.subscribers + .write() + .unwrap() + .entry(at) + .or_default() + .push(callback); } /// Fires all callbacks related to a specific TCP event and removes them from the event queue. /// Also calls all event subscribers, but keeps them in the queue pub fn resolve<'a>(&self, data: TcpEventData<'a>) { - if let Some(vec) = self.callbacks.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self + .callbacks + .write() + .unwrap() + .get_mut(&TcpEvent::from(&data)) + { let old = std::mem::take(vec); for handler in old { handler(data.clone()); } } - if let Some(vec) = self.subscribers.write().unwrap().get_mut(&TcpEvent::from(&data)) { + if let Some(vec) = self + .subscribers + .write() + .unwrap() + .get_mut(&TcpEvent::from(&data)) + { let old = std::mem::take(vec); for mut e in old { if e(data.clone()) { @@ -128,14 +151,18 @@ pub async fn handle( // Handshake (omitting `Version` message for brevity) let (username, password) = { let state_lock = state.read().unwrap(); - (state_lock.username().unwrap().to_string(), - state_lock.password().map(|x| x.to_string())) + ( + state_lock.username().unwrap().to_string(), + state_lock.password().map(|x| x.to_string()), + ) }; authenticate(&mut sink, username, password).await?; let (phase_watcher, input_receiver) = { let state_lock = state.read().unwrap(); - (state_lock.phase_receiver(), - state_lock.audio_input().receiver()) + ( + state_lock.phase_receiver(), + state_lock.audio_input().receiver(), + ) }; info!("Logging in..."); @@ -162,7 +189,9 @@ pub async fn handle( } }, phase_watcher, - ).await.unwrap_or(Ok(()))?; + ) + .await + .unwrap_or(Ok(()))?; event_queue.resolve(TcpEventData::Disconnected); @@ -197,7 +226,7 @@ async fn connect( async fn authenticate( sink: &mut TcpSender, username: String, - password: Option<String> + password: Option<String>, ) -> Result<(), TcpError> { let mut msg = msgs::Authenticate::new(); msg.set_username(username); @@ -242,7 +271,10 @@ async fn send_voice( let mut inner_phase_watcher = phase_watcher.clone(); loop { inner_phase_watcher.changed().await.unwrap(); - if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::TCP)) { + if matches!( + *inner_phase_watcher.borrow(), + StatePhase::Connected(VoiceStreamType::TCP) + ) { break; } } @@ -257,11 +289,14 @@ async fn send_voice( .next() .await .expect("No audio stream") - .into())?; + .into(), + )?; } }, inner_phase_watcher.clone(), - ).await.unwrap_or(Ok::<(), ServerSendError>(()))?; + ) + .await + .unwrap_or(Ok::<(), ServerSendError>(()))?; } } @@ -285,18 +320,23 @@ async fn listen( // We end up here if the login was rejected. We probably want // to exit before that. warn!("TCP stream gone"); - state.read().unwrap().broadcast_phase(StatePhase::Disconnected); + state + .read() + .unwrap() + .broadcast_phase(StatePhase::Disconnected); break; } }; match packet { ControlPacket::TextMessage(msg) => { let mut state = state.write().unwrap(); - let user = state.server() + let user = state + .server() .and_then(|server| server.users().get(&msg.get_actor())) .map(|user| user.name()); if let Some(user) = user { - notifications::send(format!("{}: {}", user, msg.get_message())); //TODO: probably want a config flag for this + notifications::send(format!("{}: {}", user, msg.get_message())); + //TODO: probably want a config flag for this } state.register_message((msg.get_message().to_owned(), msg.get_actor())); drop(state); @@ -345,7 +385,9 @@ async fn listen( debug!("Login rejected: {:?}", msg); match msg.get_field_type() { msgs::Reject_RejectType::WrongServerPW => { - event_queue.resolve(TcpEventData::Connected(Err(mumlib::Error::InvalidServerPassword))); + event_queue.resolve(TcpEventData::Connected(Err( + mumlib::Error::InvalidServerPassword, + ))); } ty => { warn!("Unhandled reject type: {:?}", ty); @@ -385,14 +427,11 @@ async fn listen( // position_info, .. } => { - state - .read() - .unwrap() - .audio_output() - .decode_packet_payload( - VoiceStreamType::TCP, - session_id, - payload); + state.read().unwrap().audio_output().decode_packet_payload( + VoiceStreamType::TCP, + session_id, + payload, + ); } } } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 0958912..95dcf33 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -2,9 +2,9 @@ use crate::error::UdpError; use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; -use futures_util::{FutureExt, SinkExt, StreamExt}; use futures_util::future::join4; use futures_util::stream::{SplitSink, SplitStream, Stream}; +use futures_util::{FutureExt, SinkExt, StreamExt}; use log::*; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; @@ -13,10 +13,13 @@ use mumble_protocol::Serverbound; use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; -use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; -use tokio::{join, net::UdpSocket}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, +}; use tokio::sync::{mpsc, oneshot, watch, Mutex}; use tokio::time::{interval, timeout, Duration}; +use tokio::{join, net::UdpSocket}; use tokio_util::udp::UdpFramed; use super::{run_until, VoiceStreamType}; @@ -53,11 +56,7 @@ pub async fn handle( run_until( |phase| matches!(phase, StatePhase::Disconnected), join4( - listen( - Arc::clone(&state), - Arc::clone(&source), - &last_ping_recv, - ), + listen(Arc::clone(&state), Arc::clone(&source), &last_ping_recv), send_voice( Arc::clone(&sink), connection_info.socket_addr, @@ -71,9 +70,11 @@ pub async fn handle( &last_ping_recv, ), new_crypt_state(&mut crypt_state_receiver, sink, source), - ).map(|_| ()), + ) + .map(|_| ()), phase_watcher, - ).await; + ) + .await; debug!("Fully disconnected UDP stream, waiting for new connection info"); } @@ -83,8 +84,7 @@ async fn connect( crypt_state: &mut mpsc::Receiver<ClientCryptState>, ) -> Result<(UdpSender, UdpReceiver), UdpError> { // Bind UDP socket - let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) - .await?; + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)).await?; // Wait for initial CryptState let crypt_state = match crypt_state.recv().await { @@ -146,11 +146,11 @@ async fn listen( // position_info, .. } => { - state - .read() - .unwrap() - .audio_output() - .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); + state.read().unwrap().audio_output().decode_packet_payload( + VoiceStreamType::UDP, + session_id, + payload, + ); } } } @@ -178,12 +178,17 @@ async fn send_pings( match sink .lock() .await - .send((VoicePacket::Ping { timestamp: last_recv + 1 }, server_addr)) + .send(( + VoicePacket::Ping { + timestamp: last_recv + 1, + }, + server_addr, + )) .await { Ok(_) => { last_send = Some(last_recv + 1); - }, + } Err(e) => { debug!("Error sending UDP ping: {}", e); } @@ -201,7 +206,10 @@ async fn send_voice( let mut inner_phase_watcher = phase_watcher.clone(); loop { inner_phase_watcher.changed().await.unwrap(); - if matches!(*inner_phase_watcher.borrow(), StatePhase::Connected(VoiceStreamType::UDP)) { + if matches!( + *inner_phase_watcher.borrow(), + StatePhase::Connected(VoiceStreamType::UDP) + ) { break; } } @@ -215,13 +223,12 @@ async fn send_voice( } }, phase_watcher.clone(), - ).await; + ) + .await; } } -pub async fn handle_pings( - mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>, -) { +pub async fn handle_pings(mut ping_request_receiver: mpsc::UnboundedReceiver<PingRequest>) { let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await .expect("Failed to bind UDP socket"); @@ -246,19 +253,23 @@ pub async fn handle_pings( } tokio::spawn(async move { - handle( - match timeout(Duration::from_secs(1), rx).await { - Ok(Ok(r)) => Some(r), - Ok(Err(_)) => { - warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id); - None - } - Err(_) => { - debug!("Server {} timed out when sending ping id {}", socket_addr, id); - None - } + handle(match timeout(Duration::from_secs(1), rx).await { + Ok(Ok(r)) => Some(r), + Ok(Err(_)) => { + warn!( + "Ping response sender for server {}, ping id {} dropped", + socket_addr, id + ); + None } - ); + Err(_) => { + debug!( + "Server {} timed out when sending ping id {}", + socket_addr, id + ); + None + } + }); }); } }; diff --git a/mumd/src/state.rs b/mumd/src/state.rs index a553e18..84583e0 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -4,11 +4,12 @@ pub mod user; use crate::audio::{AudioInput, AudioOutput, NotificationEvents}; use crate::error::StateError; -use crate::network::{ConnectionInfo, VoiceStreamType}; use crate::network::tcp::{TcpEvent, TcpEventData}; +use crate::network::{ConnectionInfo, VoiceStreamType}; use crate::notifications; use crate::state::server::Server; +use crate::state::user::UserDiff; use log::*; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; @@ -17,8 +18,11 @@ use mumble_protocol::voice::Serverbound; use mumlib::command::{Command, CommandResponse, MessageTarget}; use mumlib::config::Config; use mumlib::Error; -use crate::state::user::UserDiff; -use std::{iter, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, RwLock}}; +use std::{ + iter, + net::{SocketAddr, ToSocketAddrs}, + sync::{Arc, RwLock}, +}; use tokio::sync::{mpsc, watch}; macro_rules! at { @@ -37,18 +41,22 @@ type Responses = Box<dyn Iterator<Item = mumlib::error::Result<Option<CommandRes //TODO give me a better name pub enum ExecutionContext { - TcpEventCallback( - TcpEvent, - Box<dyn FnOnce(TcpEventData) -> Responses>, - ), + TcpEventCallback(TcpEvent, Box<dyn FnOnce(TcpEventData) -> Responses>), TcpEventSubscriber( TcpEvent, - Box<dyn FnMut(TcpEventData, &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>) -> bool>, + Box< + dyn FnMut( + TcpEventData, + &mut mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>, + ) -> bool, + >, ), Now(Box<dyn FnOnce() -> Responses>), Ping( Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>, - Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>, + Box< + dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send, + >, ), } @@ -76,10 +84,10 @@ impl State { let audio_input = AudioInput::new( config.audio.input_volume.unwrap_or(1.0), phase_watcher.1.clone(), - ).map_err(|e| StateError::AudioError(e))?; - let audio_output = AudioOutput::new( - config.audio.output_volume.unwrap_or(1.0), - ).map_err(|e| StateError::AudioError(e))?; + ) + .map_err(|e| StateError::AudioError(e))?; + let audio_output = AudioOutput::new(config.audio.output_volume.unwrap_or(1.0)) + .map_err(|e| StateError::AudioError(e))?; let mut state = Self { config, server: None, @@ -92,7 +100,6 @@ impl State { Ok(state) } - pub fn parse_user_state(&mut self, msg: msgs::UserState) { if !msg.has_session() { warn!("Can't parse user state without session"); @@ -135,7 +142,8 @@ impl State { )); } - self.audio_output.play_effect(NotificationEvents::UserConnected); + self.audio_output + .play_effect(NotificationEvents::UserConnected); } } } @@ -189,11 +197,12 @@ impl State { } else { warn!("{} moved to invalid channel {}", user.name(), to_channel); } - self.audio_output.play_effect(if from_channel == this_channel { - NotificationEvents::UserJoinedChannel - } else { - NotificationEvents::UserLeftChannel - }); + self.audio_output + .play_effect(if from_channel == this_channel { + NotificationEvents::UserJoinedChannel + } else { + NotificationEvents::UserLeftChannel + }); } } @@ -224,7 +233,8 @@ impl State { let this_channel = self.get_users_channel(self.server().unwrap().session_id().unwrap()); let other_channel = self.get_users_channel(msg.get_session()); if this_channel == other_channel { - self.audio_output.play_effect(NotificationEvents::UserDisconnected); + self.audio_output + .play_effect(NotificationEvents::UserDisconnected); if let Some(user) = self.server().unwrap().users().get(&msg.get_session()) { notifications::send(format!("{} disconnected", &user.name())); } @@ -254,21 +264,19 @@ impl State { self.audio_output.load_sound_effects(sound_effects); } } - + pub fn register_message(&mut self, msg: (String, u32)) { self.message_buffer.push(msg); } pub fn broadcast_phase(&self, phase: StatePhase) { - self.phase_watcher - .0 - .send(phase) - .unwrap(); + self.phase_watcher.0.send(phase).unwrap(); } pub fn initialized(&self) { self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); - self.audio_output.play_effect(NotificationEvents::ServerConnect); + self.audio_output + .play_effect(NotificationEvents::ServerConnect); } pub fn audio_input(&self) -> &AudioInput { @@ -307,10 +315,12 @@ impl State { /// If we are connected to the server but the user with the id doesn't exist, the string "Unknown user {id}" /// is returned instead. If we aren't connected to a server, None is returned instead. fn get_user_name(&self, user: u32) -> Option<String> { - self.server() - .map(|e| e.users() - .get(&user).map(|e| e.name().to_string()) - .unwrap_or(format!("Unknown user {}", user))) + self.server().map(|e| { + e.users() + .get(&user) + .map(|e| e.name().to_string()) + .unwrap_or(format!("Unknown user {}", user)) + }) } } @@ -404,7 +414,9 @@ pub fn handle_command( packet_sender.send(msg.into()).unwrap(); } - now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) + now!(Ok( + new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }) + )) } Command::InputVolumeSet(volume) => { state.audio_input.set_volume(volume); @@ -498,7 +510,9 @@ pub fn handle_command( packet_sender.send(msg.into()).unwrap(); } - now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) + now!(Ok( + new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }) + )) } Command::OutputVolumeSet(volume) => { state.audio_output.set_volume(volume); @@ -522,10 +536,7 @@ pub fn handle_command( *server.password_mut() = password; *server.host_mut() = Some(format!("{}:{}", host, port)); state.server = Some(server); - state.phase_watcher - .0 - .send(StatePhase::Connecting) - .unwrap(); + state.phase_watcher.0.send(StatePhase::Connecting).unwrap(); let socket_addr = match (host.as_ref(), port) .to_socket_addrs() @@ -568,11 +579,14 @@ pub fn handle_command( state.server = None; - state.phase_watcher + state + .phase_watcher .0 .send(StatePhase::Disconnected) .unwrap(); - state.audio_output.play_effect(NotificationEvents::ServerDisconnect); + state + .audio_output + .play_effect(NotificationEvents::ServerDisconnect); now!(Ok(None)) } Command::ServerStatus { host, port } => ExecutionContext::Ping( @@ -586,12 +600,14 @@ pub fn handle_command( } }), Box::new(move |pong| { - Ok(pong.map(|pong| (CommandResponse::ServerStatus { - version: pong.version, - users: pong.users, - max_users: pong.max_users, - bandwidth: pong.bandwidth, - }))) + Ok(pong.map(|pong| { + (CommandResponse::ServerStatus { + version: pong.version, + users: pong.users, + max_users: pong.max_users, + bandwidth: pong.bandwidth, + }) + })) }), ), Command::Status => { @@ -624,7 +640,7 @@ pub fn handle_command( } Command::PastMessages { block } => { //does it make sense to wait for messages while not connected? - if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { + if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { return now!(Err(Error::Disconnected)); } if block { @@ -634,10 +650,16 @@ pub fn handle_command( Box::new(move |data, sender| { if let TcpEventData::TextMessage(a) = data { let message = ( - a.get_message().to_owned(), - ref_state.read().unwrap().get_user_name(a.get_actor()).unwrap() + a.get_message().to_owned(), + ref_state + .read() + .unwrap() + .get_user_name(a.get_actor()) + .unwrap(), ); - sender.send(Ok(Some(CommandResponse::PastMessage { message }))).is_ok() + sender + .send(Ok(Some(CommandResponse::PastMessage { message }))) + .is_ok() } else { unreachable!("Should only receive a TextMessage data when listening to TextMessage events"); } @@ -645,21 +667,20 @@ pub fn handle_command( ) } else { let messages = std::mem::take(&mut state.message_buffer); - let messages: Vec<_> = messages.into_iter() + let messages: Vec<_> = messages + .into_iter() .map(|(msg, user)| (msg, state.get_user_name(user).unwrap())) .map(|e| Ok(Some(CommandResponse::PastMessage { message: e }))) .collect(); - - ExecutionContext::Now(Box::new(move || { - Box::new(messages.into_iter()) - })) + + ExecutionContext::Now(Box::new(move || Box::new(messages.into_iter()))) } } Command::SendMessage { message, targets } => { if !matches!(*state.phase_receiver().borrow(), StatePhase::Connected(_)) { return now!(Err(Error::Disconnected)); } - + let mut msg = msgs::TextMessage::new(); msg.set_message(message); @@ -667,21 +688,20 @@ pub fn handle_command( for target in targets { match target { MessageTarget::Channel { recursive, name } => { - let channel_id = state - .server() - .unwrap() - .channel_name(&name); - + let channel_id = state.server().unwrap().channel_name(&name); + let channel_id = match channel_id { Ok(id) => id, Err(e) => return now!(Err(Error::ChannelIdentifierError(name, e))), - }.0; - + } + .0; + if recursive { msg.mut_tree_id() } else { msg.mut_channel_id() - }.push(channel_id); + } + .push(channel_id); } MessageTarget::User { name } => { let id = state diff --git a/mumd/src/state/channel.rs b/mumd/src/state/channel.rs index f58ed15..6995e1e 100644 --- a/mumd/src/state/channel.rs +++ b/mumd/src/state/channel.rs @@ -157,10 +157,7 @@ pub fn into_channel( let mut proto_tree = ProtoTree { channel: Some(channels.get(&0).unwrap()), children: HashMap::new(), - users: channel_lookup - .get(&0) - .cloned() - .unwrap_or_default(), + users: channel_lookup.get(&0).cloned().unwrap_or_default(), }; for (walk, channel) in walks { diff --git a/mumd/src/state/server.rs b/mumd/src/state/server.rs index 78a10b9..869940a 100644 --- a/mumd/src/state/server.rs +++ b/mumd/src/state/server.rs @@ -103,15 +103,20 @@ impl Server { /// server.channels.insert(0, channel.clone); /// assert_eq!(server.channel_name("Foobar"), Ok((0, &channel))); /// ``` - pub fn channel_name(&self, channel_name: &str) -> Result<(u32, &Channel), ChannelIdentifierError> { - let matches = self.channels + pub fn channel_name( + &self, + channel_name: &str, + ) -> Result<(u32, &Channel), ChannelIdentifierError> { + let matches = self + .channels .iter() .map(|e| ((*e.0, e.1), e.1.path(&self.channels))) .filter(|e| e.1.ends_with(channel_name)) .collect::<Vec<_>>(); Ok(match matches.len() { 0 => { - let soft_matches = self.channels + let soft_matches = self + .channels .iter() .map(|e| ((*e.0, e.1), e.1.path(&self.channels).to_lowercase())) .filter(|e| e.1.ends_with(&channel_name.to_lowercase())) |
