aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2020-11-03 22:13:37 +0100
committerEskil Queseth <eskilq@kth.se>2020-11-03 22:13:37 +0100
commit4dd73f7b837572211b71483d62bbdfb1227d2aee (patch)
treeb0ae8e001e1ada802a95fd1a2fc2b59272f45f27
parent71941137265669013ef64473748c4fde6bc48f1c (diff)
parentd6496cb0f6abba855b04338fa8bc5aaa89487c29 (diff)
downloadmum-4dd73f7b837572211b71483d62bbdfb1227d2aee.tar.gz
Merge branch 'main' into mute
-rw-r--r--mumctl/src/main.rs374
-rw-r--r--mumd/src/audio.rs41
-rw-r--r--mumd/src/audio/output.rs10
-rw-r--r--mumd/src/command.rs57
-rw-r--r--mumd/src/main.rs11
-rw-r--r--mumd/src/network/tcp.rs6
-rw-r--r--mumd/src/network/udp.rs49
-rw-r--r--mumd/src/state.rs100
-rw-r--r--mumlib/src/command.rs24
-rw-r--r--mumlib/src/config.rs61
-rw-r--r--mumlib/src/error.rs2
-rw-r--r--mumlib/src/lib.rs1
-rw-r--r--usage.org6
13 files changed, 473 insertions, 269 deletions
diff --git a/mumctl/src/main.rs b/mumctl/src/main.rs
index 9471b6a..2805545 100644
--- a/mumctl/src/main.rs
+++ b/mumctl/src/main.rs
@@ -74,7 +74,8 @@ fn main() {
)
.subcommand(
SubCommand::with_name("remove").arg(Arg::with_name("name").required(true)),
- ),
+ )
+ .subcommand(SubCommand::with_name("list")),
)
.subcommand(
SubCommand::with_name("channel")
@@ -99,6 +100,16 @@ fn main() {
.arg(Arg::with_name("zsh").long("zsh"))
.arg(Arg::with_name("bash").long("bash"))
.arg(Arg::with_name("fish").long("fish")),
+ )
+ .subcommand(
+ SubCommand::with_name("volume")
+ .subcommand(
+ SubCommand::with_name("set")
+ .arg(Arg::with_name("user").required(true))
+ .arg(Arg::with_name("volume").required(true)),
+ )
+ .arg(Arg::with_name("user").required(true))
+ .setting(AppSettings::SubcommandsNegateReqs),
);
let matches = app.clone().get_matches();
@@ -116,6 +127,31 @@ fn main() {
match_server_remove(matches, &mut config);
} else if let Some(matches) = matches.subcommand_matches("add") {
match_server_add(matches, &mut config);
+ } else if let Some(_) = matches.subcommand_matches("list") {
+ if config.servers.len() == 0 {
+ println!("{} No servers in config", "warning:".yellow());
+ }
+ for (server, response) in config.servers
+ .iter()
+ .map(|e| {
+ let response = send_command(Command::ServerStatus {
+ host: e.host.clone(),
+ port: e.port.unwrap_or(mumlib::DEFAULT_PORT),
+ });
+ (e, response)
+ })
+ .filter(|e| e.1.is_ok())
+ .map(|e| (e.0, e.1.unwrap().unwrap()))
+ {
+ if let CommandResponse::ServerStatus {
+ users, max_users, ..
+ } = response
+ {
+ println!("{} [{}/{}]", server.name, users, max_users)
+ } else {
+ unreachable!()
+ }
+ }
}
} else if let Some(matches) = matches.subcommand_matches("channel") {
if let Some(_matches) = matches.subcommand_matches("list") {
@@ -150,6 +186,13 @@ fn main() {
"audio.input_volume" => {
if let Ok(volume) = value.parse() {
send_command(Command::InputVolumeSet(volume)).unwrap();
+ config.audio.input_volume = Some(volume);
+ }
+ }
+ "audio.output_volume" => {
+ if let Ok(volume) = value.parse() {
+ send_command(Command::OutputVolumeSet(volume)).unwrap();
+ config.audio.output_volume = Some(volume);
}
}
_ => {
@@ -169,72 +212,79 @@ fn main() {
&mut io::stdout(),
);
return;
+ } else if let Some(matches) = matches.subcommand_matches("volume") {
+ if let Some(matches) = matches.subcommand_matches("set") {
+ let user = matches.value_of("user").unwrap();
+ let volume = matches.value_of("volume").unwrap();
+ if let Ok(val) = volume.parse() {
+ err_print!(send_command(Command::UserVolumeSet(user.to_string(), val)))
+ } else {
+ println!("{} Invalid volume value: {}", "error:".red(), volume);
+ }
+ } else {
+ let _user = matches.value_of("user").unwrap();
+ //TODO implement me
+ //needs work on mumd to implement
+ }
};
- if let Some(config) = config {
- if !config::cfg_exists() {
- println!(
- "Config file not found. Create one in {}? [Y/n]",
- config::get_creatable_cfg_path()
- );
- let stdin = std::io::stdin();
- let response = stdin.lock().lines().next();
- match response.map(|e| e.map(|e| &e == "Y")) {
- Some(Ok(true)) => {
- config.write_default_cfg(true).unwrap();
- }
- _ => {}
+ if !config::cfg_exists() {
+ println!(
+ "Config file not found. Create one in {}? [Y/n]",
+ config::get_creatable_cfg_path()
+ );
+ let stdin = std::io::stdin();
+ let response = stdin.lock().lines().next();
+ match response.map(|e| e.map(|e| &e == "Y")) {
+ Some(Ok(true)) => {
+ config.write_default_cfg(true).unwrap();
}
- } else {
- config.write_default_cfg(false).unwrap();
+ _ => {}
}
+ } else {
+ config.write_default_cfg(false).unwrap();
}
}
-fn match_server_connect(matches: &clap::ArgMatches<'_>, config: &Option<mumlib::config::Config>) {
+fn match_server_connect(matches: &clap::ArgMatches<'_>, config: &mumlib::config::Config) {
let host = matches.value_of("host").unwrap();
let username = matches.value_of("username");
let port = match matches.value_of("port").map(|e| e.parse()) {
- None => Some(64738),
+ None => Some(mumlib::DEFAULT_PORT),
Some(Err(_)) => None,
Some(Ok(v)) => Some(v),
};
if let Some(port) = port {
- let response = match config.as_ref().and_then(|e| {
- e.servers
- .as_ref()
- .and_then(|e| e.iter().find(|e| e.name == host))
- }) {
- Some(config) => {
- let host = config.host.as_str();
- let port = config.port.unwrap_or(port);
- let username = config.username.as_ref().map(|e| e.as_str()).or(username);
+ let (host, port, username) = match config.servers.iter().find(|e| e.name == host) {
+ Some(server_config) => {
+ let host = server_config.host.as_str();
+ let port = server_config.port.unwrap_or(port);
+ let username = server_config
+ .username
+ .as_ref()
+ .map(|e| e.as_str())
+ .or(username);
if username.is_none() {
println!("{} no username specified", "error:".red());
return;
}
- send_command(Command::ServerConnect {
- host: host.to_string(),
- port,
- username: username.unwrap().to_string(),
- accept_invalid_cert: true, //TODO
- })
- .map(|e| (e, host))
+ (host, port, username.unwrap())
}
None => {
if username.is_none() {
println!("{} no username specified", "error:".red());
return;
}
- send_command(Command::ServerConnect {
- host: host.to_string(),
- port,
- username: username.unwrap().to_string(),
- accept_invalid_cert: true, //TODO
- })
- .map(|e| (e, host))
+ (host, port, username.unwrap())
}
};
+ let response = send_command(Command::ServerConnect {
+ host: host.to_string(),
+ port,
+ username: username.to_string(),
+ accept_invalid_cert: true, //TODO
+ })
+ .map(|e| (e, host));
match response {
Ok((e, host)) => {
if let Some(CommandResponse::ServerConnect { welcome_message }) = e {
@@ -251,197 +301,143 @@ fn match_server_connect(matches: &clap::ArgMatches<'_>, config: &Option<mumlib::
}
}
-fn match_server_config(
- matches: &clap::ArgMatches<'_>,
- config: &mut Option<mumlib::config::Config>,
-) {
- if config.is_none() {
- *config = Some(mumlib::config::Config::default());
- }
-
- let config = config.as_mut().unwrap();
-
+fn match_server_config(matches: &clap::ArgMatches<'_>, config: &mut mumlib::config::Config) {
if let Some(server_name) = matches.value_of("server_name") {
- if let Some(servers) = &mut config.servers {
- let server = servers.iter_mut().find(|s| s.name == server_name);
- if let Some(server) = server {
- if let Some(var_name) = matches.value_of("var_name") {
- if let Some(var_value) = matches.value_of("var_value") {
- // save var_value in var_name (if it is valid)
+ let server = config.servers.iter_mut().find(|s| s.name == server_name);
+ if let Some(server) = server {
+ if let Some(var_name) = matches.value_of("var_name") {
+ if let Some(var_value) = matches.value_of("var_value") {
+ // save var_value in var_name (if it is valid)
+ match var_name {
+ "name" => {
+ println!("{} use mumctl server rename instead!", "error:".red());
+ }
+ "host" => {
+ server.host = var_value.to_string();
+ }
+ "port" => {
+ server.port = Some(var_value.parse().unwrap());
+ }
+ "username" => {
+ server.username = Some(var_value.to_string());
+ }
+ "password" => {
+ server.password = Some(var_value.to_string()); //TODO ask stdin if empty
+ }
+ _ => {
+ println!("{} variable {} not found", "error:".red(), var_name);
+ }
+ };
+ } else {
+ // var_value is None
+ // print value of var_name
+ println!(
+ "{}",
match var_name {
"name" => {
- println!("{} use mumctl server rename instead!", "error:".red());
+ server.name.to_string()
}
"host" => {
- server.host = var_value.to_string();
+ server.host.to_string()
}
"port" => {
- server.port = Some(var_value.parse().unwrap());
+ server
+ .port
+ .map(|s| s.to_string())
+ .unwrap_or(format!("{} not set", "error:".red()))
}
"username" => {
- server.username = Some(var_value.to_string());
+ server
+ .username
+ .as_ref()
+ .map(|s| s.to_string())
+ .unwrap_or(format!("{} not set", "error:".red()))
}
"password" => {
- server.password = Some(var_value.to_string()); //TODO ask stdin if empty
+ server
+ .password
+ .as_ref()
+ .map(|s| s.to_string())
+ .unwrap_or(format!("{} not set", "error:".red()))
}
_ => {
- println!("{} variable {} not found", "error:".red(), var_name);
- }
- };
- } else {
- // var_value is None
- // print value of var_name
- println!(
- "{}",
- match var_name {
- "name" => {
- server.name.to_string()
- }
- "host" => {
- server.host.to_string()
- }
- "port" => {
- server
- .port
- .map(|s| s.to_string())
- .unwrap_or(format!("{} not set", "error:".red()))
- }
- "username" => {
- server
- .username
- .as_ref()
- .map(|s| s.to_string())
- .unwrap_or(format!("{} not set", "error:".red()))
- }
- "password" => {
- server
- .password
- .as_ref()
- .map(|s| s.to_string())
- .unwrap_or(format!("{} not set", "error:".red()))
- }
- _ => {
- format!("{} unknown variable", "error:".red())
- }
+ format!("{} unknown variable", "error:".red())
}
- );
- }
- } else {
- // var_name is None
- // print server config
- print!(
- "{}{}{}{}",
- format!("host: {}\n", server.host.to_string()),
- server
- .port
- .map(|s| format!("port: {}\n", s))
- .unwrap_or("".to_string()),
- server
- .username
- .as_ref()
- .map(|s| format!("username: {}\n", s))
- .unwrap_or("".to_string()),
- server
- .password
- .as_ref()
- .map(|s| format!("password: {}\n", s))
- .unwrap_or("".to_string()),
- )
+ }
+ );
}
} else {
- // server is None
- println!("{} server {} not found", "error:".red(), server_name);
+ // var_name is None
+ // print server config
+ print!(
+ "{}{}{}{}",
+ format!("host: {}\n", server.host.to_string()),
+ server
+ .port
+ .map(|s| format!("port: {}\n", s))
+ .unwrap_or("".to_string()),
+ server
+ .username
+ .as_ref()
+ .map(|s| format!("username: {}\n", s))
+ .unwrap_or("".to_string()),
+ server
+ .password
+ .as_ref()
+ .map(|s| format!("password: {}\n", s))
+ .unwrap_or("".to_string()),
+ )
}
} else {
- // servers is None
- println!("{} no servers found in configuration", "error:".red());
+ // server is None
+ println!("{} server {} not found", "error:".red(), server_name);
}
} else {
- for server in config.servers.iter().flat_map(|e| e.iter()) {
+ for server in config.servers.iter() {
println!("{}", server.name);
}
}
}
-fn match_server_rename(
- matches: &clap::ArgMatches<'_>,
- config: &mut Option<mumlib::config::Config>,
-) {
- if config.is_none() {
- *config = Some(mumlib::config::Config::default());
- }
-
- let config = config.as_mut().unwrap();
-
- if let Some(servers) = &mut config.servers {
- let prev_name = matches.value_of("prev_name").unwrap();
- let next_name = matches.value_of("next_name").unwrap();
- if let Some(server) = servers.iter_mut().find(|s| s.name == prev_name) {
- server.name = next_name.to_string();
- } else {
- println!("{} server {} not found", "error:".red(), prev_name);
- }
+fn match_server_rename(matches: &clap::ArgMatches<'_>, config: &mut mumlib::config::Config) {
+ let prev_name = matches.value_of("prev_name").unwrap();
+ let next_name = matches.value_of("next_name").unwrap();
+ if let Some(server) = config.servers.iter_mut().find(|s| s.name == prev_name) {
+ server.name = next_name.to_string();
+ } else {
+ println!("{} server {} not found", "error:".red(), prev_name);
}
}
-fn match_server_remove(
- matches: &clap::ArgMatches<'_>,
- config: &mut Option<mumlib::config::Config>,
-) {
- if config.is_none() {
- *config = Some(mumlib::config::Config::default());
- }
-
- let config = config.as_mut().unwrap();
-
+fn match_server_remove(matches: &clap::ArgMatches<'_>, config: &mut mumlib::config::Config) {
let name = matches.value_of("name").unwrap();
- if let Some(servers) = &mut config.servers {
- match servers.iter().position(|server| server.name == name) {
- Some(idx) => {
- servers.remove(idx);
- }
- None => {
- println!("{} server {} not found", "error:".red(), name);
- }
- };
- } else {
- println!("{} no servers found in configuration", "error:".red());
- }
+ match config.servers.iter().position(|server| server.name == name) {
+ Some(idx) => {
+ config.servers.remove(idx);
+ }
+ None => {
+ println!("{} server {} not found", "error:".red(), name);
+ }
+ };
}
-fn match_server_add(matches: &clap::ArgMatches<'_>, config: &mut Option<mumlib::config::Config>) {
- if config.is_none() {
- *config = Some(mumlib::config::Config::default());
- }
-
- let mut config = config.as_mut().unwrap();
-
+fn match_server_add(matches: &clap::ArgMatches<'_>, config: &mut mumlib::config::Config) {
let name = matches.value_of("name").unwrap().to_string();
let host = matches.value_of("host").unwrap().to_string();
// optional arguments map None to None
let port = matches.value_of("port").map(|s| s.parse().unwrap());
let username = matches.value_of("username").map(|s| s.to_string());
let password = matches.value_of("password").map(|s| s.to_string());
- if let Some(servers) = &mut config.servers {
- if servers.iter().any(|s| s.name == name) {
- println!("{} a server named {} already exists", "error:".red(), name);
- } else {
- servers.push(ServerConfig {
- name,
- host,
- port,
- username,
- password,
- });
- }
+ if config.servers.iter().any(|s| s.name == name) {
+ println!("{} a server named {} already exists", "error:".red(), name);
} else {
- config.servers = Some(vec![ServerConfig {
+ config.servers.push(ServerConfig {
name,
host,
port,
username,
password,
- }]);
+ });
}
}
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index ad4a762..9f837f7 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -19,6 +19,10 @@ pub struct Audio {
input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>,
input_volume_sender: watch::Sender<f32>,
+ output_volume_sender: watch::Sender<f32>,
+
+ user_volumes: Arc<Mutex<HashMap<u32, f32>>>,
+
client_streams: Arc<Mutex<HashMap<u32, output::ClientStream>>>,
}
@@ -65,21 +69,36 @@ impl Audio {
let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
+ let user_volumes = Arc::new(Mutex::new(HashMap::new()));
+ let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(1.0);
+
let client_streams = Arc::new(Mutex::new(HashMap::new()));
let output_stream = match output_supported_sample_format {
SampleFormat::F32 => output_device.build_output_stream(
&output_config,
- output::curry_callback::<f32>(Arc::clone(&client_streams)),
+ output::curry_callback::<f32>(
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ Arc::clone(&user_volumes),
+ ),
err_fn,
),
SampleFormat::I16 => output_device.build_output_stream(
&output_config,
- output::curry_callback::<i16>(Arc::clone(&client_streams)),
+ output::curry_callback::<i16>(
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ Arc::clone(&user_volumes),
+ ),
err_fn,
),
SampleFormat::U16 => output_device.build_output_stream(
&output_config,
- output::curry_callback::<u16>(Arc::clone(&client_streams)),
+ output::curry_callback::<u16>(
+ Arc::clone(&client_streams),
+ output_volume_receiver,
+ Arc::clone(&user_volumes),
+ ),
err_fn,
),
}
@@ -109,7 +128,7 @@ impl Audio {
input_encoder,
input_sender,
input_config.sample_rate.0,
- input_volume_receiver.clone(),
+ input_volume_receiver,
4, // 10 ms
),
err_fn,
@@ -120,7 +139,7 @@ impl Audio {
input_encoder,
input_sender,
input_config.sample_rate.0,
- input_volume_receiver.clone(),
+ input_volume_receiver,
4, // 10 ms
),
err_fn,
@@ -131,7 +150,7 @@ impl Audio {
input_encoder,
input_sender,
input_config.sample_rate.0,
- input_volume_receiver.clone(),
+ input_volume_receiver,
4, // 10 ms
),
err_fn,
@@ -148,6 +167,8 @@ impl Audio {
input_volume_sender,
input_channel_receiver: Some(input_receiver),
client_streams,
+ output_volume_sender,
+ user_volumes,
}
}
@@ -203,4 +224,12 @@ impl Audio {
pub fn set_input_volume(&self, input_volume: f32) {
self.input_volume_sender.broadcast(input_volume).unwrap();
}
+
+ pub fn set_output_volume(&self, output_volume: f32) {
+ self.output_volume_sender.broadcast(output_volume).unwrap();
+ }
+
+ pub fn set_user_volume(&self, id: u32, volume: f32) {
+ self.user_volumes.lock().unwrap().insert(id, volume);
+ }
}
diff --git a/mumd/src/audio/output.rs b/mumd/src/audio/output.rs
index 94e4b21..56da596 100644
--- a/mumd/src/audio/output.rs
+++ b/mumd/src/audio/output.rs
@@ -4,6 +4,7 @@ use opus::Channels;
use std::collections::{HashMap, VecDeque};
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
+use tokio::sync::watch;
pub struct ClientStream {
buffer: VecDeque<f32>, //TODO ring buffer?
@@ -72,17 +73,22 @@ impl SaturatingAdd for u16 {
pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd>(
buf: Arc<Mutex<HashMap<u32, ClientStream>>>,
+ output_volume_receiver: watch::Receiver<f32>,
+ user_volumes: Arc<Mutex<HashMap<u32, f32>>>,
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
move |data: &mut [T], _info: &OutputCallbackInfo| {
for sample in data.iter_mut() {
*sample = Sample::from(&0.0);
}
+ let volume = *output_volume_receiver.borrow();
+
let mut lock = buf.lock().unwrap();
- for client_stream in lock.values_mut() {
+ for (id, client_stream) in &mut *lock {
+ let user_volume = user_volumes.lock().unwrap().get(id).cloned().unwrap_or(1.0);
for sample in data.iter_mut() {
*sample = sample.saturating_add(Sample::from(
- &client_stream.buffer.pop_front().unwrap_or(0.0),
+ &(client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume),
));
}
}
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index d4b25d0..330e3fc 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,9 +1,11 @@
-use crate::state::State;
+use crate::state::{ExecutionContext, State};
use crate::network::tcp::{TcpEvent, TcpEventCallback};
use ipc_channel::ipc::IpcSender;
use log::*;
+use mumble_protocol::ping::PongPacket;
use mumlib::command::{Command, CommandResponse};
+use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
@@ -14,28 +16,51 @@ pub async fn handle(
IpcSender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>,
+ ping_request_sender: mpsc::UnboundedSender<(u64, SocketAddr, Box<dyn FnOnce(PongPacket)>)>,
) {
debug!("Begin listening for commands");
while let Some((command, response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
let mut state = state.lock().unwrap();
- let (event, generator) = state.handle_command(command);
+ let event = state.handle_command(command);
drop(state);
- if let Some(event) = event {
- let (tx, rx) = oneshot::channel();
- //TODO handle this error
- let _ = tcp_event_register_sender.send((
- event,
- Box::new(move |e| {
- let response = generator(Some(e));
- response_sender.send(response).unwrap();
- tx.send(()).unwrap();
- }),
- ));
+ match event {
+ ExecutionContext::TcpEvent(event, generator) => {
+ let (tx, rx) = oneshot::channel();
+ //TODO handle this error
+ let _ = tcp_event_register_sender.send((
+ event,
+ Box::new(move |e| {
+ let response = generator(e);
+ response_sender.send(response).unwrap();
+ tx.send(()).unwrap();
+ }),
+ ));
- rx.await.unwrap();
- } else {
- response_sender.send(generator(None)).unwrap();
+ rx.await.unwrap();
+ }
+ ExecutionContext::Now(generator) => {
+ response_sender.send(generator()).unwrap();
+ }
+ ExecutionContext::Ping(generator, converter) => {
+ match generator() {
+ Ok(addr) => {
+ let res = ping_request_sender.send((
+ 0,
+ addr,
+ Box::new(move |packet| {
+ response_sender.send(converter(packet)).unwrap();
+ }),
+ ));
+ if res.is_err() {
+ panic!();
+ }
+ }
+ Err(e) => {
+ response_sender.send(Err(e)).unwrap();
+ }
+ };
+ }
}
}
}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 37ff0dd..b83299f 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -36,11 +36,12 @@ async fn main() {
let (connection_info_sender, connection_info_receiver) =
watch::channel::<Option<ConnectionInfo>>(None);
let (response_sender, response_receiver) = mpsc::unbounded_channel();
+ let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel();
let state = State::new(packet_sender, connection_info_sender);
let state = Arc::new(Mutex::new(state));
- let (_, _, _, e) = join!(
+ let (_, _, _, e, _) = join!(
network::tcp::handle(
Arc::clone(&state),
connection_info_receiver.clone(),
@@ -53,11 +54,17 @@ async fn main() {
connection_info_receiver.clone(),
crypt_state_receiver,
),
- command::handle(state, command_receiver, response_sender),
+ command::handle(
+ state,
+ command_receiver,
+ response_sender,
+ ping_request_sender,
+ ),
spawn_blocking(move || {
// IpcSender is blocking
receive_oneshot_commands(command_sender);
}),
+ network::udp::handle_pings(ping_request_receiver),
);
e.unwrap();
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index cd11690..131f066 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -27,7 +27,7 @@ type TcpSender = SplitSink<
type TcpReceiver =
SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>;
-pub(crate) type TcpEventCallback = Box<dyn FnOnce(&TcpEventData)>;
+pub(crate) type TcpEventCallback = Box<dyn FnOnce(TcpEventData)>;
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum TcpEvent {
@@ -228,7 +228,7 @@ async fn listen(
if let Some(vec) = event_queue.lock().unwrap().get_mut(&TcpEvent::Connected) {
let old = std::mem::take(vec);
for handler in old {
- handler(&TcpEventData::Connected(&msg));
+ handler(TcpEventData::Connected(&msg));
}
}
let mut state = state.lock().unwrap();
@@ -282,7 +282,7 @@ async fn listen(
if let Some(vec) = event_queue.lock().unwrap().get_mut(&TcpEvent::Disconnected) {
let old = std::mem::take(vec);
for handler in old {
- handler(&TcpEventData::Disconnected);
+ handler(TcpEventData::Disconnected);
}
}
},
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 4f96c4c..f97807d 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -6,9 +6,13 @@ use bytes::Bytes;
use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::crypt::ClientCryptState;
+use mumble_protocol::ping::{PingPacket, PongPacket};
use mumble_protocol::voice::{VoicePacket, VoicePacketPayload};
use mumble_protocol::Serverbound;
+use std::collections::HashMap;
+use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
+use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot, watch};
@@ -225,3 +229,48 @@ async fn send_voice(
debug!("UDP sender process killed");
}
+
+pub async fn handle_pings(
+ mut ping_request_receiver: mpsc::UnboundedReceiver<(
+ u64,
+ SocketAddr,
+ Box<dyn FnOnce(PongPacket)>,
+ )>,
+) {
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
+ .await
+ .expect("Failed to bind UDP socket");
+
+ let (mut receiver, mut sender) = udp_socket.split();
+
+ let pending = Rc::new(Mutex::new(HashMap::new()));
+
+ let sender_handle = async {
+ while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await {
+ let packet = PingPacket { id };
+ let packet: [u8; 12] = packet.into();
+ sender.send_to(&packet, &socket_addr).await.unwrap();
+ pending.lock().unwrap().insert(id, handle);
+ }
+ };
+
+ let receiver_handle = async {
+ let mut buf = vec![0; 24];
+ while let Ok(read) = receiver.recv(&mut buf).await {
+ assert_eq!(read, 24);
+
+ let packet = match PongPacket::try_from(buf.as_slice()) {
+ Ok(v) => v,
+ Err(_) => panic!(),
+ };
+
+ if let Some(handler) = pending.lock().unwrap().remove(&packet.id) {
+ handler(packet);
+ }
+ }
+ };
+
+ debug!("Waiting for ping requests");
+
+ join!(sender_handle, receiver_handle);
+}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index def5c03..d1d5510 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -11,26 +11,40 @@ use crate::network::tcp::{TcpEvent, TcpEventData};
use log::*;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket;
+use mumble_protocol::ping::PongPacket;
use mumble_protocol::voice::Serverbound;
use mumlib::command::{Command, CommandResponse};
use mumlib::config::Config;
use mumlib::error::{ChannelIdentifierError, Error};
use mumlib::state::UserDiff;
-use std::net::ToSocketAddrs;
+use std::net::{SocketAddr, ToSocketAddrs};
use tokio::sync::{mpsc, watch};
macro_rules! at {
($event:expr, $generator:expr) => {
- (Some($event), Box::new($generator))
+ ExecutionContext::TcpEvent($event, Box::new($generator))
};
}
macro_rules! now {
($data:expr) => {
- (None, Box::new(move |_| $data))
+ ExecutionContext::Now(Box::new(move || $data))
};
}
+//TODO give me a better name
+pub enum ExecutionContext {
+ TcpEvent(
+ TcpEvent,
+ Box<dyn FnOnce(TcpEventData) -> mumlib::error::Result<Option<CommandResponse>>>,
+ ),
+ Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>),
+ Ping(
+ Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>,
+ Box<dyn FnOnce(PongPacket) -> mumlib::error::Result<Option<CommandResponse>>>,
+ ),
+}
+
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StatePhase {
Disconnected,
@@ -39,7 +53,7 @@ pub enum StatePhase {
}
pub struct State {
- config: Option<Config>,
+ config: Config,
server: Option<Server>,
audio: Audio,
@@ -68,13 +82,7 @@ impl State {
}
//TODO? move bool inside Result
- pub fn handle_command(
- &mut self,
- command: Command,
- ) -> (
- Option<TcpEvent>,
- Box<dyn FnOnce(Option<&TcpEventData>) -> mumlib::error::Result<Option<CommandResponse>>>,
- ) {
+ pub fn handle_command(&mut self, command: Command) -> ExecutionContext {
match command {
Command::ChannelJoin { channel_identifier } => {
if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) {
@@ -128,7 +136,7 @@ impl State {
}
Command::ChannelList => {
if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) {
- return (None, Box::new(|_| Err(Error::DisconnectedError)));
+ return now!(Err(Error::DisconnectedError));
}
let list = channel::into_channel(
self.server.as_ref().unwrap().channels(),
@@ -173,7 +181,7 @@ impl State {
.unwrap();
at!(TcpEvent::Connected, |e| {
//runs the closure when the client is connected
- if let Some(TcpEventData::Connected(msg)) = e {
+ if let TcpEventData::Connected(msg) = e {
Ok(Some(CommandResponse::ServerConnect {
welcome_message: if msg.has_welcome_text() {
Some(msg.get_welcome_text().to_string())
@@ -209,12 +217,16 @@ impl State {
.unwrap();
now!(Ok(None))
}
+ Command::ConfigReload => {
+ self.reload_config();
+ now!(Ok(None))
+ }
Command::InputVolumeSet(volume) => {
self.audio.set_input_volume(volume);
now!(Ok(None))
}
- Command::ConfigReload => {
- self.reload_config();
+ Command::OutputVolumeSet(volume) => {
+ self.audio.set_output_volume(volume);
now!(Ok(None))
}
Command::DeafenSelf => {
@@ -258,6 +270,44 @@ impl State {
return now!(Ok(None));
}
+ Command::UserVolumeSet(string, volume) => {
+ if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) {
+ return now!(Err(Error::DisconnectedError));
+ }
+ let user_id = match self
+ .server()
+ .unwrap()
+ .users()
+ .iter()
+ .find(|e| e.1.name() == &string)
+ .map(|e| *e.0)
+ {
+ None => return now!(Err(Error::InvalidUsernameError(string))),
+ Some(v) => v,
+ };
+
+ self.audio.set_user_volume(user_id, volume);
+ now!(Ok(None))
+ }
+ Command::ServerStatus { host, port } => ExecutionContext::Ping(
+ Box::new(move || {
+ match (host.as_str(), port)
+ .to_socket_addrs()
+ .map(|mut e| e.next())
+ {
+ Ok(Some(v)) => Ok(v),
+ _ => Err(mumlib::error::Error::InvalidServerAddrError(host, port)),
+ }
+ }),
+ Box::new(move |pong| {
+ Ok(Some(CommandResponse::ServerStatus {
+ version: pong.version,
+ users: pong.users,
+ max_users: pong.max_users,
+ bandwidth: pong.bandwidth,
+ }))
+ }),
+ ),
}
}
@@ -270,9 +320,9 @@ impl State {
// check if this is initial state
if !self.server().unwrap().users().contains_key(&session) {
self.parse_initial_user_state(session, msg);
- return None;
+ None
} else {
- return Some(self.parse_updated_user_state(session, msg));
+ Some(self.parse_updated_user_state(session, msg))
}
}
@@ -393,16 +443,12 @@ impl State {
}
pub fn reload_config(&mut self) {
- if let Some(config) = mumlib::config::read_default_cfg() {
- self.config = Some(config);
- let config = &self.config.as_ref().unwrap();
- if let Some(audio_config) = &config.audio {
- if let Some(input_volume) = audio_config.input_volume {
- self.audio.set_input_volume(input_volume);
- }
- }
- } else {
- warn!("config file not found");
+ self.config = mumlib::config::read_default_cfg();
+ if let Some(input_volume) = self.config.audio.input_volume {
+ self.audio.set_input_volume(input_volume);
+ }
+ if let Some(output_volume) = self.config.audio.output_volume {
+ self.audio.set_output_volume(output_volume);
}
}
diff --git a/mumlib/src/command.rs b/mumlib/src/command.rs
index 63dd5f9..28b4d79 100644
--- a/mumlib/src/command.rs
+++ b/mumlib/src/command.rs
@@ -10,6 +10,8 @@ pub enum Command {
ChannelList,
ConfigReload,
InputVolumeSet(f32),
+ OutputVolumeSet(f32),
+ UserVolumeSet(String, f32),
ServerConnect {
host: String,
port: u16,
@@ -21,11 +23,27 @@ pub enum Command {
DeafenSelf,
MuteSelf,
MuteOther(String),
+ ServerStatus {
+ host: String,
+ port: u16,
+ },
}
#[derive(Debug, Deserialize, Serialize)]
pub enum CommandResponse {
- ChannelList { channels: Channel },
- ServerConnect { welcome_message: Option<String> },
- Status { server_state: Server },
+ ChannelList {
+ channels: Channel,
+ },
+ ServerConnect {
+ welcome_message: Option<String>,
+ },
+ Status {
+ server_state: Server,
+ },
+ ServerStatus {
+ version: u32,
+ users: u32,
+ max_users: u32,
+ bandwidth: u32,
+ },
}
diff --git a/mumlib/src/config.rs b/mumlib/src/config.rs
index e6b97fd..3a2fa27 100644
--- a/mumlib/src/config.rs
+++ b/mumlib/src/config.rs
@@ -1,6 +1,8 @@
+use crate::DEFAULT_PORT;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::fs;
+use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path;
use toml::value::Array;
use toml::Value;
@@ -13,8 +15,8 @@ struct TOMLConfig {
#[derive(Clone, Debug, Default)]
pub struct Config {
- pub audio: Option<AudioConfig>,
- pub servers: Option<Vec<ServerConfig>>,
+ pub audio: AudioConfig,
+ pub servers: Vec<ServerConfig>,
}
impl Config {
@@ -44,9 +46,10 @@ impl Config {
}
}
-#[derive(Clone, Debug, Deserialize, Serialize)]
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct AudioConfig {
pub input_volume: Option<f32>,
+ pub output_volume: Option<f32>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -58,6 +61,18 @@ pub struct ServerConfig {
pub password: Option<String>,
}
+impl ServerConfig {
+ pub fn to_socket_addr(&self) -> Option<SocketAddr> {
+ match (self.host.as_str(), self.port.unwrap_or(DEFAULT_PORT))
+ .to_socket_addrs()
+ .map(|mut e| e.next())
+ {
+ Ok(Some(addr)) => Some(addr),
+ _ => None,
+ }
+ }
+}
+
pub fn get_cfg_path() -> String {
if let Ok(var) = std::env::var("XDG_CONFIG_HOME") {
let path = format!("{}/mumdrc", var);
@@ -113,7 +128,7 @@ impl TryFrom<TOMLConfig> for Config {
fn try_from(config: TOMLConfig) -> Result<Self, Self::Error> {
Ok(Config {
- audio: config.audio,
+ audio: config.audio.unwrap_or_default(),
servers: config
.servers
.map(|servers| {
@@ -122,7 +137,8 @@ impl TryFrom<TOMLConfig> for Config {
.map(|s| s.try_into::<ServerConfig>())
.collect()
})
- .transpose()?,
+ .transpose()?
+ .unwrap_or(Vec::new()),
})
}
}
@@ -130,26 +146,29 @@ impl TryFrom<TOMLConfig> for Config {
impl From<Config> for TOMLConfig {
fn from(config: Config) -> Self {
TOMLConfig {
- audio: config.audio,
- servers: config.servers.map(|servers| {
- servers
+ audio: if config.audio.output_volume.is_some() || config.audio.input_volume.is_some() {
+ Some(config.audio)
+ } else {
+ None
+ },
+ servers: Some(
+ config
+ .servers
.into_iter()
.map(|s| Value::try_from::<ServerConfig>(s).unwrap())
- .collect()
- }),
+ .collect(),
+ ),
}
}
}
-pub fn read_default_cfg() -> Option<Config> {
- Some(
- Config::try_from(
- toml::from_str::<TOMLConfig>(&match fs::read_to_string(get_cfg_path()) {
- Ok(f) => f.to_string(),
- Err(_) => return None,
- })
- .expect("invalid TOML in config file"), //TODO
- )
- .expect("invalid config in TOML"),
- ) //TODO
+pub fn read_default_cfg() -> Config {
+ Config::try_from(
+ toml::from_str::<TOMLConfig>(&match fs::read_to_string(get_cfg_path()) {
+ Ok(f) => f,
+ Err(_) => return Config::default(),
+ })
+ .expect("invalid TOML in config file"), //TODO
+ )
+ .expect("invalid config in TOML") //TODO
}
diff --git a/mumlib/src/error.rs b/mumlib/src/error.rs
index e8b0323..c9eff52 100644
--- a/mumlib/src/error.rs
+++ b/mumlib/src/error.rs
@@ -11,6 +11,7 @@ pub enum Error {
ChannelIdentifierError(String, ChannelIdentifierError),
InvalidUserIdentifierError(String),
InvalidServerAddrError(String, u16),
+ InvalidUsernameError(String),
}
impl Display for Error {
@@ -21,6 +22,7 @@ impl Display for Error {
Error::ChannelIdentifierError(id, kind) => write!(f, "{}: {}", kind, id),
Error::InvalidServerAddrError(addr, port) => write!(f, "Invalid server address: {}: {}", addr, port),
Error::InvalidUserIdentifierError(name) => write!(f, "Invalid username: {}", name),
+ Error::InvalidUsernameError(username) => write!(f, "Invalid username: {}", username),
}
}
}
diff --git a/mumlib/src/lib.rs b/mumlib/src/lib.rs
index a54990e..439efa9 100644
--- a/mumlib/src/lib.rs
+++ b/mumlib/src/lib.rs
@@ -7,6 +7,7 @@ use colored::*;
use log::*;
pub const SOCKET_PATH: &str = "/var/tmp/mumd";
+pub const DEFAULT_PORT: u16 = 64738;
pub fn setup_logger<T: Into<fern::Output>>(target: T, color: bool) {
fern::Dispatch::new()
diff --git a/usage.org b/usage.org
index 9402f39..ca544be 100644
--- a/usage.org
+++ b/usage.org
@@ -145,3 +145,9 @@ $ mumctl config audio.input_volume 1.1
$ mumctl config audio.input_volume
$ mumctl config audio.input_volume --help
#+END_SRC
+** TODO volume
+#+BEGIN_SRC bash
+$ mumctl volume set User1 1.1
+$ mumctl volume User1
+110%
+#+END_SRC