aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/audio.rs288
-rw-r--r--mumd/src/audio/input.rs110
-rw-r--r--mumd/src/audio/output.rs194
-rw-r--r--mumd/src/error.rs6
-rw-r--r--mumd/src/network/tcp.rs4
-rw-r--r--mumd/src/network/udp.rs4
-rw-r--r--mumd/src/state.rs64
7 files changed, 386 insertions, 284 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index fdbeaee..df7af70 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -2,14 +2,14 @@ pub mod input;
pub mod output;
mod noise_gate;
-use crate::audio::output::SaturatingAdd;
+use crate::audio::input::{DefaultAudioInputDevice, AudioInputDevice};
+use crate::audio::output::{DefaultAudioOutputDevice, AudioOutputDevice, ClientStream};
use crate::audio::noise_gate::{from_interleaved_samples_stream, OpusEncoder, StreamingNoiseGate, StreamingSignalExt};
-use crate::error::{AudioError, AudioStream};
+use crate::error::AudioError;
use crate::network::VoiceStreamType;
use crate::state::StatePhase;
-use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
-use cpal::{SampleFormat, SampleRate, StreamConfig};
+use cpal::SampleRate;
use dasp_interpolate::linear::Linear;
use dasp_signal::{self as signal, Signal};
use futures_util::stream::Stream;
@@ -19,7 +19,7 @@ use mumble_protocol::Serverbound;
use mumble_protocol::voice::{VoicePacketPayload, VoicePacket};
use mumlib::config::SoundEffect;
use std::borrow::Cow;
-use std::collections::{hash_map::Entry, HashMap, VecDeque};
+use std::collections::{hash_map::Entry, HashMap};
use std::convert::TryFrom;
use std::fmt::Debug;
use std::fs::File;
@@ -67,170 +67,82 @@ impl TryFrom<&str> for NotificationEvents {
}
}
-pub struct Audio {
- output_config: StreamConfig,
- _output_stream: cpal::Stream,
- _input_stream: cpal::Stream,
+pub struct AudioInput {
+ device: DefaultAudioInputDevice,
- input_channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
- input_volume_sender: watch::Sender<f32>,
-
- output_volume_sender: watch::Sender<f32>,
-
- user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
-
- client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>,
-
- sounds: HashMap<NotificationEvents, Vec<f32>>,
- play_sounds: Arc<Mutex<VecDeque<f32>>>,
+ channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
}
-impl Audio {
- pub fn new(input_volume: f32, output_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> {
+impl AudioInput {
+ pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> {
+ let mut default = DefaultAudioInputDevice::new(input_volume, phase_watcher)?;
let sample_rate = SampleRate(SAMPLE_RATE);
- let host = cpal::default_host();
- let output_device = host
- .default_output_device()
- .ok_or(AudioError::NoDevice(AudioStream::Output))?;
- let output_supported_config = output_device
- .supported_output_configs()
- .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))?
- .find_map(|c| {
- if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
- Some(c)
- } else {
- None
- }
- })
- .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))?
- .with_sample_rate(sample_rate);
- let output_supported_sample_format = output_supported_config.sample_format();
- let output_config: StreamConfig = output_supported_config.into();
+ let opus_stream = OpusEncoder::new(
+ 4,
+ sample_rate.0,
+ default.num_channels(),
+ StreamingSignalExt::into_interleaved_samples(
+ StreamingNoiseGate::new(
+ from_interleaved_samples_stream::<_, f32>(default.sample_receiver()), //TODO group frames correctly
+ 10_000
+ )
+ )
+ ).enumerate()
+ .map(|(i, e)| VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: i as u64,
+ payload: VoicePacketPayload::Opus(e.into(), false),
+ position_info: None,
+ }
+ );
- let input_device = host
- .default_input_device()
- .ok_or(AudioError::NoDevice(AudioStream::Input))?;
- let input_supported_config = input_device
- .supported_input_configs()
- .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))?
- .find_map(|c| {
- if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
- Some(c)
- } else {
- None
- }
- })
- .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))?
- .with_sample_rate(sample_rate);
- let input_supported_sample_format = input_supported_config.sample_format();
- let input_config: StreamConfig = input_supported_config.into();
+ default.play()?;
- let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
+ let res = Self {
+ device: default,
+ channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))),
+ };
+ Ok(res)
+ }
- let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new()));
- let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);
- let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new()));
+ pub fn receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
+ Arc::clone(&self.channel_receiver)
+ }
- let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new()));
- let output_stream = match output_supported_sample_format {
- SampleFormat::F32 => output_device.build_output_stream(
- &output_config,
- output::curry_callback::<f32>(
- Arc::clone(&play_sounds),
- Arc::clone(&client_streams),
- output_volume_receiver,
- Arc::clone(&user_volumes),
- ),
- err_fn,
- ),
- SampleFormat::I16 => output_device.build_output_stream(
- &output_config,
- output::curry_callback::<i16>(
- Arc::clone(&play_sounds),
- Arc::clone(&client_streams),
- output_volume_receiver,
- Arc::clone(&user_volumes),
- ),
- err_fn,
- ),
- SampleFormat::U16 => output_device.build_output_stream(
- &output_config,
- output::curry_callback::<u16>(
- Arc::clone(&play_sounds),
- Arc::clone(&client_streams),
- output_volume_receiver,
- Arc::clone(&user_volumes),
- ),
- err_fn,
- ),
- }
- .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?;
+ pub fn set_volume(&self, input_volume: f32) {
+ self.device.set_volume(input_volume);
+ }
+}
- let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000);
+pub struct AudioOutput {
+ device: DefaultAudioOutputDevice,
+ user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
- let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
+ client_streams: Arc<Mutex<ClientStream>>,
- let input_stream = match input_supported_sample_format {
- SampleFormat::F32 => input_device.build_input_stream(
- &input_config,
- input::callback::<f32>(
- sample_sender,
- input_volume_receiver,
- phase_watcher,
- ),
- err_fn,
- ),
- SampleFormat::I16 => input_device.build_input_stream(
- &input_config,
- input::callback::<i16>(
- sample_sender,
- input_volume_receiver,
- phase_watcher,
- ),
- err_fn,
- ),
- SampleFormat::U16 => input_device.build_input_stream(
- &input_config,
- input::callback::<u16>(
- sample_sender,
- input_volume_receiver,
- phase_watcher,
- ),
- err_fn,
- ),
- }
- .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?;
+ sounds: HashMap<NotificationEvents, Vec<f32>>,
+}
- let opus_stream = OpusEncoder::new(
- 4,
- input_config.sample_rate.0,
- input_config.channels as usize,
- StreamingSignalExt::into_interleaved_samples(
- StreamingNoiseGate::new(
- from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly
- 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: i as u64,
- payload: VoicePacketPayload::Opus(e.into(), false),
- position_info: None,
- });
+impl AudioOutput {
+ pub fn new(output_volume: f32) -> Result<Self, AudioError> {
+ let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new()));
+
+ let default = DefaultAudioOutputDevice::new(
+ output_volume,
+ Arc::clone(&user_volumes),
+ )?;
+ default.play()?;
- output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?;
+ let client_streams = default.client_streams();
let mut res = Self {
- output_config,
- _output_stream: output_stream,
- _input_stream: input_stream,
- input_volume_sender,
- input_channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))),
- client_streams,
+ device: default,
sounds: HashMap::new(),
- output_volume_sender,
+ client_streams,
user_volumes,
- play_sounds,
};
res.load_sound_effects(&[]);
Ok(res)
@@ -278,7 +190,7 @@ impl Audio {
.until_exhausted()
// if the source audio is stereo and is being played as mono, discard the right audio
.flat_map(
- |e| if self.output_config.channels == 1 {
+ |e| if self.device.num_channels() == 1 {
vec![e[0]]
} else {
e.to_vec()
@@ -291,64 +203,14 @@ impl Audio {
}
pub fn decode_packet_payload(&self, stream_type: VoiceStreamType, session_id: u32, payload: VoicePacketPayload) {
- match self.client_streams.lock().unwrap().entry((stream_type, session_id)) {
- Entry::Occupied(mut entry) => {
- entry
- .get_mut()
- .decode_packet(payload, self.output_config.channels as usize);
- }
- Entry::Vacant(_) => {
- warn!("Can't find session id {}", session_id);
- }
- }
- }
-
- pub fn add_client(&self, session_id: u32) {
- for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
- match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) {
- Entry::Occupied(_) => {
- warn!("Session id {} already exists", session_id);
- }
- Entry::Vacant(entry) => {
- entry.insert(output::ClientStream::new(
- self.output_config.sample_rate.0,
- self.output_config.channels,
- ));
- }
- }
- }
- }
-
- pub fn remove_client(&self, session_id: u32) {
- for stream_type in [VoiceStreamType::TCP, VoiceStreamType::UDP].iter() {
- match self.client_streams.lock().unwrap().entry((*stream_type, session_id)) {
- Entry::Occupied(entry) => {
- entry.remove();
- }
- Entry::Vacant(_) => {
- warn!(
- "Tried to remove session id {} that doesn't exist",
- session_id
- );
- }
- }
- }
- }
-
- pub fn input_receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
- Arc::clone(&self.input_channel_receiver)
+ self.client_streams.lock().unwrap().decode_packet(
+ (stream_type, session_id),
+ payload,
+ );
}
- pub fn clear_clients(&mut self) {
- self.client_streams.lock().unwrap().clear();
- }
-
- pub fn set_input_volume(&self, input_volume: f32) {
- self.input_volume_sender.send(input_volume).unwrap();
- }
-
- pub fn set_output_volume(&self, output_volume: f32) {
- self.output_volume_sender.send(output_volume).unwrap();
+ pub fn set_volume(&self, output_volume: f32) {
+ self.device.set_volume(output_volume);
}
pub fn set_user_volume(&self, id: u32, volume: f32) {
@@ -375,15 +237,7 @@ impl Audio {
pub fn play_effect(&self, effect: NotificationEvents) {
let samples = self.sounds.get(&effect).unwrap();
-
- let mut play_sounds = self.play_sounds.lock().unwrap();
-
- for (val, e) in play_sounds.iter_mut().zip(samples.iter()) {
- *val = val.saturating_add(*e);
- }
-
- let l = play_sounds.len();
- play_sounds.extend(samples.iter().skip(l));
+ self.client_streams.lock().unwrap().extend(None, samples);
}
}
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index 176747d..4a1ed3d 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -1,8 +1,11 @@
-use cpal::{InputCallbackInfo, Sample};
+use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
+use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use tokio::sync::watch;
use log::*;
use crate::state::StatePhase;
+use crate::audio::SAMPLE_RATE;
+use crate::error::{AudioError, AudioStream};
pub fn callback<T: Sample>(
mut input_sender: futures_channel::mpsc::Sender<f32>,
@@ -24,3 +27,108 @@ pub fn callback<T: Sample>(
}
}
}
+
+pub trait AudioInputDevice {
+ fn play(&self) -> Result<(), AudioError>;
+ fn pause(&self) -> Result<(), AudioError>;
+ fn set_volume(&self, volume: f32);
+ fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32>;
+ fn num_channels(&self) -> usize;
+}
+
+pub struct DefaultAudioInputDevice {
+ stream: cpal::Stream,
+ sample_receiver: Option<futures_channel::mpsc::Receiver<f32>>,
+ volume_sender: watch::Sender<f32>,
+ channels: u16,
+}
+
+impl DefaultAudioInputDevice {
+ pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> {
+ let sample_rate = SampleRate(SAMPLE_RATE);
+
+ let host = cpal::default_host();
+
+ let input_device = host
+ .default_input_device()
+ .ok_or(AudioError::NoDevice(AudioStream::Input))?;
+ let input_supported_config = input_device
+ .supported_input_configs()
+ .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))?
+ .find_map(|c| {
+ if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
+ Some(c)
+ } else {
+ None
+ }
+ })
+ .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))?
+ .with_sample_rate(sample_rate);
+ let input_supported_sample_format = input_supported_config.sample_format();
+ let input_config: StreamConfig = input_supported_config.into();
+
+ let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
+
+ let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000);
+
+ let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
+
+ let input_stream = match input_supported_sample_format {
+ SampleFormat::F32 => input_device.build_input_stream(
+ &input_config,
+ callback::<f32>(
+ sample_sender,
+ input_volume_receiver,
+ phase_watcher,
+ ),
+ err_fn,
+ ),
+ SampleFormat::I16 => input_device.build_input_stream(
+ &input_config,
+ callback::<i16>(
+ sample_sender,
+ input_volume_receiver,
+ phase_watcher,
+ ),
+ err_fn,
+ ),
+ SampleFormat::U16 => input_device.build_input_stream(
+ &input_config,
+ callback::<u16>(
+ sample_sender,
+ input_volume_receiver,
+ phase_watcher,
+ ),
+ err_fn,
+ ),
+ }
+ .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?;
+
+ let res = Self {
+ stream: input_stream,
+ sample_receiver: Some(sample_receiver),
+ volume_sender,
+ channels: input_config.channels,
+ };
+ Ok(res)
+ }
+}
+
+impl AudioInputDevice for DefaultAudioInputDevice {
+ fn play(&self) -> Result<(), AudioError> {
+ self.stream.play().map_err(|e| AudioError::InputPlayError(e))
+ }
+ fn pause(&self) -> Result<(), AudioError> {
+ self.stream.pause().map_err(|e| AudioError::InputPauseError(e))
+ }
+ fn set_volume(&self, volume: f32) {
+ self.volume_sender.send(volume).unwrap();
+ }
+ fn sample_receiver(&mut self) -> futures_channel::mpsc::Receiver<f32> {
+ let ret = self.sample_receiver.take();
+ ret.unwrap()
+ }
+ fn num_channels(&self) -> usize {
+ self.channels as usize
+ }
+}
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index 421d395..658c1c8 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -1,50 +1,76 @@
use crate::network::VoiceStreamType;
+use crate::audio::SAMPLE_RATE;
+use crate::error::{AudioError, AudioStream};
-use cpal::{OutputCallbackInfo, Sample};
+use log::*;
+use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
+use cpal::{SampleFormat, SampleRate, StreamConfig, OutputCallbackInfo, Sample};
use mumble_protocol::voice::VoicePacketPayload;
-use opus::Channels;
use std::collections::{HashMap, VecDeque};
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
+type ClientStreamKey = (VoiceStreamType, u32);
+
pub struct ClientStream {
- buffer: VecDeque<f32>, //TODO ring buffer?
- opus_decoder: opus::Decoder,
+ buffer_clients: HashMap<ClientStreamKey, (VecDeque<f32>, opus::Decoder)>, //TODO ring buffer?
+ buffer_effects: VecDeque<f32>,
+ sample_rate: u32,
+ channels: opus::Channels,
}
impl ClientStream {
pub fn new(sample_rate: u32, channels: u16) -> Self {
+ let channels = match channels {
+ 1 => opus::Channels::Mono,
+ 2 => opus::Channels::Stereo,
+ _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
+ };
Self {
- buffer: VecDeque::new(),
- opus_decoder: opus::Decoder::new(
- sample_rate,
- match channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
- },
- )
- .unwrap(),
+ buffer_clients: HashMap::new(),
+ buffer_effects: VecDeque::new(),
+ sample_rate,
+ channels,
}
}
- pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
+ fn get_client(&mut self, client: ClientStreamKey) -> &mut (VecDeque<f32>, opus::Decoder) {
+ let sample_rate = self.sample_rate;
+ let channels = self.channels;
+ self.buffer_clients.entry(client).or_insert_with(|| {
+ let opus_decoder = opus::Decoder::new(
+ sample_rate,
+ channels
+ ).unwrap();
+ (VecDeque::new(), opus_decoder)
+ })
+ }
+
+ pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
match payload {
VoicePacketPayload::Opus(bytes, _eot) => {
- let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
- let parsed = self
- .opus_decoder
+ let mut out: Vec<f32> = vec![0.0; 720 * (self.channels as usize) * 4]; //720 is because that is the max size of packet we can get that we want to decode
+ let (buffer, decoder) = self.get_client(client);
+ let parsed = decoder
.decode_float(&bytes, &mut out, false)
.expect("Error decoding");
out.truncate(parsed);
- self.buffer.extend(out);
+ buffer.extend(&out);
}
_ => {
unimplemented!("Payload type not supported");
}
}
}
+
+ pub fn extend(&mut self, client: Option<ClientStreamKey>, values: &[f32]) {
+ let buffer = match client {
+ Some(x) => &mut self.get_client(x).0,
+ None => &mut self.buffer_effects,
+ };
+ buffer.extend(values.iter().copied());
+ }
}
pub trait SaturatingAdd {
@@ -73,9 +99,116 @@ impl SaturatingAdd for u16 {
}
}
+pub trait AudioOutputDevice {
+ fn play(&self) -> Result<(), AudioError>;
+ fn pause(&self) -> Result<(), AudioError>;
+ fn set_volume(&self, volume: f32);
+ fn num_channels(&self) -> usize;
+ fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
+}
+
+pub struct DefaultAudioOutputDevice {
+ config: StreamConfig,
+ stream: cpal::Stream,
+ client_streams: Arc<Mutex<ClientStream>>,
+ volume_sender: watch::Sender<f32>,
+}
+
+impl DefaultAudioOutputDevice {
+ pub fn new(
+ output_volume: f32,
+ user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
+ ) -> Result<Self, AudioError> {
+ let sample_rate = SampleRate(SAMPLE_RATE);
+
+ let host = cpal::default_host();
+ let output_device = host
+ .default_output_device()
+ .ok_or(AudioError::NoDevice(AudioStream::Output))?;
+ let output_supported_config = output_device
+ .supported_output_configs()
+ .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))?
+ .find_map(|c| {
+ if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
+ Some(c)
+ } else {
+ None
+ }
+ })
+ .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))?
+ .with_sample_rate(sample_rate);
+ let output_supported_sample_format = output_supported_config.sample_format();
+ let output_config: StreamConfig = output_supported_config.into();
+ let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(sample_rate.0, output_config.channels)));
+
+ let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
+
+ let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);
+
+ let output_stream = match output_supported_sample_format {
+ SampleFormat::F32 => output_device.build_output_stream(
+ &output_config,
+ curry_callback::<f32>(
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ user_volumes,
+ ),
+ err_fn,
+ ),
+ SampleFormat::I16 => output_device.build_output_stream(
+ &output_config,
+ curry_callback::<i16>(
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ user_volumes,
+ ),
+ err_fn,
+ ),
+ SampleFormat::U16 => output_device.build_output_stream(
+ &output_config,
+ curry_callback::<u16>(
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ user_volumes,
+ ),
+ err_fn,
+ ),
+ }
+ .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?;
+
+ Ok(Self {
+ config: output_config,
+ stream: output_stream,
+ volume_sender: output_volume_sender,
+ client_streams,
+ })
+ }
+}
+
+impl AudioOutputDevice for DefaultAudioOutputDevice {
+ fn play(&self) -> Result<(), AudioError> {
+ self.stream.play().map_err(|e| AudioError::OutputPlayError(e))
+ }
+
+ fn pause(&self) -> Result<(), AudioError> {
+ self.stream.pause().map_err(|e| AudioError::OutputPauseError(e))
+ }
+
+ fn set_volume(&self, volume: f32) {
+ self.volume_sender.send(volume).unwrap();
+ }
+
+ fn num_channels(&self) -> usize {
+ self.config.channels as usize
+ }
+
+ fn client_streams(&self) -> Arc<Mutex<ClientStream>> {
+ Arc::clone(&self.client_streams)
+ }
+}
+
pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
- effect_sound: Arc<Mutex<VecDeque<f32>>>,
- user_bufs: Arc<Mutex<HashMap<(VoiceStreamType, u32), ClientStream>>>,
+ user_bufs: Arc<Mutex<ClientStream>>,
output_volume_receiver: watch::Receiver<f32>,
user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
@@ -86,26 +219,21 @@ pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>
let volume = *output_volume_receiver.borrow();
- let mut effects_sound = effect_sound.lock().unwrap();
let mut user_bufs = user_bufs.lock().unwrap();
- for ((_, id), client_stream) in &mut *user_bufs {
- let (user_volume, muted) = user_volumes
- .lock()
- .unwrap()
- .get(id)
- .cloned()
- .unwrap_or((1.0, false));
+ let user_volumes = user_volumes.lock().unwrap();
+ for (k, v) in user_bufs.buffer_clients.iter_mut() {
+ let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
for sample in data.iter_mut() {
- let s = client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume;
if !muted {
- *sample = sample.saturating_add(Sample::from(&s));
+ *sample = sample.saturating_add(Sample::from(
+ &(v.0.pop_front().unwrap_or(0.0) * volume * user_volume),
+ ));
}
}
}
-
for sample in data.iter_mut() {
*sample = sample.saturating_add(Sample::from(
- &(effects_sound.pop_front().unwrap_or(0.0) * volume),
+ &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume),
));
}
}
diff --git a/mumd/src/error.rs b/mumd/src/error.rs
index f7818a1..eb63df8 100644
--- a/mumd/src/error.rs
+++ b/mumd/src/error.rs
@@ -86,6 +86,9 @@ pub enum AudioError {
NoSupportedConfig(AudioStream),
InvalidStream(AudioStream, cpal::BuildStreamError),
OutputPlayError(cpal::PlayStreamError),
+ OutputPauseError(cpal::PauseStreamError),
+ InputPlayError(cpal::PlayStreamError),
+ InputPauseError(cpal::PauseStreamError),
}
impl fmt::Display for AudioError {
@@ -96,6 +99,9 @@ impl fmt::Display for AudioError {
AudioError::NoSupportedConfig(s) => write!(f, "No supported {} config found", s),
AudioError::InvalidStream(s, e) => write!(f, "Invalid {} stream: {}", s, e),
AudioError::OutputPlayError(e) => write!(f, "Playback error: {}", e),
+ AudioError::OutputPauseError(e) => write!(f, "Playback error: {}", e),
+ AudioError::InputPlayError(e) => write!(f, "Recording error: {}", e),
+ AudioError::InputPauseError(e) => write!(f, "Recording error: {}", e),
}
}
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 02477dc..7606987 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -112,7 +112,7 @@ pub async fn handle(
let (phase_watcher, input_receiver) = {
let state_lock = state.read().unwrap();
(state_lock.phase_receiver(),
- state_lock.audio().input_receiver())
+ state_lock.audio_input().receiver())
};
let event_queue = TcpEventQueue::new();
@@ -362,7 +362,7 @@ async fn listen(
state
.read()
.unwrap()
- .audio()
+ .audio_output()
.decode_packet_payload(
VoiceStreamType::TCP,
session_id,
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index cc085b5..3ca77af 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -33,7 +33,7 @@ pub async fn handle(
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) -> Result<(), UdpError> {
- let receiver = state.read().unwrap().audio().input_receiver();
+ let receiver = state.read().unwrap().audio_input().receiver();
loop {
let connection_info = 'data: loop {
@@ -151,7 +151,7 @@ async fn listen(
state
.read()
.unwrap()
- .audio()
+ .audio_output()
.decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
}
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index b52b330..132da74 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -2,7 +2,7 @@ pub mod channel;
pub mod server;
pub mod user;
-use crate::audio::{Audio, NotificationEvents};
+use crate::audio::{AudioInput, AudioOutput, NotificationEvents};
use crate::error::StateError;
use crate::network::{ConnectionInfo, VoiceStreamType};
use crate::network::tcp::{TcpEvent, TcpEventData};
@@ -57,7 +57,8 @@ pub enum StatePhase {
pub struct State {
config: Config,
server: Option<Server>,
- audio: Audio,
+ audio_input: AudioInput,
+ audio_output: AudioOutput,
phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>),
}
@@ -66,15 +67,18 @@ impl State {
pub fn new() -> Result<Self, StateError> {
let config = mumlib::config::read_default_cfg()?;
let phase_watcher = watch::channel(StatePhase::Disconnected);
- let audio = Audio::new(
+ let audio_input = AudioInput::new(
config.audio.input_volume.unwrap_or(1.0),
- config.audio.output_volume.unwrap_or(1.0),
phase_watcher.1.clone(),
).map_err(|e| StateError::AudioError(e))?;
+ let audio_output = AudioOutput::new(
+ config.audio.output_volume.unwrap_or(1.0),
+ ).map_err(|e| StateError::AudioError(e))?;
let mut state = Self {
config,
server: None,
- audio,
+ audio_input,
+ audio_output,
phase_watcher,
};
state.reload_config();
@@ -176,13 +180,13 @@ impl State {
let mut new_deaf = None;
if let Some((mute, deafen)) = action {
if server.deafened() != deafen {
- self.audio.play_effect(if deafen {
+ self.audio_output.play_effect(if deafen {
NotificationEvents::Deafen
} else {
NotificationEvents::Undeafen
});
} else if server.muted() != mute {
- self.audio.play_effect(if mute {
+ self.audio_output.play_effect(if mute {
NotificationEvents::Mute
} else {
NotificationEvents::Unmute
@@ -207,7 +211,7 @@ impl State {
now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b })))
}
Command::InputVolumeSet(volume) => {
- self.audio.set_input_volume(volume);
+ self.audio_input.set_volume(volume);
now!(Ok(None))
}
Command::MuteOther(string, toggle) => {
@@ -240,7 +244,7 @@ impl State {
if let Some(action) = action {
user.set_suppressed(action);
- self.audio.set_mute(id, action);
+ self.audio_output.set_mute(id, action);
}
return now!(Ok(None));
@@ -269,13 +273,13 @@ impl State {
let mut new_mute = None;
if let Some((mute, deafen)) = action {
if server.deafened() != deafen {
- self.audio.play_effect(if deafen {
+ self.audio_output.play_effect(if deafen {
NotificationEvents::Deafen
} else {
NotificationEvents::Undeafen
});
} else if server.muted() != mute {
- self.audio.play_effect(if mute {
+ self.audio_output.play_effect(if mute {
NotificationEvents::Mute
} else {
NotificationEvents::Unmute
@@ -301,7 +305,7 @@ impl State {
now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b })))
}
Command::OutputVolumeSet(volume) => {
- self.audio.set_output_volume(volume);
+ self.audio_output.set_volume(volume);
now!(Ok(None))
}
Command::Ping => {
@@ -367,13 +371,12 @@ impl State {
}
self.server = None;
- self.audio.clear_clients();
self.phase_watcher
.0
.send(StatePhase::Disconnected)
.unwrap();
- self.audio.play_effect(NotificationEvents::ServerDisconnect);
+ self.audio_output.play_effect(NotificationEvents::ServerDisconnect);
now!(Ok(None))
}
Command::ServerStatus { host, port } => ExecutionContext::Ping(
@@ -420,7 +423,7 @@ impl State {
Some(v) => v,
};
- self.audio.set_user_volume(user_id, volume);
+ self.audio_output.set_user_volume(user_id, volume);
now!(Ok(None))
}
}
@@ -453,8 +456,6 @@ impl State {
*self.server_mut().unwrap().session_id_mut() = Some(session);
} else {
// this is someone else
- self.audio_mut().add_client(session);
-
// send notification only if we've passed the connecting phase
if matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) {
let channel_id = msg.get_channel_id();
@@ -470,7 +471,7 @@ impl State {
));
}
- self.audio.play_effect(NotificationEvents::UserConnected);
+ self.audio_output.play_effect(NotificationEvents::UserConnected);
}
}
}
@@ -524,7 +525,7 @@ impl State {
} else {
warn!("{} moved to invalid channel {}", user.name(), to_channel);
}
- self.audio.play_effect(if from_channel == this_channel {
+ self.audio_output.play_effect(if from_channel == this_channel {
NotificationEvents::UserJoinedChannel
} else {
NotificationEvents::UserLeftChannel
@@ -559,13 +560,12 @@ impl State {
let this_channel = self.get_users_channel(self.server().unwrap().session_id().unwrap());
let other_channel = self.get_users_channel(msg.get_session());
if this_channel == other_channel {
- self.audio.play_effect(NotificationEvents::UserDisconnected);
+ self.audio_output.play_effect(NotificationEvents::UserDisconnected);
if let Some(user) = self.server().unwrap().users().get(&msg.get_session()) {
notifications::send(format!("{} disconnected", &user.name()));
}
}
- self.audio().remove_client(msg.get_session());
self.server_mut()
.unwrap()
.users_mut()
@@ -581,13 +581,13 @@ impl State {
Err(e) => error!("Couldn't read config: {}", e),
}
if let Some(input_volume) = self.config.audio.input_volume {
- self.audio.set_input_volume(input_volume);
+ self.audio_input.set_volume(input_volume);
}
if let Some(output_volume) = self.config.audio.output_volume {
- self.audio.set_output_volume(output_volume);
+ self.audio_output.set_volume(output_volume);
}
if let Some(sound_effects) = &self.config.audio.sound_effects {
- self.audio.load_sound_effects(sound_effects);
+ self.audio_output.load_sound_effects(sound_effects);
}
}
@@ -600,14 +600,20 @@ impl State {
pub fn initialized(&self) {
self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
- self.audio.play_effect(NotificationEvents::ServerConnect);
+ self.audio_output.play_effect(NotificationEvents::ServerConnect);
}
- pub fn audio(&self) -> &Audio {
- &self.audio
+ pub fn audio_input(&self) -> &AudioInput {
+ &self.audio_input
+ }
+ pub fn audio_output(&self) -> &AudioOutput {
+ &self.audio_output
+ }
+ pub fn audio_input_mut(&mut self) -> &mut AudioInput {
+ &mut self.audio_input
}
- pub fn audio_mut(&mut self) -> &mut Audio {
- &mut self.audio
+ pub fn audio_output_mut(&mut self) -> &mut AudioOutput {
+ &mut self.audio_output
}
pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> {
self.phase_watcher.1.clone()