aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRubens Brandao <git@rubens.io>2021-04-05 18:39:55 +0200
committerRubens Brandao <git@rubens.io>2021-04-05 18:39:55 +0200
commite431ecb6c5c8406bde6a54f40ee2f648cc0cec05 (patch)
tree93a227ab5f3ceac27286a1c2c36b79c12c72f95f
parent1734a72d3caff4f8831f4b366fdb818fddecf32f (diff)
downloadmum-e431ecb6c5c8406bde6a54f40ee2f648cc0cec05.tar.gz
Separate the input and output audio
-rw-r--r--mumd/src/audio.rs181
-rw-r--r--mumd/src/network/tcp.rs4
-rw-r--r--mumd/src/network/udp.rs4
-rw-r--r--mumd/src/state.rs66
4 files changed, 141 insertions, 114 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index fdbeaee..0aac68c 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -67,46 +67,18 @@ impl TryFrom<&str> for NotificationEvents {
}
}
-pub struct Audio {
- output_config: StreamConfig,
- _output_stream: cpal::Stream,
+pub struct AudioInput {
_input_stream: cpal::Stream,
input_channel_receiver: Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
input_volume_sender: watch::Sender<f32>,
-
- output_volume_sender: watch::Sender<f32>,
-
- user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
-
- client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>,
-
- sounds: HashMap<NotificationEvents, Vec<f32>>,
- play_sounds: Arc<Mutex<VecDeque<f32>>>,
}
-impl Audio {
- pub fn new(input_volume: f32, output_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> {
+impl AudioInput {
+ pub fn new(input_volume: f32, phase_watcher: watch::Receiver<StatePhase>) -> Result<Self, AudioError> {
let sample_rate = SampleRate(SAMPLE_RATE);
let host = cpal::default_host();
- let output_device = host
- .default_output_device()
- .ok_or(AudioError::NoDevice(AudioStream::Output))?;
- let output_supported_config = output_device
- .supported_output_configs()
- .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))?
- .find_map(|c| {
- if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
- Some(c)
- } else {
- None
- }
- })
- .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))?
- .with_sample_rate(sample_rate);
- let output_supported_sample_format = output_supported_config.sample_format();
- let output_config: StreamConfig = output_supported_config.into();
let input_device = host
.default_input_device()
@@ -128,45 +100,6 @@ impl Audio {
let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
- let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new()));
- let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);
- let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new()));
-
- let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new()));
- let output_stream = match output_supported_sample_format {
- SampleFormat::F32 => output_device.build_output_stream(
- &output_config,
- output::curry_callback::<f32>(
- Arc::clone(&play_sounds),
- Arc::clone(&client_streams),
- output_volume_receiver,
- Arc::clone(&user_volumes),
- ),
- err_fn,
- ),
- SampleFormat::I16 => output_device.build_output_stream(
- &output_config,
- output::curry_callback::<i16>(
- Arc::clone(&play_sounds),
- Arc::clone(&client_streams),
- output_volume_receiver,
- Arc::clone(&user_volumes),
- ),
- err_fn,
- ),
- SampleFormat::U16 => output_device.build_output_stream(
- &output_config,
- output::curry_callback::<u16>(
- Arc::clone(&play_sounds),
- Arc::clone(&client_streams),
- output_volume_receiver,
- Arc::clone(&user_volumes),
- ),
- err_fn,
- ),
- }
- .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?;
-
let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000);
let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
@@ -218,14 +151,106 @@ impl Audio {
position_info: None,
});
- output_stream.play().map_err(|e| AudioError::OutputPlayError(e))?;
+ input_stream.play().map_err(|e| AudioError::OutputPlayError(e))?;
- let mut res = Self {
- output_config,
- _output_stream: output_stream,
+ let res = Self {
_input_stream: input_stream,
input_volume_sender,
input_channel_receiver: Arc::new(tokio::sync::Mutex::new(Box::new(opus_stream))),
+ };
+ Ok(res)
+ }
+
+ pub fn input_receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
+ Arc::clone(&self.input_channel_receiver)
+ }
+
+ pub fn set_input_volume(&self, input_volume: f32) {
+ self.input_volume_sender.send(input_volume).unwrap();
+ }
+}
+
+pub struct AudioOutput {
+ output_config: StreamConfig,
+ _output_stream: cpal::Stream,
+
+ output_volume_sender: watch::Sender<f32>,
+
+ user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
+
+ client_streams: Arc<Mutex<HashMap<(VoiceStreamType, u32), output::ClientStream>>>,
+
+ sounds: HashMap<NotificationEvents, Vec<f32>>,
+ play_sounds: Arc<Mutex<VecDeque<f32>>>,
+}
+
+impl AudioOutput {
+ pub fn new(output_volume: f32) -> Result<Self, AudioError> {
+ let sample_rate = SampleRate(SAMPLE_RATE);
+
+ let host = cpal::default_host();
+ let output_device = host
+ .default_output_device()
+ .ok_or(AudioError::NoDevice(AudioStream::Output))?;
+ let output_supported_config = output_device
+ .supported_output_configs()
+ .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))?
+ .find_map(|c| {
+ if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
+ Some(c)
+ } else {
+ None
+ }
+ })
+ .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))?
+ .with_sample_rate(sample_rate);
+ let output_supported_sample_format = output_supported_config.sample_format();
+ let output_config: StreamConfig = output_supported_config.into();
+
+ let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
+
+ let user_volumes = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);
+ let play_sounds = Arc::new(std::sync::Mutex::new(VecDeque::new()));
+
+ let client_streams = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let output_stream = match output_supported_sample_format {
+ SampleFormat::F32 => output_device.build_output_stream(
+ &output_config,
+ output::curry_callback::<f32>(
+ Arc::clone(&play_sounds),
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ Arc::clone(&user_volumes),
+ ),
+ err_fn,
+ ),
+ SampleFormat::I16 => output_device.build_output_stream(
+ &output_config,
+ output::curry_callback::<i16>(
+ Arc::clone(&play_sounds),
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ Arc::clone(&user_volumes),
+ ),
+ err_fn,
+ ),
+ SampleFormat::U16 => output_device.build_output_stream(
+ &output_config,
+ output::curry_callback::<u16>(
+ Arc::clone(&play_sounds),
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ Arc::clone(&user_volumes),
+ ),
+ err_fn,
+ ),
+ }
+ .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?;
+
+ let mut res = Self {
+ output_config,
+ _output_stream: output_stream,
client_streams,
sounds: HashMap::new(),
output_volume_sender,
@@ -335,18 +360,10 @@ impl Audio {
}
}
- pub fn input_receiver(&self) -> Arc<tokio::sync::Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
- Arc::clone(&self.input_channel_receiver)
- }
-
pub fn clear_clients(&mut self) {
self.client_streams.lock().unwrap().clear();
}
- pub fn set_input_volume(&self, input_volume: f32) {
- self.input_volume_sender.send(input_volume).unwrap();
- }
-
pub fn set_output_volume(&self, output_volume: f32) {
self.output_volume_sender.send(output_volume).unwrap();
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 6402a89..d2f0b41 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -108,7 +108,7 @@ pub async fn handle(
let password = state_lock.password().map(|x| x.to_string());
authenticate(&mut sink, username, password).await?;
let phase_watcher = state_lock.phase_receiver();
- let input_receiver = state_lock.audio().input_receiver();
+ let input_receiver = state_lock.audio_input().input_receiver();
drop(state_lock);
let event_queue = TcpEventQueue::new();
@@ -358,7 +358,7 @@ async fn listen(
state
.lock()
.await
- .audio()
+ .audio_output()
.decode_packet_payload(
VoiceStreamType::TCP,
session_id,
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 5996e43..a8e190d 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -33,7 +33,7 @@ pub async fn handle(
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) -> Result<(), UdpError> {
- let receiver = state.lock().await.audio().input_receiver();
+ let receiver = state.lock().await.audio_input().input_receiver();
loop {
let connection_info = 'data: loop {
@@ -151,7 +151,7 @@ async fn listen(
state
.lock() //TODO change so that we only have to lock audio and not the whole state
.await
- .audio()
+ .audio_output()
.decode_packet_payload(VoiceStreamType::UDP, session_id, payload);
}
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index b52b330..84bb372 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -2,7 +2,7 @@ pub mod channel;
pub mod server;
pub mod user;
-use crate::audio::{Audio, NotificationEvents};
+use crate::audio::{AudioInput, AudioOutput, NotificationEvents};
use crate::error::StateError;
use crate::network::{ConnectionInfo, VoiceStreamType};
use crate::network::tcp::{TcpEvent, TcpEventData};
@@ -57,7 +57,8 @@ pub enum StatePhase {
pub struct State {
config: Config,
server: Option<Server>,
- audio: Audio,
+ audio_input: AudioInput,
+ audio_output: AudioOutput,
phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>),
}
@@ -66,15 +67,18 @@ impl State {
pub fn new() -> Result<Self, StateError> {
let config = mumlib::config::read_default_cfg()?;
let phase_watcher = watch::channel(StatePhase::Disconnected);
- let audio = Audio::new(
+ let audio_input = AudioInput::new(
config.audio.input_volume.unwrap_or(1.0),
- config.audio.output_volume.unwrap_or(1.0),
phase_watcher.1.clone(),
).map_err(|e| StateError::AudioError(e))?;
+ let audio_output = AudioOutput::new(
+ config.audio.output_volume.unwrap_or(1.0),
+ ).map_err(|e| StateError::AudioError(e))?;
let mut state = Self {
config,
server: None,
- audio,
+ audio_input,
+ audio_output,
phase_watcher,
};
state.reload_config();
@@ -176,13 +180,13 @@ impl State {
let mut new_deaf = None;
if let Some((mute, deafen)) = action {
if server.deafened() != deafen {
- self.audio.play_effect(if deafen {
+ self.audio_output.play_effect(if deafen {
NotificationEvents::Deafen
} else {
NotificationEvents::Undeafen
});
} else if server.muted() != mute {
- self.audio.play_effect(if mute {
+ self.audio_output.play_effect(if mute {
NotificationEvents::Mute
} else {
NotificationEvents::Unmute
@@ -207,7 +211,7 @@ impl State {
now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b })))
}
Command::InputVolumeSet(volume) => {
- self.audio.set_input_volume(volume);
+ self.audio_input.set_input_volume(volume);
now!(Ok(None))
}
Command::MuteOther(string, toggle) => {
@@ -240,7 +244,7 @@ impl State {
if let Some(action) = action {
user.set_suppressed(action);
- self.audio.set_mute(id, action);
+ self.audio_output.set_mute(id, action);
}
return now!(Ok(None));
@@ -269,13 +273,13 @@ impl State {
let mut new_mute = None;
if let Some((mute, deafen)) = action {
if server.deafened() != deafen {
- self.audio.play_effect(if deafen {
+ self.audio_output.play_effect(if deafen {
NotificationEvents::Deafen
} else {
NotificationEvents::Undeafen
});
} else if server.muted() != mute {
- self.audio.play_effect(if mute {
+ self.audio_output.play_effect(if mute {
NotificationEvents::Mute
} else {
NotificationEvents::Unmute
@@ -301,7 +305,7 @@ impl State {
now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b })))
}
Command::OutputVolumeSet(volume) => {
- self.audio.set_output_volume(volume);
+ self.audio_output.set_output_volume(volume);
now!(Ok(None))
}
Command::Ping => {
@@ -367,13 +371,13 @@ impl State {
}
self.server = None;
- self.audio.clear_clients();
+ self.audio_output.clear_clients();
self.phase_watcher
.0
.send(StatePhase::Disconnected)
.unwrap();
- self.audio.play_effect(NotificationEvents::ServerDisconnect);
+ self.audio_output.play_effect(NotificationEvents::ServerDisconnect);
now!(Ok(None))
}
Command::ServerStatus { host, port } => ExecutionContext::Ping(
@@ -420,7 +424,7 @@ impl State {
Some(v) => v,
};
- self.audio.set_user_volume(user_id, volume);
+ self.audio_output.set_user_volume(user_id, volume);
now!(Ok(None))
}
}
@@ -453,7 +457,7 @@ impl State {
*self.server_mut().unwrap().session_id_mut() = Some(session);
} else {
// this is someone else
- self.audio_mut().add_client(session);
+ self.audio_output_mut().add_client(session);
// send notification only if we've passed the connecting phase
if matches!(*self.phase_receiver().borrow(), StatePhase::Connected(_)) {
@@ -470,7 +474,7 @@ impl State {
));
}
- self.audio.play_effect(NotificationEvents::UserConnected);
+ self.audio_output.play_effect(NotificationEvents::UserConnected);
}
}
}
@@ -524,7 +528,7 @@ impl State {
} else {
warn!("{} moved to invalid channel {}", user.name(), to_channel);
}
- self.audio.play_effect(if from_channel == this_channel {
+ self.audio_output.play_effect(if from_channel == this_channel {
NotificationEvents::UserJoinedChannel
} else {
NotificationEvents::UserLeftChannel
@@ -559,13 +563,13 @@ impl State {
let this_channel = self.get_users_channel(self.server().unwrap().session_id().unwrap());
let other_channel = self.get_users_channel(msg.get_session());
if this_channel == other_channel {
- self.audio.play_effect(NotificationEvents::UserDisconnected);
+ self.audio_output.play_effect(NotificationEvents::UserDisconnected);
if let Some(user) = self.server().unwrap().users().get(&msg.get_session()) {
notifications::send(format!("{} disconnected", &user.name()));
}
}
- self.audio().remove_client(msg.get_session());
+ self.audio_output().remove_client(msg.get_session());
self.server_mut()
.unwrap()
.users_mut()
@@ -581,13 +585,13 @@ impl State {
Err(e) => error!("Couldn't read config: {}", e),
}
if let Some(input_volume) = self.config.audio.input_volume {
- self.audio.set_input_volume(input_volume);
+ self.audio_input.set_input_volume(input_volume);
}
if let Some(output_volume) = self.config.audio.output_volume {
- self.audio.set_output_volume(output_volume);
+ self.audio_output.set_output_volume(output_volume);
}
if let Some(sound_effects) = &self.config.audio.sound_effects {
- self.audio.load_sound_effects(sound_effects);
+ self.audio_output.load_sound_effects(sound_effects);
}
}
@@ -600,14 +604,20 @@ impl State {
pub fn initialized(&self) {
self.broadcast_phase(StatePhase::Connected(VoiceStreamType::TCP));
- self.audio.play_effect(NotificationEvents::ServerConnect);
+ self.audio_output.play_effect(NotificationEvents::ServerConnect);
}
- pub fn audio(&self) -> &Audio {
- &self.audio
+ pub fn audio_input(&self) -> &AudioInput {
+ &self.audio_input
+ }
+ pub fn audio_output(&self) -> &AudioOutput {
+ &self.audio_output
+ }
+ pub fn audio_input_mut(&mut self) -> &mut AudioInput {
+ &mut self.audio_input
}
- pub fn audio_mut(&mut self) -> &mut Audio {
- &mut self.audio
+ pub fn audio_output_mut(&mut self) -> &mut AudioOutput {
+ &mut self.audio_output
}
pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> {
self.phase_watcher.1.clone()