diff options
| author | Eskil Queseth <eskilq@kth.se> | 2020-11-03 22:13:37 +0100 |
|---|---|---|
| committer | Eskil Queseth <eskilq@kth.se> | 2020-11-03 22:13:37 +0100 |
| commit | 4dd73f7b837572211b71483d62bbdfb1227d2aee (patch) | |
| tree | b0ae8e001e1ada802a95fd1a2fc2b59272f45f27 /mumd/src | |
| parent | 71941137265669013ef64473748c4fde6bc48f1c (diff) | |
| parent | d6496cb0f6abba855b04338fa8bc5aaa89487c29 (diff) | |
| download | mum-4dd73f7b837572211b71483d62bbdfb1227d2aee.tar.gz | |
Merge branch 'main' into mute
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/audio.rs | 41 | ||||
| -rw-r--r-- | mumd/src/audio/output.rs | 10 | ||||
| -rw-r--r-- | mumd/src/command.rs | 57 | ||||
| -rw-r--r-- | mumd/src/main.rs | 11 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 6 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 49 | ||||
| -rw-r--r-- | mumd/src/state.rs | 100 |
7 files changed, 218 insertions, 56 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index ad4a762..9f837f7 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -19,6 +19,10 @@ pub struct Audio { input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>, input_volume_sender: watch::Sender<f32>, + output_volume_sender: watch::Sender<f32>, + + user_volumes: Arc<Mutex<HashMap<u32, f32>>>, + client_streams: Arc<Mutex<HashMap<u32, output::ClientStream>>>, } @@ -65,21 +69,36 @@ impl Audio { let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); + let user_volumes = Arc::new(Mutex::new(HashMap::new())); + let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(1.0); + let client_streams = Arc::new(Mutex::new(HashMap::new())); let output_stream = match output_supported_sample_format { SampleFormat::F32 => output_device.build_output_stream( &output_config, - output::curry_callback::<f32>(Arc::clone(&client_streams)), + output::curry_callback::<f32>( + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), err_fn, ), SampleFormat::I16 => output_device.build_output_stream( &output_config, - output::curry_callback::<i16>(Arc::clone(&client_streams)), + output::curry_callback::<i16>( + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), err_fn, ), SampleFormat::U16 => output_device.build_output_stream( &output_config, - output::curry_callback::<u16>(Arc::clone(&client_streams)), + output::curry_callback::<u16>( + Arc::clone(&client_streams), + output_volume_receiver, + Arc::clone(&user_volumes), + ), err_fn, ), } @@ -109,7 +128,7 @@ impl Audio { input_encoder, input_sender, input_config.sample_rate.0, - input_volume_receiver.clone(), + input_volume_receiver, 4, // 10 ms ), err_fn, @@ -120,7 +139,7 @@ impl Audio { input_encoder, input_sender, input_config.sample_rate.0, - input_volume_receiver.clone(), + input_volume_receiver, 4, // 10 ms ), err_fn, @@ -131,7 +150,7 @@ impl Audio { input_encoder, input_sender, input_config.sample_rate.0, - input_volume_receiver.clone(), + input_volume_receiver, 4, // 10 ms ), err_fn, @@ -148,6 +167,8 @@ impl Audio { input_volume_sender, input_channel_receiver: Some(input_receiver), client_streams, + output_volume_sender, + user_volumes, } } @@ -203,4 +224,12 @@ impl Audio { pub fn set_input_volume(&self, input_volume: f32) { self.input_volume_sender.broadcast(input_volume).unwrap(); } + + pub fn set_output_volume(&self, output_volume: f32) { + self.output_volume_sender.broadcast(output_volume).unwrap(); + } + + pub fn set_user_volume(&self, id: u32, volume: f32) { + self.user_volumes.lock().unwrap().insert(id, volume); + } } diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs index 94e4b21..56da596 100644 --- a/mumd/src/audio/output.rs +++ b/mumd/src/audio/output.rs @@ -4,6 +4,7 @@ use opus::Channels; use std::collections::{HashMap, VecDeque}; use std::ops::AddAssign; use std::sync::{Arc, Mutex}; +use tokio::sync::watch; pub struct ClientStream { buffer: VecDeque<f32>, //TODO ring buffer? @@ -72,17 +73,22 @@ impl SaturatingAdd for u16 { pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd>( buf: Arc<Mutex<HashMap<u32, ClientStream>>>, + output_volume_receiver: watch::Receiver<f32>, + user_volumes: Arc<Mutex<HashMap<u32, f32>>>, ) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { move |data: &mut [T], _info: &OutputCallbackInfo| { for sample in data.iter_mut() { *sample = Sample::from(&0.0); } + let volume = *output_volume_receiver.borrow(); + let mut lock = buf.lock().unwrap(); - for client_stream in lock.values_mut() { + for (id, client_stream) in &mut *lock { + let user_volume = user_volumes.lock().unwrap().get(id).cloned().unwrap_or(1.0); for sample in data.iter_mut() { *sample = sample.saturating_add(Sample::from( - &client_stream.buffer.pop_front().unwrap_or(0.0), + &(client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume), )); } } diff --git a/mumd/src/command.rs b/mumd/src/command.rs index d4b25d0..330e3fc 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,9 +1,11 @@ -use crate::state::State; +use crate::state::{ExecutionContext, State}; use crate::network::tcp::{TcpEvent, TcpEventCallback}; use ipc_channel::ipc::IpcSender; use log::*; +use mumble_protocol::ping::PongPacket; use mumlib::command::{Command, CommandResponse}; +use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, oneshot}; @@ -14,28 +16,51 @@ pub async fn handle( IpcSender<mumlib::error::Result<Option<CommandResponse>>>, )>, tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, + ping_request_sender: mpsc::UnboundedSender<(u64, SocketAddr, Box<dyn FnOnce(PongPacket)>)>, ) { debug!("Begin listening for commands"); while let Some((command, response_sender)) = command_receiver.recv().await { debug!("Received command {:?}", command); let mut state = state.lock().unwrap(); - let (event, generator) = state.handle_command(command); + let event = state.handle_command(command); drop(state); - if let Some(event) = event { - let (tx, rx) = oneshot::channel(); - //TODO handle this error - let _ = tcp_event_register_sender.send(( - event, - Box::new(move |e| { - let response = generator(Some(e)); - response_sender.send(response).unwrap(); - tx.send(()).unwrap(); - }), - )); + match event { + ExecutionContext::TcpEvent(event, generator) => { + let (tx, rx) = oneshot::channel(); + //TODO handle this error + let _ = tcp_event_register_sender.send(( + event, + Box::new(move |e| { + let response = generator(e); + response_sender.send(response).unwrap(); + tx.send(()).unwrap(); + }), + )); - rx.await.unwrap(); - } else { - response_sender.send(generator(None)).unwrap(); + rx.await.unwrap(); + } + ExecutionContext::Now(generator) => { + response_sender.send(generator()).unwrap(); + } + ExecutionContext::Ping(generator, converter) => { + match generator() { + Ok(addr) => { + let res = ping_request_sender.send(( + 0, + addr, + Box::new(move |packet| { + response_sender.send(converter(packet)).unwrap(); + }), + )); + if res.is_err() { + panic!(); + } + } + Err(e) => { + response_sender.send(Err(e)).unwrap(); + } + }; + } } } } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 37ff0dd..b83299f 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -36,11 +36,12 @@ async fn main() { let (connection_info_sender, connection_info_receiver) = watch::channel::<Option<ConnectionInfo>>(None); let (response_sender, response_receiver) = mpsc::unbounded_channel(); + let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); let state = State::new(packet_sender, connection_info_sender); let state = Arc::new(Mutex::new(state)); - let (_, _, _, e) = join!( + let (_, _, _, e, _) = join!( network::tcp::handle( Arc::clone(&state), connection_info_receiver.clone(), @@ -53,11 +54,17 @@ async fn main() { connection_info_receiver.clone(), crypt_state_receiver, ), - command::handle(state, command_receiver, response_sender), + command::handle( + state, + command_receiver, + response_sender, + ping_request_sender, + ), spawn_blocking(move || { // IpcSender is blocking receive_oneshot_commands(command_sender); }), + network::udp::handle_pings(ping_request_receiver), ); e.unwrap(); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index cd11690..131f066 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -27,7 +27,7 @@ type TcpSender = SplitSink< type TcpReceiver = SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>; -pub(crate) type TcpEventCallback = Box<dyn FnOnce(&TcpEventData)>; +pub(crate) type TcpEventCallback = Box<dyn FnOnce(TcpEventData)>; #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum TcpEvent { @@ -228,7 +228,7 @@ async fn listen( if let Some(vec) = event_queue.lock().unwrap().get_mut(&TcpEvent::Connected) { let old = std::mem::take(vec); for handler in old { - handler(&TcpEventData::Connected(&msg)); + handler(TcpEventData::Connected(&msg)); } } let mut state = state.lock().unwrap(); @@ -282,7 +282,7 @@ async fn listen( if let Some(vec) = event_queue.lock().unwrap().get_mut(&TcpEvent::Disconnected) { let old = std::mem::take(vec); for handler in old { - handler(&TcpEventData::Disconnected); + handler(TcpEventData::Disconnected); } } }, diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 4f96c4c..f97807d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -6,9 +6,13 @@ use bytes::Bytes; use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; +use mumble_protocol::ping::{PingPacket, PongPacket}; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; use mumble_protocol::Serverbound; +use std::collections::HashMap; +use std::convert::TryFrom; use std::net::{Ipv6Addr, SocketAddr}; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; @@ -225,3 +229,48 @@ async fn send_voice( debug!("UDP sender process killed"); } + +pub async fn handle_pings( + mut ping_request_receiver: mpsc::UnboundedReceiver<( + u64, + SocketAddr, + Box<dyn FnOnce(PongPacket)>, + )>, +) { + let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) + .await + .expect("Failed to bind UDP socket"); + + let (mut receiver, mut sender) = udp_socket.split(); + + let pending = Rc::new(Mutex::new(HashMap::new())); + + let sender_handle = async { + while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await { + let packet = PingPacket { id }; + let packet: [u8; 12] = packet.into(); + sender.send_to(&packet, &socket_addr).await.unwrap(); + pending.lock().unwrap().insert(id, handle); + } + }; + + let receiver_handle = async { + let mut buf = vec![0; 24]; + while let Ok(read) = receiver.recv(&mut buf).await { + assert_eq!(read, 24); + + let packet = match PongPacket::try_from(buf.as_slice()) { + Ok(v) => v, + Err(_) => panic!(), + }; + + if let Some(handler) = pending.lock().unwrap().remove(&packet.id) { + handler(packet); + } + } + }; + + debug!("Waiting for ping requests"); + + join!(sender_handle, receiver_handle); +} diff --git a/mumd/src/state.rs b/mumd/src/state.rs index def5c03..d1d5510 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -11,26 +11,40 @@ use crate::network::tcp::{TcpEvent, TcpEventData}; use log::*; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; +use mumble_protocol::ping::PongPacket; use mumble_protocol::voice::Serverbound; use mumlib::command::{Command, CommandResponse}; use mumlib::config::Config; use mumlib::error::{ChannelIdentifierError, Error}; use mumlib::state::UserDiff; -use std::net::ToSocketAddrs; +use std::net::{SocketAddr, ToSocketAddrs}; use tokio::sync::{mpsc, watch}; macro_rules! at { ($event:expr, $generator:expr) => { - (Some($event), Box::new($generator)) + ExecutionContext::TcpEvent($event, Box::new($generator)) }; } macro_rules! now { ($data:expr) => { - (None, Box::new(move |_| $data)) + ExecutionContext::Now(Box::new(move || $data)) }; } +//TODO give me a better name +pub enum ExecutionContext { + TcpEvent( + TcpEvent, + Box<dyn FnOnce(TcpEventData) -> mumlib::error::Result<Option<CommandResponse>>>, + ), + Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>), + Ping( + Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>, + Box<dyn FnOnce(PongPacket) -> mumlib::error::Result<Option<CommandResponse>>>, + ), +} + #[derive(Clone, Debug, Eq, PartialEq)] pub enum StatePhase { Disconnected, @@ -39,7 +53,7 @@ pub enum StatePhase { } pub struct State { - config: Option<Config>, + config: Config, server: Option<Server>, audio: Audio, @@ -68,13 +82,7 @@ impl State { } //TODO? move bool inside Result - pub fn handle_command( - &mut self, - command: Command, - ) -> ( - Option<TcpEvent>, - Box<dyn FnOnce(Option<&TcpEventData>) -> mumlib::error::Result<Option<CommandResponse>>>, - ) { + pub fn handle_command(&mut self, command: Command) -> ExecutionContext { match command { Command::ChannelJoin { channel_identifier } => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { @@ -128,7 +136,7 @@ impl State { } Command::ChannelList => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { - return (None, Box::new(|_| Err(Error::DisconnectedError))); + return now!(Err(Error::DisconnectedError)); } let list = channel::into_channel( self.server.as_ref().unwrap().channels(), @@ -173,7 +181,7 @@ impl State { .unwrap(); at!(TcpEvent::Connected, |e| { //runs the closure when the client is connected - if let Some(TcpEventData::Connected(msg)) = e { + if let TcpEventData::Connected(msg) = e { Ok(Some(CommandResponse::ServerConnect { welcome_message: if msg.has_welcome_text() { Some(msg.get_welcome_text().to_string()) @@ -209,12 +217,16 @@ impl State { .unwrap(); now!(Ok(None)) } + Command::ConfigReload => { + self.reload_config(); + now!(Ok(None)) + } Command::InputVolumeSet(volume) => { self.audio.set_input_volume(volume); now!(Ok(None)) } - Command::ConfigReload => { - self.reload_config(); + Command::OutputVolumeSet(volume) => { + self.audio.set_output_volume(volume); now!(Ok(None)) } Command::DeafenSelf => { @@ -258,6 +270,44 @@ impl State { return now!(Ok(None)); } + Command::UserVolumeSet(string, volume) => { + if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { + return now!(Err(Error::DisconnectedError)); + } + let user_id = match self + .server() + .unwrap() + .users() + .iter() + .find(|e| e.1.name() == &string) + .map(|e| *e.0) + { + None => return now!(Err(Error::InvalidUsernameError(string))), + Some(v) => v, + }; + + self.audio.set_user_volume(user_id, volume); + now!(Ok(None)) + } + Command::ServerStatus { host, port } => ExecutionContext::Ping( + Box::new(move || { + match (host.as_str(), port) + .to_socket_addrs() + .map(|mut e| e.next()) + { + Ok(Some(v)) => Ok(v), + _ => Err(mumlib::error::Error::InvalidServerAddrError(host, port)), + } + }), + Box::new(move |pong| { + Ok(Some(CommandResponse::ServerStatus { + version: pong.version, + users: pong.users, + max_users: pong.max_users, + bandwidth: pong.bandwidth, + })) + }), + ), } } @@ -270,9 +320,9 @@ impl State { // check if this is initial state if !self.server().unwrap().users().contains_key(&session) { self.parse_initial_user_state(session, msg); - return None; + None } else { - return Some(self.parse_updated_user_state(session, msg)); + Some(self.parse_updated_user_state(session, msg)) } } @@ -393,16 +443,12 @@ impl State { } pub fn reload_config(&mut self) { - if let Some(config) = mumlib::config::read_default_cfg() { - self.config = Some(config); - let config = &self.config.as_ref().unwrap(); - if let Some(audio_config) = &config.audio { - if let Some(input_volume) = audio_config.input_volume { - self.audio.set_input_volume(input_volume); - } - } - } else { - warn!("config file not found"); + self.config = mumlib::config::read_default_cfg(); + if let Some(input_volume) = self.config.audio.input_volume { + self.audio.set_input_volume(input_volume); + } + if let Some(output_volume) = self.config.audio.output_volume { + self.audio.set_output_volume(output_volume); } } |
