aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-04-19 23:17:52 +0200
committerGitHub <noreply@github.com>2021-04-19 23:17:52 +0200
commit182222664a513ef2b2da74e6b4f67274338b5a77 (patch)
tree16ff6ca08aaeb5f6cbde8289940b56623b8706d0
parent61ad0c5f80c79911cce00263d4b1bd3e2f2defe8 (diff)
parent4da882513c9a692161ff00e4421325ffc7d4af24 (diff)
downloadmum-182222664a513ef2b2da74e6b4f67274338b5a77.tar.gz
Merge pull request #90 from mum-rs/server-list-parallel
-rw-r--r--Cargo.lock34
-rw-r--r--mumctl/src/main.rs79
-rw-r--r--mumd/src/command.rs10
-rw-r--r--mumd/src/main.rs8
-rw-r--r--mumd/src/network/udp.rs69
-rw-r--r--mumd/src/state.rs4
-rw-r--r--mumlib/Cargo.toml1
-rw-r--r--mumlib/src/lib.rs3
8 files changed, 159 insertions, 49 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2e68fa5..ccb8871 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -161,6 +161,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
+name = "chrono"
+version = "0.4.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
+dependencies = [
+ "libc",
+ "num-integer",
+ "num-traits",
+ "time",
+ "winapi",
+]
+
+[[package]]
name = "clang-sys"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -885,6 +898,7 @@ dependencies = [
name = "mumlib"
version = "0.4.0"
dependencies = [
+ "chrono",
"colored",
"dirs",
"fern",
@@ -999,6 +1013,16 @@ dependencies = [
]
[[package]]
+name = "num-integer"
+version = "0.1.44"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
+dependencies = [
+ "autocfg",
+ "num-traits",
+]
+
+[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1573,6 +1597,16 @@ dependencies = [
]
[[package]]
+name = "time"
+version = "0.1.43"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
+[[package]]
name = "tokio"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/mumctl/src/main.rs b/mumctl/src/main.rs
index 29c9e44..5abed50 100644
--- a/mumctl/src/main.rs
+++ b/mumctl/src/main.rs
@@ -3,7 +3,11 @@ use log::*;
use mumlib::command::{Command as MumCommand, CommandResponse};
use mumlib::config::{self, Config, ServerConfig};
use mumlib::state::Channel as MumChannel;
-use std::{fmt,io::{self, BufRead, Read, Write}, iter, os::unix::net::UnixStream};
+use std::fmt;
+use std::io::{self, BufRead, Read, Write};
+use std::iter;
+use std::os::unix::net::UnixStream;
+use std::thread;
use structopt::{clap::Shell, StructOpt};
const INDENTATION: &str = " ";
@@ -297,7 +301,7 @@ fn match_opt() -> Result<(), Error> {
}
}
_ => {
- return Err(CliError::ConfigKeyNotFound(key))?;
+ return Err(CliError::ConfigKeyNotFound(key).into());
}
},
Command::ConfigReload => {
@@ -363,7 +367,7 @@ fn match_opt() -> Result<(), Error> {
Ok(())
}
-fn match_server_command(server_command: Server, config: &mut Config) -> Result<(), CliError> {
+fn match_server_command(server_command: Server, config: &mut Config) -> Result<(), Error> {
match server_command {
Server::Config {
server_name,
@@ -473,7 +477,7 @@ fn match_server_command(server_command: Server, config: &mut Config) -> Result<(
password,
} => {
if config.servers.iter().any(|s| s.name == name) {
- return Err(CliError::ServerAlreadyExists(name))?;
+ return Err(CliError::ServerAlreadyExists(name).into());
} else {
config.servers.push(ServerConfig {
name,
@@ -494,31 +498,56 @@ fn match_server_command(server_command: Server, config: &mut Config) -> Result<(
}
Server::List => {
if config.servers.is_empty() {
- return Err(CliError::NoServers)?;
+ return Err(CliError::NoServers.into());
}
- let query = config
+
+ let longest = config
.servers
.iter()
- .map(|e| {
- let response = send_command(MumCommand::ServerStatus {
- host: e.host.clone(),
- port: e.port.unwrap_or(mumlib::DEFAULT_PORT),
- });
- response.map(|f| (e, f))
+ .map(|s| s.name.len())
+ .max()
+ .unwrap() // ok since !config.servers.is_empty() above
+ + 1;
+
+ let queries: Vec<_> = config
+ .servers
+ .iter()
+ .map(|s| {
+ let query = MumCommand::ServerStatus {
+ host: s.host.clone(),
+ port: s.port.unwrap_or(mumlib::DEFAULT_PORT),
+ };
+ thread::spawn(move || {
+ send_command(query)
+ })
})
- .collect::<Result<Vec<_>, _>>()?;
- for (server, response) in query
- .into_iter()
- .filter(|e| e.1.is_ok())
- .map(|e| (e.0, e.1.unwrap().unwrap()))
- {
- if let CommandResponse::ServerStatus {
- users, max_users, ..
- } = response
- {
- println!("{} [{}/{}]", server.name, users, max_users)
- } else {
- unreachable!()
+ .collect();
+
+ for (server, response) in config.servers.iter().zip(queries) {
+ match response.join().unwrap() {
+ Ok(Ok(Some(response))) => {
+ if let CommandResponse::ServerStatus {
+ users,
+ max_users,
+ ..
+ } = response
+ {
+ println!("{0:<1$} [{2:}/{3:}]", server.name, longest, users, max_users);
+ } else {
+ unreachable!();
+ }
+ }
+ Ok(Ok(None)) => {
+ println!("{0:<1$} offline", server.name, longest);
+ }
+ Ok(Err(e)) => {
+ error!("{}", e);
+ return Err(e.into());
+ }
+ Err(e) => {
+ error!("{}", e);
+ return Err(e.into());
+ }
}
}
}
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 7eec388..1337dce 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -8,7 +8,7 @@ use crate::state::{ExecutionContext, State};
use log::*;
use mumble_protocol::{Serverbound, control::ControlPacket};
use mumlib::command::{Command, CommandResponse};
-use std::sync::{Arc, RwLock};
+use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
use tokio::sync::{mpsc, oneshot, watch};
pub async fn handle(
@@ -23,6 +23,7 @@ pub async fn handle(
mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
) {
debug!("Begin listening for commands");
+ let ping_count = AtomicU64::new(0);
while let Some((command, response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
let mut state = state.write().unwrap();
@@ -47,10 +48,13 @@ pub async fn handle(
response_sender.send(generator()).unwrap();
}
ExecutionContext::Ping(generator, converter) => {
- match generator() {
+ let ret = generator();
+ debug!("Ping generated: {:?}", ret);
+ match ret {
Ok(addr) => {
+ let id = ping_count.fetch_add(1, Ordering::Relaxed);
let res = ping_request_sender.send((
- 0,
+ id,
addr,
Box::new(move |packet| {
response_sender.send(converter(packet)).unwrap();
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index d7bc2c0..f298070 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -109,7 +109,13 @@ async fn receive_commands(
sender.send((command, tx)).unwrap();
- let response = rx.await.unwrap();
+ let response = match rx.await {
+ Ok(r) => r,
+ Err(_) => {
+ error!("Internal command response sender dropped");
+ Ok(None)
+ }
+ };
let mut serialized = BytesMut::new();
bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 3ca77af..0958912 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -3,27 +3,25 @@ use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
use futures_util::{FutureExt, SinkExt, StreamExt};
+use futures_util::future::join4;
use futures_util::stream::{SplitSink, SplitStream, Stream};
use log::*;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::ping::{PingPacket, PongPacket};
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::Serverbound;
-use std::collections::HashMap;
+use std::collections::{hash_map::Entry, HashMap};
use std::convert::TryFrom;
use std::net::{Ipv6Addr, SocketAddr};
-use std::rc::Rc;
-use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::{atomic::{AtomicU64, Ordering}, Arc, RwLock};
use tokio::{join, net::UdpSocket};
-use tokio::sync::{mpsc, watch, Mutex};
-use tokio::time::{interval, Duration};
+use tokio::sync::{mpsc, oneshot, watch, Mutex};
+use tokio::time::{interval, timeout, Duration};
use tokio_util::udp::UdpFramed;
use super::{run_until, VoiceStreamType};
-use futures_util::future::join4;
-pub type PingRequest = (u64, SocketAddr, Box<dyn FnOnce(PongPacket)>);
+pub type PingRequest = (u64, SocketAddr, Box<dyn FnOnce(Option<PongPacket>) + Send>);
type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>;
@@ -228,31 +226,68 @@ pub async fn handle_pings(
.await
.expect("Failed to bind UDP socket");
- let pending = Rc::new(Mutex::new(HashMap::new()));
+ let pending = Mutex::new(HashMap::new());
- let sender_handle = async {
+ let sender = async {
while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await {
+ debug!("Sending ping with id {} to {}", id, socket_addr);
let packet = PingPacket { id };
let packet: [u8; 12] = packet.into();
udp_socket.send_to(&packet, &socket_addr).await.unwrap();
- pending.lock().await.insert(id, handle);
+ let (tx, rx) = oneshot::channel();
+ match pending.lock().await.entry(id) {
+ Entry::Occupied(_) => {
+ warn!("Tried to send duplicate ping with id {}", id);
+ continue;
+ }
+ Entry::Vacant(v) => {
+ v.insert(tx);
+ }
+ }
+
+ tokio::spawn(async move {
+ handle(
+ match timeout(Duration::from_secs(1), rx).await {
+ Ok(Ok(r)) => Some(r),
+ Ok(Err(_)) => {
+ warn!("Ping response sender for server {}, ping id {} dropped", socket_addr, id);
+ None
+ }
+ Err(_) => {
+ debug!("Server {} timed out when sending ping id {}", socket_addr, id);
+ None
+ }
+ }
+ );
+ });
}
};
- let receiver_handle = async {
+ let receiver = async {
let mut buf = vec![0; 24];
+
while let Ok(read) = udp_socket.recv(&mut buf).await {
- assert_eq!(read, 24);
+ if read != 24 {
+ warn!("Ping response had length {}, expected 24", read);
+ continue;
+ }
let packet = PongPacket::try_from(buf.as_slice()).unwrap();
- if let Some(handler) = pending.lock().await.remove(&packet.id) {
- handler(packet);
+ match pending.lock().await.entry(packet.id) {
+ Entry::Occupied(o) => {
+ let id = *o.key();
+ if o.remove().send(packet).is_err() {
+ debug!("Received response to ping with id {} too late", id);
+ }
+ }
+ Entry::Vacant(v) => {
+ warn!("Received ping with id {} that we didn't send", v.key());
+ }
}
}
};
debug!("Waiting for ping requests");
-
- join!(sender_handle, receiver_handle);
+ join!(sender, receiver);
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 132da74..45e7301 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -43,7 +43,7 @@ pub enum ExecutionContext {
Now(Box<dyn FnOnce() -> mumlib::error::Result<Option<CommandResponse>>>),
Ping(
Box<dyn FnOnce() -> mumlib::error::Result<SocketAddr>>,
- Box<dyn FnOnce(PongPacket) -> mumlib::error::Result<Option<CommandResponse>>>,
+ Box<dyn FnOnce(Option<PongPacket>) -> mumlib::error::Result<Option<CommandResponse>> + Send>,
),
}
@@ -390,7 +390,7 @@ impl State {
}
}),
Box::new(move |pong| {
- Ok(Some(CommandResponse::ServerStatus {
+ Ok(pong.map(|pong| CommandResponse::ServerStatus {
version: pong.version,
users: pong.users,
max_users: pong.max_users,
diff --git a/mumlib/Cargo.toml b/mumlib/Cargo.toml
index bda026b..5c9d4e1 100644
--- a/mumlib/Cargo.toml
+++ b/mumlib/Cargo.toml
@@ -13,6 +13,7 @@ readme = "../README.md"
[dependencies]
colored = "2"
+chrono = "0.4"
dirs = "3"
fern = "0.6"
log = "0.4"
diff --git a/mumlib/src/lib.rs b/mumlib/src/lib.rs
index 36edc10..9b7d686 100644
--- a/mumlib/src/lib.rs
+++ b/mumlib/src/lib.rs
@@ -16,7 +16,7 @@ pub fn setup_logger<T: Into<fern::Output>>(target: T, color: bool) {
.format(move |out, message, record| {
let message = message.to_string();
out.finish(format_args!(
- "{} {}:{}{}{}",
+ "{} {} {}:{}{}{}",
//TODO runtime flag that disables color
if color {
match record.level() {
@@ -36,6 +36,7 @@ pub fn setup_logger<T: Into<fern::Output>>(target: T, color: bool) {
}
.normal()
},
+ chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S%.6f]"),
record.file().unwrap(),
record.line().unwrap(),
if message.chars().any(|e| e == '\n') {