diff options
| -rw-r--r-- | CHANGELOG | 1 | ||||
| -rw-r--r-- | Cargo.lock | 40 | ||||
| -rw-r--r-- | mumctl/Cargo.toml | 4 | ||||
| -rw-r--r-- | mumctl/src/main.rs | 79 | ||||
| -rw-r--r-- | mumd/Cargo.toml | 4 | ||||
| -rw-r--r-- | mumd/src/audio.rs | 288 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 110 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 194 | ||||
| -rw-r--r-- | mumd/src/command.rs | 10 | ||||
| -rw-r--r-- | mumd/src/error.rs | 6 | ||||
| -rw-r--r-- | mumd/src/main.rs | 8 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 4 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 73 | ||||
| -rw-r--r-- | mumd/src/state.rs | 68 | ||||
| -rw-r--r-- | mumlib/Cargo.toml | 3 | ||||
| -rw-r--r-- | mumlib/src/lib.rs | 3 |
16 files changed, 554 insertions, 341 deletions
@@ -37,6 +37,7 @@ Fixed * Informative error message instead of panic when a running mumd-process can't be found. * Lots of other minor informative error messages instead of panics. + * Status requests are sent in parallel. Other ~~~~~ @@ -161,6 +161,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "time", + "winapi", +] + +[[package]] name = "clang-sys" version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -841,7 +854,7 @@ dependencies = [ [[package]] name = "mumctl" -version = "0.3.0" +version = "0.4.0" dependencies = [ "bincode", "colored", @@ -852,7 +865,7 @@ dependencies = [ [[package]] name = "mumd" -version = "0.3.0" +version = "0.4.0" dependencies = [ "bincode", "bytes", @@ -883,8 +896,9 @@ dependencies = [ [[package]] name = "mumlib" -version = "0.3.0" +version = "0.4.0" dependencies = [ + "chrono", "colored", "dirs", "fern", @@ -999,6 +1013,16 @@ dependencies = [ ] [[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] name = "num-traits" version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1573,6 +1597,16 @@ dependencies = [ ] [[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + +[[package]] name = "tokio" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/mumctl/Cargo.toml b/mumctl/Cargo.toml index f19a67c..fff2a1c 100644 --- a/mumctl/Cargo.toml +++ b/mumctl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mumctl" -version = "0.3.0" +version = "0.4.0" authors = ["Gustav Sörnäs <gustav@sornas.net>", "Eskil Queseth <eskilq@kth.se>"] edition = "2018" @@ -12,7 +12,7 @@ license = "MIT" readme = "../README.md" [dependencies] -mumlib = { version = "0.3", path = "../mumlib" } +mumlib = { version = "0.4", path = "../mumlib" } bincode = "1" colored = "2" diff --git a/mumctl/src/main.rs b/mumctl/src/main.rs index 0fe852c..191a973 100644 --- a/mumctl/src/main.rs +++ b/mumctl/src/main.rs @@ -3,7 +3,11 @@ use log::*; use mumlib::command::{Command as MumCommand, CommandResponse}; use mumlib::config::{self, Config, ServerConfig}; use mumlib::state::Channel as MumChannel; -use std::{fmt,io::{self, BufRead, Read, Write}, iter, os::unix::net::UnixStream}; +use std::fmt; +use std::io::{self, BufRead, Read, Write}; +use std::iter; +use std::os::unix::net::UnixStream; +use std::thread; use structopt::{clap::Shell, StructOpt}; const INDENTATION: &str = " "; @@ -297,7 +301,7 @@ fn match_opt() -> Result<(), Error> { } } _ => { - return Err(CliError::ConfigKeyNotFound(key))?; + return Err(CliError::ConfigKeyNotFound(key).into()); } }, Command::ConfigReload => { @@ -364,7 +368,7 @@ fn match_opt() -> Result<(), Error> { Ok(()) } -fn match_server_command(server_command: Server, config: &mut Config) -> Result<(), CliError> { +fn match_server_command(server_command: Server, config: &mut Config) -> Result<(), Error> { match server_command { Server::Config { server_name, @@ -474,7 +478,7 @@ fn match_server_command(server_command: Server, config: &mut Config) -> Result<( password, } => { if config.servers.iter().any(|s| s.name == name) { - return Err(CliError::ServerAlreadyExists(name))?; + return Err(CliError::ServerAlreadyExists(name).into()); } else { config.servers.push(ServerConfig { name, @@ -495,31 +499,56 @@ fn match_server_command(server_command: Server, config: &mut Config) -> Result<( } Server::List => { if config.servers.is_empty() { - return Err(CliError::NoServers)?; + return Err(CliError::NoServers.into()); } - let query = config + + let longest = config .servers .iter() - .map(|e| { - let response = send_command(MumCommand::ServerStatus { - host: e.host.clone(), - port: e.port.unwrap_or(mumlib::DEFAULT_PORT), - }); - response.map(|f| (e, f)) + .map(|s| s.name.len()) + .max() + .unwrap() // ok since !config.servers.is_empty() above + + 1; + + let queries: Vec<_> = config + .servers + .iter() + .map(|s| { + let query = MumCommand::ServerStatus { + host: s.host.clone(), + port: s.port.unwrap_or(mumlib::DEFAULT_PORT), + }; + thread::spawn(move || { + send_command(query) + }) }) - .collect::<Result<Vec<_>, _>>()?; - for (server, response) in query - .into_iter() - .filter(|e| e.1.is_ok()) - .map(|e| (e.0, e.1.unwrap().unwrap())) - { - if let CommandResponse::ServerStatus { - users, max_users, .. - } = response - { - println!("{} [{}/{}]", server.name, users, max_users) - } else { - unreachable!() + .collect(); + + for (server, response) in config.servers.iter().zip(queries) { + match response.join().unwrap() { + Ok(Ok(Some(response))) => { + if let CommandResponse::ServerStatus { + users, + max_users, + .. + } = response + { + println!("{0:<1$} [{2:}/{3:}]", server.name, longest, users, max_users); + } else { + unreachable!(); + } + } + Ok(Ok(None)) => { + println!("{0:<1$} offline", server.name, longest); + } + Ok(Err(e)) => { + error!("{}", e); + return Err(e.into()); + } + Err(e) => { + error!("{}", e); + return Err(e.into()); + } } } } diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index 9d7eeaa..d8e2635 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mumd" -version = "0.3.0" +version = "0.4.0" authors = ["Gustav Sörnäs <gustav@sornas.net>", "Eskil Queseth <eskilq@kth.se>"] edition = "2018" @@ -17,7 +17,7 @@ default = ["notifications"] notifications = ["libnotify"] [dependencies] -mumlib = { version = "0.3", path = "../mumlib" } +mumlib = { version = "0.4", path = "../mumlib" } cpal = "0.13" bytes = "1.0" diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index fdbeaee..df7af70 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -2,14 +2,14 @@ pub mod input; pub mod output; mod noise_gate; -use crate::audio::output::SaturatingAdd; +use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice}; +use crate::audio::output::{DefaultAudioOutputDevice, AudioOutputDevice, ClientStream}; use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt}; -use crate::error::{AudioError, AudioStream}; +use crate::error::AudioError; use crate::network::VoiceStreamType; use crate::state::StatePhase; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{SampleFormat, SampleRate, StreamConfig}; +use cpal::SampleRate; use dasp_interpolate::linear::Linear; use dasp_signal::{self as signal, Signal}; use futures_util::stream::Stream; @@ -19,7 +19,7 @@ use mumble_protocol::Serverbound; use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use mumlib::config::SoundEffect; use std::borrow::Cow; -use std::collections::{hash_map::Entry, HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::fmt::Debug; use std::fs::File; @@ -67,170 +67,82 @@ impl TryFrom<&str> for NotificationEvents { } } -pub struct Audio { - output_config: StreamConfig, - _output_stream: cpal::Stream, - _input_stream: cpal::Stream, +pub struct AudioInput { + device: DefaultAudioInputDevice, - input_channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, - input_volume_sender: watch::Sender<f32>, - - output_volume_sender: watch::Sender<f32>, - - user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, - - client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>, - - sounds: HashMap<NotificationEvents, Vec<f32>>, - play_sounds: Arc<Mutex<VecDeque<f32>>>, + channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, } -impl Audio { - pub fn new(input_volume: f32, output_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { +impl AudioInput { + pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { + let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher)?; let sample_rate = SampleRate(SAMPLE_RATE); - let host = cpal::default_host(); - let output_device = host - .default_output_device() - .ok_or(AudioError::NoDevice(AudioStream::Output))?; - let output_supported_config = output_device - .supported_output_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? - .with_sample_rate(sample_rate); - let output_supported_sample_format = output_supported_config.sample_format(); - let output_config: StreamConfig = output_supported_config.into(); + let opus_stream = OpusEncoder::new( + 4, + sample_rate.0, + default.num_channels(), + StreamingSignalExt::into_interleaved_samples( + StreamingNoiseGate::new( + from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly + 10_000 + ) + ) + ).enumerate() + .map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + } + ); - let input_device = host - .default_input_device() - .ok_or(AudioError::NoDevice(AudioStream::Input))?; - let input_supported_config = input_device - .supported_input_configs() - .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))? - .find_map(|c| { - if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { - Some(c) - } else { - None - } - }) - .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))? - .with_sample_rate(sample_rate); - let input_supported_sample_format = input_supported_config.sample_format(); - let input_config: StreamConfig = input_supported_config.into(); + default.play()?; - let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + let res = Self { + device: default, + channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), + }; + Ok(res) + } - let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); - let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume); - let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new())); + pub fn receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { + Arc::clone(&self.channel_receiver) + } - let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new())); - let output_stream = match output_supported_sample_format { - SampleFormat::F32 => output_device.build_output_stream( - &output_config, - output::curry_callback::<f32>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::I16 => output_device.build_output_stream( - &output_config, - output::curry_callback::<i16>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - SampleFormat::U16 => output_device.build_output_stream( - &output_config, - output::curry_callback::<u16>( - Arc::clone(&play_sounds), - Arc::clone(&client_streams), - output_volume_receiver, - Arc::clone(&user_volumes), - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + pub fn set_volume(&self, input_volume: f32) { + self.device.set_volume(input_volume); + } +} - let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); +pub struct AudioOutput { + device: DefaultAudioOutputDevice, + user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, - let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); + client_streams: Arc<Mutex<ClientStream>>, - let input_stream = match input_supported_sample_format { - SampleFormat::F32 => input_device.build_input_stream( - &input_config, - input::callback::<f32>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - SampleFormat::I16 => input_device.build_input_stream( - &input_config, - input::callback::<i16>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - SampleFormat::U16 => input_device.build_input_stream( - &input_config, - input::callback::<u16>( - sample_sender, - input_volume_receiver, - phase_watcher, - ), - err_fn, - ), - } - .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; + sounds: HashMap<NotificationEvents, Vec<f32>>, +} - let opus_stream = OpusEncoder::new( - 4, - input_config.sample_rate.0, - input_config.channels as usize, - StreamingSignalExt::into_interleaved_samples( - StreamingNoiseGate::new( - from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly - 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: i as u64, - payload: VoicePacketPayload::Opus(e.into(), false), - position_info: None, - }); +impl AudioOutput { + pub fn new(output_volume: f32) -> Result<Self, AudioError> { + let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new())); + + let default = DefaultAudioOutputDevice::new( + output_volume, + Arc::clone(&user_volumes), + )?; + default.play()?; - output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?; + let client_streams = default.client_streams(); let mut res = Self { - output_config, - _output_stream: output_stream, - _input_stream: input_stream, - input_volume_sender, - input_channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))), - client_streams, + device: default, sounds: HashMap::new(), - output_volume_sender, + client_streams, user_volumes, - play_sounds, }; res.load_sound_effects(&[]); Ok(res) @@ -278,7 +190,7 @@ impl Audio { .until_exhausted() // if the source audio is stereo and is being played as mono, discard the right audio .flat_map( - |e| if self.output_config.channels == 1 { + |e| if self.device.num_channels() == 1 { vec![e[0]] } else { e.to_vec() @@ -291,64 +203,14 @@ impl Audio { } pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) { - match self.client_streams.lock().unwrap().entry((stream_type, session_id)) { - Entry::Occupied(mut entry) => { - entry - .get_mut() - .decode_packet(payload, self.output_config.channels as usize); - } - Entry::Vacant(_) => { - warn!("Can't find session id {}", session_id); - } - } - } - - pub fn add_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(_) => { - warn!("Session id {} already exists", session_id); - } - Entry::Vacant(entry) => { - entry.insert(output::ClientStream::new( - self.output_config.sample_rate.0, - self.output_config.channels, - )); - } - } - } - } - - pub fn remove_client(&self, session_id: u32) { - for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() { - match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) { - Entry::Occupied(entry) => { - entry.remove(); - } - Entry::Vacant(_) => { - warn!( - "Tried to remove session id {} that doesn't exist", - session_id - ); - } - } - } - } - - pub fn input_receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { - Arc::clone(&self.input_channel_receiver) + self.client_streams.lock().unwrap().decode_packet( + (stream_type, session_id), + payload, + ); } - pub fn clear_clients(&mut self) { - self.client_streams.lock().unwrap().clear(); - } - - pub fn set_input_volume(&self, input_volume: f32) { - self.input_volume_sender.send(input_volume).unwrap(); - } - - pub fn set_output_volume(&self, output_volume: f32) { - self.output_volume_sender.send(output_volume).unwrap(); + pub fn set_volume(&self, output_volume: f32) { + self.device.set_volume(output_volume); } pub fn set_user_volume(&self, id: u32, volume: f32) { @@ -375,15 +237,7 @@ impl Audio { pub fn play_effect(&self, effect: NotificationEvents) { let samples = self.sounds.get(&effect).unwrap(); - - let mut play_sounds = self.play_sounds.lock().unwrap(); - - for (val, e) in play_sounds.iter_mut().zip(samples.iter()) { - *val = val.saturating_add(*e); - } - - let l = play_sounds.len(); - play_sounds.extend(samples.iter().skip(l)); + self.client_streams.lock().unwrap().extend(None, samples); } } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 176747d..4a1ed3d 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,8 +1,11 @@ -use cpal::{InputCallbackInfo, Sample}; +use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use tokio::sync::watch; use log::*; use crate::state::StatePhase; +use crate::audio::SAMPLE_RATE; +use crate::error::{AudioError, AudioStream}; pub fn callback<T: Sample>( mut input_sender: futures_channel::mpsc::Sender<f32>, @@ -24,3 +27,108 @@ pub fn callback<T: Sample>( } } } + +pub trait AudioInputDevice { + fn play(&self) -> Result<(), AudioError>; + fn pause(&self) -> Result<(), AudioError>; + fn set_volume(&self, volume: f32); + fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32>; + fn num_channels(&self) -> usize; +} + +pub struct DefaultAudioInputDevice { + stream: cpal::Stream, + sample_receiver: Option<futures_channel::mpsc::Receiver<f32>>, + volume_sender: watch::Sender<f32>, + channels: u16, +} + +impl DefaultAudioInputDevice { + pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + + let input_device = host + .default_input_device() + .ok_or(AudioError::NoDevice(AudioStream::Input))?; + let input_supported_config = input_device + .supported_input_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))? + .with_sample_rate(sample_rate); + let input_supported_sample_format = input_supported_config.sample_format(); + let input_config: StreamConfig = input_supported_config.into(); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); + + let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); + + let input_stream = match input_supported_sample_format { + SampleFormat::F32 => input_device.build_input_stream( + &input_config, + callback::<f32>( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + SampleFormat::I16 => input_device.build_input_stream( + &input_config, + callback::<i16>( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + SampleFormat::U16 => input_device.build_input_stream( + &input_config, + callback::<u16>( + sample_sender, + input_volume_receiver, + phase_watcher, + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; + + let res = Self { + stream: input_stream, + sample_receiver: Some(sample_receiver), + volume_sender, + channels: input_config.channels, + }; + Ok(res) + } +} + +impl AudioInputDevice for DefaultAudioInputDevice { + fn play(&self) -> Result<(), AudioError> { + self.stream.play().map_err(|e| AudioError::InputPlayError(e)) + } + fn pause(&self) -> Result<(), AudioError> { + self.stream.pause().map_err(|e| AudioError::InputPauseError(e)) + } + fn set_volume(&self, volume: f32) { + self.volume_sender.send(volume).unwrap(); + } + fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32> { + let ret = self.sample_receiver.take(); + ret.unwrap() + } + fn num_channels(&self) -> usize { + self.channels as usize + } +} diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 421d395..658c1c8 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -1,50 +1,76 @@ use crate::network::VoiceStreamType; +use crate::audio::SAMPLE_RATE; +use crate::error::{AudioError, AudioStream}; -use cpal::{OutputCallbackInfo, Sample}; +use log::*; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; -use opus::Channels; use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; use tokio::sync::watch; +type ClientStreamKey = (VoiceStreamType, u32); + pub struct ClientStream { - buffer: VecDeque<f32>, //TODO ring buffer? - opus_decoder: opus::Decoder, + buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer? + buffer_effects: VecDeque<f32>, + sample_rate: u32, + channels: opus::Channels, } impl ClientStream { pub fn new(sample_rate: u32, channels: u16) -> Self { + let channels = match channels { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), + }; Self { - buffer: VecDeque::new(), - opus_decoder: opus::Decoder::new( - sample_rate, - match channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels), - }, - ) - .unwrap(), + buffer_clients: HashMap::new(), + buffer_effects: VecDeque::new(), + sample_rate, + channels, } } - pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) { + fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) { + let sample_rate = self.sample_rate; + let channels = self.channels; + self.buffer_clients.entry(client).or_insert_with(|| { + let opus_decoder = opus::Decoder::new( + sample_rate, + channels + ).unwrap(); + (VecDeque::new(), opus_decoder) + }) + } + + pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) { match payload { VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode - let parsed = self - .opus_decoder + let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode + let (buffer, decoder) = self.get_client(client); + let parsed = decoder .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); - self.buffer.extend(out); + buffer.extend(&out); } _ => { unimplemented!("Payload type not supported"); } } } + + pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) { + let buffer = match client { + Some(x) => &mut self.get_client(x).0, + None => &mut self.buffer_effects, + }; + buffer.extend(values.iter().copied()); + } } pub trait SaturatingAdd { @@ -73,9 +99,116 @@ impl SaturatingAdd for u16 { } } +pub trait AudioOutputDevice { + fn play(&self) -> Result<(), AudioError>; + fn pause(&self) -> Result<(), AudioError>; + fn set_volume(&self, volume: f32); + fn num_channels(&self) -> usize; + fn client_streams(&self) -> Arc<Mutex<ClientStream>>; +} + +pub struct DefaultAudioOutputDevice { + config: StreamConfig, + stream: cpal::Stream, + client_streams: Arc<Mutex<ClientStream>>, + volume_sender: watch::Sender<f32>, +} + +impl DefaultAudioOutputDevice { + pub fn new( + output_volume: f32, + user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, + ) -> Result<Self, AudioError> { + let sample_rate = SampleRate(SAMPLE_RATE); + + let host = cpal::default_host(); + let output_device = host + .default_output_device() + .ok_or(AudioError::NoDevice(AudioStream::Output))?; + let output_supported_config = output_device + .supported_output_configs() + .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))? + .find_map(|c| { + if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { + Some(c) + } else { + None + } + }) + .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))? + .with_sample_rate(sample_rate); + let output_supported_sample_format = output_supported_config.sample_format(); + let output_config: StreamConfig = output_supported_config.into(); + let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels))); + + let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + + let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume); + + let output_stream = match output_supported_sample_format { + SampleFormat::F32 => output_device.build_output_stream( + &output_config, + curry_callback::<f32>( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + SampleFormat::I16 => output_device.build_output_stream( + &output_config, + curry_callback::<i16>( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + SampleFormat::U16 => output_device.build_output_stream( + &output_config, + curry_callback::<u16>( + Arc::clone(&client_streams), + output_volume_receiver, + user_volumes, + ), + err_fn, + ), + } + .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?; + + Ok(Self { + config: output_config, + stream: output_stream, + volume_sender: output_volume_sender, + client_streams, + }) + } +} + +impl AudioOutputDevice for DefaultAudioOutputDevice { + fn play(&self) -> Result<(), AudioError> { + self.stream.play().map_err(|e| AudioError::OutputPlayError(e)) + } + + fn pause(&self) -> Result<(), AudioError> { + self.stream.pause().map_err(|e| AudioError::OutputPauseError(e)) + } + + fn set_volume(&self, volume: f32) { + self.volume_sender.send(volume).unwrap(); + } + + fn num_channels(&self) -> usize { + self.config.channels as usize + } + + fn client_streams(&self) -> Arc<Mutex<ClientStream>> { + Arc::clone(&self.client_streams) + } +} + pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>( - effect_sound: Arc<Mutex<VecDeque<f32>>>, - user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>, + user_bufs: Arc<Mutex<ClientStream>>, output_volume_receiver: watch::Receiver<f32>, user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { @@ -86,26 +219,21 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display> let volume = *output_volume_receiver.borrow(); - let mut effects_sound = effect_sound.lock().unwrap(); let mut user_bufs = user_bufs.lock().unwrap(); - for ((_, id), client_stream) in &mut *user_bufs { - let (user_volume, muted) = user_volumes - .lock() - .unwrap() - .get(id) - .cloned() - .unwrap_or((1.0, false)); + let user_volumes = user_volumes.lock().unwrap(); + for (k, v) in user_bufs.buffer_clients.iter_mut() { + let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false)); for sample in data.iter_mut() { - let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume; if !muted { - *sample = sample.saturating_add(Sample::from(&s)); + *sample = sample.saturating_add(Sample::from( + &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume), + )); } } } - for sample in data.iter_mut() { *sample = sample.saturating_add(Sample::from( - &(effects_sound.pop_front().unwrap_or(0.0) * volume), + &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume), )); } } diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 7eec388..1337dce 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -8,7 +8,7 @@ use crate::state::{ExecutionContext, State}; use log::*; use mumble_protocol::{Serverbound, control::ControlPacket}; use mumlib::command::{Command, CommandResponse}; -use std::sync::{Arc, RwLock}; +use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; use tokio::sync::{mpsc, oneshot, watch}; pub async fn handle( @@ -23,6 +23,7 @@ pub async fn handle( mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>, ) { debug!("Begin listening for commands"); + let ping_count = AtomicU64::new(0); while let Some((command, response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); let mut state = state.write().unwrap(); @@ -47,10 +48,13 @@ pub async fn handle( response_sender.send(generator()).unwrap(); } ExecutionContext::Ping(generator, converter) => { - match generator() { + let ret = generator(); + debug!("Ping generated: {:?}", ret); + match ret { Ok(addr) => { + let id = ping_count.fetch_add(1, Ordering::Relaxed); let res = ping_request_sender.send(( - 0, + id, addr, Box::new(move |packet| { response_sender.send(converter(packet)).unwrap(); diff --git a/mumd/src/error.rs b/mumd/src/error.rs index f7818a1..eb63df8 100644 --- a/mumd/src/error.rs +++ b/mumd/src/error.rs @@ -86,6 +86,9 @@ pub enum AudioError { NoSupportedConfig(AudioStream), InvalidStream(AudioStream, cpal::BuildStreamError), OutputPlayError(cpal::PlayStreamError), + OutputPauseError(cpal::PauseStreamError), + InputPlayError(cpal::PlayStreamError), + InputPauseError(cpal::PauseStreamError), } impl fmt::Display for AudioError { @@ -96,6 +99,9 @@ impl fmt::Display for AudioError { AudioError::NoSupportedConfig(s) => write!(f, "No supported {} config found", s), AudioError::InvalidStream(s, e) => write!(f, "Invalid {} stream: {}", s, e), AudioError::OutputPlayError(e) => write!(f, "Playback error: {}", e), + AudioError::OutputPauseError(e) => write!(f, "Playback error: {}", e), + AudioError::InputPlayError(e) => write!(f, "Recording error: {}", e), + AudioError::InputPauseError(e) => write!(f, "Recording error: {}", e), } } } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index d7bc2c0..f298070 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -109,7 +109,13 @@ async fn receive_commands( sender.send((command, tx)).unwrap(); - let response = rx.await.unwrap(); + let response = match rx.await { + Ok(r) => r, + Err(_) => { + error!("Internal command response sender dropped"); + Ok(None) + } + }; let mut serialized = BytesMut::new(); bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 02477dc..7606987 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -112,7 +112,7 @@ pub async fn handle( let (phase_watcher, input_receiver) = { let state_lock = state.read().unwrap(); (state_lock.phase_receiver(), - state_lock.audio().input_receiver()) + state_lock.audio_input().receiver()) }; let event_queue = TcpEventQueue::new(); @@ -362,7 +362,7 @@ async fn listen( state .read() .unwrap() - .audio() + .audio_output() .decode_packet_payload( VoiceStreamType::TCP, session_id, diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index cc085b5..0958912 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -3,27 +3,25 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use futures_util::{FutureExt, SinkExt, StreamExt}; +use futures_util::future::join4; use futures_util::stream::{SplitSink, SplitStream, Stream}; use log::*; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::VoicePacket; use mumble_protocol::Serverbound; -use std::collections::HashMap; +use std::collections::{hash_map::Entry, HashMap}; use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; -use std::rc::Rc; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock}; use tokio::{join, net::UdpSocket}; -use tokio::sync::{mpsc, watch, Mutex}; -use tokio::time::{interval, Duration}; +use tokio::sync::{mpsc, oneshot, watch, Mutex}; +use tokio::time::{interval, timeout, Duration}; use tokio_util::udp::UdpFramed; use super::{run_until, VoiceStreamType}; -use futures_util::future::join4; -pub type PingRequest = (u64, SocketAddr, Box<dyn FnOnce(PongPacket)>); +pub type PingRequest = (u64, SocketAddr, Box<dyn FnOnce(Option<PongPacket>) + Send>); type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>; @@ -33,7 +31,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>, ) -> Result<(), UdpError> { - let receiver = state.read().unwrap().audio().input_receiver(); + let receiver = state.read().unwrap().audio_input().receiver(); loop { let connection_info = 'data: loop { @@ -151,7 +149,7 @@ async fn listen( state .read() .unwrap() - .audio() + .audio_output() .decode_packet_payload(VoiceStreamType::UDP, session_id, payload); } } @@ -228,31 +226,68 @@ pub async fn handle_pings( .await .expect("Failed to bind UDP socket"); - let pending = Rc::new(Mutex::new(HashMap::new())); + let pending = Mutex::new(HashMap::new()); - let sender_handle = async { + let sender = async { while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await { + debug!("Sending ping with id {} to {}", id, socket_addr); let packet = PingPacket { id }; let packet: [u8; 12] = packet.into(); udp_socket.send_to(&packet, &socket_addr).await.unwrap(); - pending.lock().await.insert(id, handle); + let (tx, rx) = oneshot::channel(); + match pending.lock().await.entry(id) { + Entry::Occupied(_) => { + warn!("Tried to send duplicate ping with id {}", id); + continue; + } + Entry::Vacant(v) => { + v.insert(tx); + } + } + + tokio::spawn(async move { + handle( + match timeout(Duration::from_secs(1), rx).await { + Ok(Ok(r)) => Some(r), + Ok(Err(_)) => { + warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id); + None + } + Err(_) => { + debug!("Server {} timed out when sending ping id {}", socket_addr, id); + None + } + } + ); + }); } }; - let receiver_handle = async { + let receiver = async { let mut buf = vec![0; 24]; + while let Ok(read) = udp_socket.recv(&mut buf).await { - assert_eq!(read, 24); + if read != 24 { + warn!("Ping response had length {}, expected 24", read); + continue; + } let packet = PongPacket::try_from(buf.as_slice()).unwrap(); - if let Some(handler) = pending.lock().await.remove(&packet.id) { - handler(packet); + match pending.lock().await.entry(packet.id) { + Entry::Occupied(o) => { + let id = *o.key(); + if o.remove().send(packet).is_err() { + debug!("Received response to ping with id {} too late", id); + } + } + Entry::Vacant(v) => { + warn!("Received ping with id {} that we didn't send", v.key()); + } } } }; debug!("Waiting for ping requests"); - - join!(sender_handle, receiver_handle); + join!(sender, receiver); } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 0cf876b..46df421 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -2,7 +2,7 @@ pub mod channel; pub mod server; pub mod user; -use crate::audio::{Audio, NotificationEvents}; +use crate::audio::{AudioInput, AudioOutput, NotificationEvents}; use crate::error::StateError; use crate::network::{ConnectionInfo, VoiceStreamType}; use crate::network::tcp::{TcpEvent, TcpEventData}; @@ -43,7 +43,7 @@ pub enum ExecutionContext { Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>), Ping( Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>, - Box<dyn FnOnce(PongPacket) -> mumlib::error::Result<Option<CommandResponse>>>, + Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>, ), } @@ -57,7 +57,8 @@ pub enum StatePhase { pub struct State { config: Config, server: Option<Server>, - audio: Audio, + audio_input: AudioInput, + audio_output: AudioOutput, phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>), } @@ -66,15 +67,18 @@ impl State { pub fn new() -> Result<Self, StateError> { let config = mumlib::config::read_cfg(&mumlib::config::default_cfg_path())?; let phase_watcher = watch::channel(StatePhase::Disconnected); - let audio = Audio::new( + let audio_input = AudioInput::new( config.audio.input_volume.unwrap_or(1.0), - config.audio.output_volume.unwrap_or(1.0), phase_watcher.1.clone(), ).map_err(|e| StateError::AudioError(e))?; + let audio_output = AudioOutput::new( + config.audio.output_volume.unwrap_or(1.0), + ).map_err(|e| StateError::AudioError(e))?; let mut state = Self { config, server: None, - audio, + audio_input, + audio_output, phase_watcher, }; state.reload_config(); @@ -176,13 +180,13 @@ impl State { let mut new_deaf = None; if let Some((mute, deafen)) = action { if server.deafened() != deafen { - self.audio.play_effect(if deafen { + self.audio_output.play_effect(if deafen { NotificationEvents::Deafen } else { NotificationEvents::Undeafen }); } else if server.muted() != mute { - self.audio.play_effect(if mute { + self.audio_output.play_effect(if mute { NotificationEvents::Mute } else { NotificationEvents::Unmute @@ -207,7 +211,7 @@ impl State { now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b }))) } Command::InputVolumeSet(volume) => { - self.audio.set_input_volume(volume); + self.audio_input.set_volume(volume); now!(Ok(None)) } Command::MuteOther(string, toggle) => { @@ -240,7 +244,7 @@ impl State { if let Some(action) = action { user.set_suppressed(action); - self.audio.set_mute(id, action); + self.audio_output.set_mute(id, action); } return now!(Ok(None)); @@ -269,13 +273,13 @@ impl State { let mut new_mute = None; if let Some((mute, deafen)) = action { if server.deafened() != deafen { - self.audio.play_effect(if deafen { + self.audio_output.play_effect(if deafen { NotificationEvents::Deafen } else { NotificationEvents::Undeafen }); } else if server.muted() != mute { - self.audio.play_effect(if mute { + self.audio_output.play_effect(if mute { NotificationEvents::Mute } else { NotificationEvents::Unmute @@ -301,7 +305,7 @@ impl State { now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b }))) } Command::OutputVolumeSet(volume) => { - self.audio.set_output_volume(volume); + self.audio_output.set_volume(volume); now!(Ok(None)) } Command::Ping => { @@ -367,13 +371,12 @@ impl State { } self.server = None; - self.audio.clear_clients(); self.phase_watcher .0 .send(StatePhase::Disconnected) .unwrap(); - self.audio.play_effect(NotificationEvents::ServerDisconnect); + self.audio_output.play_effect(NotificationEvents::ServerDisconnect); now!(Ok(None)) } Command::ServerStatus { host, port } => ExecutionContext::Ping( @@ -387,7 +390,7 @@ impl State { } }), Box::new(move |pong| { - Ok(Some(CommandResponse::ServerStatus { + Ok(pong.map(|pong| CommandResponse::ServerStatus { version: pong.version, users: pong.users, max_users: pong.max_users, @@ -420,7 +423,7 @@ impl State { Some(v) => v, }; - self.audio.set_user_volume(user_id, volume); + self.audio_output.set_user_volume(user_id, volume); now!(Ok(None)) } } @@ -453,8 +456,6 @@ impl State { *self.server_mut().unwrap().session_id_mut() = Some(session); } else { // this is someone else - self.audio_mut().add_client(session); - // send notification only if we've passed the connecting phase if matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) { let channel_id = msg.get_channel_id(); @@ -470,7 +471,7 @@ impl State { )); } - self.audio.play_effect(NotificationEvents::UserConnected); + self.audio_output.play_effect(NotificationEvents::UserConnected); } } } @@ -524,7 +525,7 @@ impl State { } else { warn!("{} moved to invalid channel {}", user.name(), to_channel); } - self.audio.play_effect(if from_channel == this_channel { + self.audio_output.play_effect(if from_channel == this_channel { NotificationEvents::UserJoinedChannel } else { NotificationEvents::UserLeftChannel @@ -559,13 +560,12 @@ impl State { let this_channel = self.get_users_channel(self.server().unwrap().session_id().unwrap()); let other_channel = self.get_users_channel(msg.get_session()); if this_channel == other_channel { - self.audio.play_effect(NotificationEvents::UserDisconnected); + self.audio_output.play_effect(NotificationEvents::UserDisconnected); if let Some(user) = self.server().unwrap().users().get(&msg.get_session()) { notifications::send(format!("{} disconnected", &user.name())); } } - self.audio().remove_client(msg.get_session()); self.server_mut() .unwrap() .users_mut() @@ -581,13 +581,13 @@ impl State { Err(e) => error!("Couldn't read config: {}", e), } if let Some(input_volume) = self.config.audio.input_volume { - self.audio.set_input_volume(input_volume); + self.audio_input.set_volume(input_volume); } if let Some(output_volume) = self.config.audio.output_volume { - self.audio.set_output_volume(output_volume); + self.audio_output.set_volume(output_volume); } if let Some(sound_effects) = &self.config.audio.sound_effects { - self.audio.load_sound_effects(sound_effects); + self.audio_output.load_sound_effects(sound_effects); } } @@ -600,14 +600,20 @@ impl State { pub fn initialized(&self) { self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP)); - self.audio.play_effect(NotificationEvents::ServerConnect); + self.audio_output.play_effect(NotificationEvents::ServerConnect); } - pub fn audio(&self) -> &Audio { - &self.audio + pub fn audio_input(&self) -> &AudioInput { + &self.audio_input + } + pub fn audio_output(&self) -> &AudioOutput { + &self.audio_output + } + pub fn audio_input_mut(&mut self) -> &mut AudioInput { + &mut self.audio_input } - pub fn audio_mut(&mut self) -> &mut Audio { - &mut self.audio + pub fn audio_output_mut(&mut self) -> &mut AudioOutput { + &mut self.audio_output } pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> { self.phase_watcher.1.clone() diff --git a/mumlib/Cargo.toml b/mumlib/Cargo.toml index 245b108..5c9d4e1 100644 --- a/mumlib/Cargo.toml +++ b/mumlib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mumlib" -version = "0.3.0" +version = "0.4.0" authors = ["Gustav Sörnäs <gustav@sornas.net>", "Eskil Queseth <eskilq@kth.se>"] edition = "2018" @@ -13,6 +13,7 @@ readme = "../README.md" [dependencies] colored = "2" +chrono = "0.4" dirs = "3" fern = "0.6" log = "0.4" diff --git a/mumlib/src/lib.rs b/mumlib/src/lib.rs index 36edc10..9b7d686 100644 --- a/mumlib/src/lib.rs +++ b/mumlib/src/lib.rs @@ -16,7 +16,7 @@ pub fn setup_logger<T: Into<fern::Output>>(target: T, color: bool) { .format(move |out, message, record| { let message = message.to_string(); out.finish(format_args!( - "{} {}:{}{}{}", + "{} {} {}:{}{}{}", //TODO runtime flag that disables color if color { match record.level() { @@ -36,6 +36,7 @@ pub fn setup_logger<T: Into<fern::Output>>(target: T, color: bool) { } .normal() }, + chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S%.6f]"), record.file().unwrap(), record.line().unwrap(), if message.chars().any(|e| e == '\n') { |
