aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
Diffstat (limited to 'mumd')
-rw-r--r--mumd/Cargo.toml7
-rw-r--r--mumd/src/command.rs53
-rw-r--r--mumd/src/main.rs118
-rw-r--r--mumd/src/state.rs223
4 files changed, 51 insertions, 350 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml
index 72f9167..9101b43 100644
--- a/mumd/Cargo.toml
+++ b/mumd/Cargo.toml
@@ -9,18 +9,20 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+mumlib = { path = "../mumlib" }
+
argparse = "0.2"
bytes = "0.5"
-colored = "2.0"
cpal = { git = "https://github.com/RustAudio/cpal" }
-fern = "0.5"
futures = "0.3"
futures-util = "0.3"
+ipc-channel = "0.14"
log = "0.4"
mumble-protocol = "0.3"
native-tls = "0.2"
openssl = { version = "0.10", optional = true }
opus = "0.2"
+serde = { version = "1.0", features = ["derive"] }
tokio = { version = "0.2", features = ["full"] }
tokio-tls = "0.3"
tokio-util = { version = "0.3", features = ["codec", "udp"] }
@@ -28,4 +30,3 @@ tokio-util = { version = "0.3", features = ["codec", "udp"] }
#clap = "2.33"
#compressor = "0.3"
#daemonize = "0.4"
-#ipc-channel = "0.14"
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index b4bd1b7..9adf7d8 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,54 +1,33 @@
-use crate::state::{Channel, Server, State, StatePhase};
+use crate::state::{State, StatePhase};
+use ipc_channel::ipc::IpcSender;
use log::*;
-use std::collections::HashMap;
+use mumlib::command::{Command, CommandResponse};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
-#[derive(Clone, Debug)]
-pub enum Command {
- ChannelJoin {
- channel_id: u32,
- },
- ChannelList,
- ServerConnect {
- host: String,
- port: u16,
- username: String,
- accept_invalid_cert: bool, //TODO ask when connecting
- },
- ServerDisconnect,
- Status,
-}
-
-#[derive(Debug)]
-pub enum CommandResponse {
- ChannelList {
- channels: HashMap<u32, Channel>,
- },
- Status {
- username: Option<String>,
- server_state: Server,
- },
-}
-
pub async fn handle(
state: Arc<Mutex<State>>,
- mut command_receiver: mpsc::UnboundedReceiver<Command>,
- command_response_sender: mpsc::UnboundedSender<Result<Option<CommandResponse>, ()>>,
+ mut command_receiver: mpsc::UnboundedReceiver<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>,
) {
- //TODO err if not connected
- while let Some(command) = command_receiver.recv().await {
- debug!("Parsing command {:?}", command);
+ debug!("Begin listening for commands");
+ loop {
+ debug!("Enter loop");
+ let command = command_receiver.recv().await.unwrap();
+ debug!("Received command {:?}", command.0);
let mut state = state.lock().unwrap();
- let (wait_for_connected, command_response) = state.handle_command(command).await;
+ let (wait_for_connected, command_response) = state.handle_command(command.0).await;
if wait_for_connected {
let mut watcher = state.phase_receiver();
drop(state);
while !matches!(watcher.recv().await.unwrap(), StatePhase::Connected) {}
}
- command_response_sender.send(command_response).unwrap();
+ command.1.send(command_response).unwrap();
}
+ //TODO err if not connected
+ //while let Some(command) = command_receiver.recv().await {
+ // debug!("Parsing command {:?}", command);
+ //}
- debug!("Finished handling commands");
+ //debug!("Finished handling commands");
}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index f837a52..14a43c1 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -3,96 +3,41 @@ mod command;
mod network;
mod state;
-use crate::command::{Command, CommandResponse};
use crate::network::ConnectionInfo;
use crate::state::State;
-use argparse::ArgumentParser;
-use argparse::Store;
-use argparse::StoreTrue;
-use colored::*;
use futures::join;
+use ipc_channel::ipc::{IpcSender, IpcOneShotServer};
use log::*;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::Serverbound;
+use mumlib::command::{Command, CommandResponse};
+use mumlib::setup_logger;
+use std::fs;
use std::sync::{Arc, Mutex};
-use std::time::Duration;
use tokio::sync::{mpsc, watch};
+use tokio::task::spawn_blocking;
#[tokio::main]
async fn main() {
- // setup logger
- fern::Dispatch::new()
- .format(|out, message, record| {
- let message = message.to_string();
- out.finish(format_args!(
- "{} {}:{}{}{}",
- //TODO runtime flag that disables color
- match record.level() {
- Level::Error => "ERROR".red(),
- Level::Warn => "WARN ".yellow(),
- Level::Info => "INFO ".normal(),
- Level::Debug => "DEBUG".green(),
- Level::Trace => "TRACE".normal(),
- },
- record.file().unwrap(),
- record.line().unwrap(),
- if message.chars().any(|e| e == '\n') {
- "\n"
- } else {
- " "
- },
- message
- ))
- })
- .level(log::LevelFilter::Debug)
- .chain(std::io::stderr())
- .apply()
- .unwrap();
-
- // Handle command line arguments
- let mut server_host = "".to_string();
- let mut server_port = 64738u16;
- let mut username = "EchoBot".to_string();
- let mut accept_invalid_cert = false;
- {
- let mut ap = ArgumentParser::new();
- ap.set_description("Run the echo client example");
- ap.refer(&mut server_host)
- .add_option(&["--host"], Store, "Hostname of mumble server")
- .required();
- ap.refer(&mut server_port)
- .add_option(&["--port"], Store, "Port of mumble server");
- ap.refer(&mut username)
- .add_option(&["--username"], Store, "User name used to connect");
- ap.refer(&mut accept_invalid_cert).add_option(
- &["--accept-invalid-cert"],
- StoreTrue,
- "Accept invalid TLS certificates",
- );
- ap.parse_args_or_exit();
- }
+ setup_logger();
// Oneshot channel for setting UDP CryptState from control task
// For simplicity we don't deal with re-syncing, real applications would have to.
let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::<ClientCryptState>(1); // crypt state should always be consumed before sending a new one
let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
- let (command_sender, command_receiver) = mpsc::unbounded_channel::<Command>();
- let (command_response_sender, command_response_receiver) =
- mpsc::unbounded_channel::<Result<Option<CommandResponse>, ()>>();
+ let (command_sender, command_receiver) = mpsc::unbounded_channel::<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>();
let (connection_info_sender, connection_info_receiver) =
watch::channel::<Option<ConnectionInfo>>(None);
let state = State::new(
packet_sender,
- command_sender.clone(),
connection_info_sender,
);
let state = Arc::new(Mutex::new(state));
- // Run it
- join!(
+ let (_, _, _, e) = join!(
network::tcp::handle(
Arc::clone(&state),
connection_info_receiver.clone(),
@@ -104,38 +49,29 @@ async fn main() {
connection_info_receiver.clone(),
crypt_state_receiver,
),
- command::handle(state, command_receiver, command_response_sender,),
- send_commands(
- command_sender,
- Command::ServerConnect {
- host: server_host,
- port: server_port,
- username: username.clone(),
- accept_invalid_cert
- }
+ command::handle(
+ state,
+ command_receiver,
),
- receive_command_responses(command_response_receiver,),
+ spawn_blocking(move || { // IpcSender is blocking
+ receive_oneshot_commands(command_sender);
+ }),
);
+ e.unwrap();
}
-async fn send_commands(command_sender: mpsc::UnboundedSender<Command>, connect_command: Command) {
- command_sender.send(connect_command.clone()).unwrap();
- tokio::time::delay_for(Duration::from_secs(2)).await;
- command_sender.send(Command::ServerDisconnect).unwrap();
- tokio::time::delay_for(Duration::from_secs(2)).await;
- command_sender.send(connect_command.clone()).unwrap();
- tokio::time::delay_for(Duration::from_secs(2)).await;
- command_sender.send(Command::ServerDisconnect).unwrap();
-
- debug!("Finished sending commands");
-}
-
-async fn receive_command_responses(
- mut command_response_receiver: mpsc::UnboundedReceiver<Result<Option<CommandResponse>, ()>>,
+fn receive_oneshot_commands(
+ command_sender: mpsc::UnboundedSender<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>,
) {
- while let Some(command_response) = command_response_receiver.recv().await {
- debug!("{:?}", command_response);
- }
+ loop {
+ // create listener
+ let (server, server_name): (IpcOneShotServer<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>, String) = IpcOneShotServer::new().unwrap();
+ fs::write("/var/tmp/mumd-oneshot", &server_name).unwrap();
+ debug!("Listening to {}", server_name);
- debug!("Finished receiving commands");
+ // receive command and response channel
+ let (_, conn): (_, (Command, IpcSender<Result<Option<CommandResponse>, ()>>)) = server.accept().unwrap();
+ debug!("Sending to command handler: {:#?}", conn.0);
+ command_sender.send(conn).unwrap();
+ }
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 68ced10..72197f6 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -1,12 +1,12 @@
use crate::audio::Audio;
-use crate::command::{Command, CommandResponse};
use crate::network::ConnectionInfo;
+
use log::*;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::voice::Serverbound;
-use std::collections::hash_map::Entry;
-use std::collections::HashMap;
+use mumlib::command::{Command, CommandResponse};
+use mumlib::state::Server;
use std::net::ToSocketAddrs;
use tokio::sync::{mpsc, watch};
@@ -22,7 +22,6 @@ pub struct State {
audio: Audio,
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
- command_sender: mpsc::UnboundedSender<Command>,
connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>),
@@ -34,14 +33,12 @@ pub struct State {
impl State {
pub fn new(
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
- command_sender: mpsc::UnboundedSender<Command>,
connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
) -> Self {
Self {
server: None,
audio: Audio::new(),
packet_sender,
- command_sender,
connection_info_sender,
phase_watcher: watch::channel(StatePhase::Disconnected),
username: None,
@@ -74,7 +71,7 @@ impl State {
(
false,
Ok(Some(CommandResponse::ChannelList {
- channels: self.server.as_ref().unwrap().channels.clone(),
+ channels: self.server.as_ref().unwrap().channels().clone(),
})),
)
}
@@ -191,215 +188,3 @@ impl State {
self.username.as_ref()
}
}
-
-#[derive(Clone, Debug)]
-pub struct Server {
- channels: HashMap<u32, Channel>,
- users: HashMap<u32, User>,
- pub welcome_text: Option<String>,
-}
-
-impl Server {
- pub fn new() -> Self {
- Self {
- channels: HashMap::new(),
- users: HashMap::new(),
- welcome_text: None,
- }
- }
-
- pub fn parse_server_sync(&mut self, mut msg: msgs::ServerSync) {
- if msg.has_welcome_text() {
- self.welcome_text = Some(msg.take_welcome_text());
- }
- }
-
- pub fn parse_channel_state(&mut self, msg: msgs::ChannelState) {
- if !msg.has_channel_id() {
- warn!("Can't parse channel state without channel id");
- return;
- }
- match self.channels.entry(msg.get_channel_id()) {
- Entry::Vacant(e) => {
- e.insert(Channel::new(msg));
- }
- Entry::Occupied(mut e) => e.get_mut().parse_channel_state(msg),
- }
- }
-
- pub fn parse_channel_remove(&mut self, msg: msgs::ChannelRemove) {
- if !msg.has_channel_id() {
- warn!("Can't parse channel remove without channel id");
- return;
- }
- match self.channels.entry(msg.get_channel_id()) {
- Entry::Vacant(_) => {
- warn!("Attempted to remove channel that doesn't exist");
- }
- Entry::Occupied(e) => {
- e.remove();
- }
- }
- }
-
- pub fn parse_user_state(&mut self, msg: msgs::UserState) {
- if !msg.has_session() {
- warn!("Can't parse user state without session");
- return;
- }
- match self.users.entry(msg.get_session()) {
- Entry::Vacant(e) => {
- e.insert(User::new(msg));
- }
- Entry::Occupied(mut e) => e.get_mut().parse_user_state(msg),
- }
- }
-
- pub fn channels(&self) -> &HashMap<u32, Channel> {
- &self.channels
- }
-
- pub fn users(&self) -> &HashMap<u32, User> {
- &self.users
- }
-}
-
-#[derive(Clone, Debug)]
-pub struct Channel {
- description: Option<String>,
- links: Vec<u32>,
- max_users: u32,
- name: String,
- parent: Option<u32>,
- position: i32,
-}
-
-impl Channel {
- pub fn new(mut msg: msgs::ChannelState) -> Self {
- Self {
- description: if msg.has_description() {
- Some(msg.take_description())
- } else {
- None
- },
- links: Vec::new(),
- max_users: msg.get_max_users(),
- name: msg.take_name(),
- parent: if msg.has_parent() {
- Some(msg.get_parent())
- } else {
- None
- },
- position: msg.get_position(),
- }
- }
-
- pub fn parse_channel_state(&mut self, mut msg: msgs::ChannelState) {
- if msg.has_description() {
- self.description = Some(msg.take_description());
- }
- self.links = msg.take_links();
- if msg.has_max_users() {
- self.max_users = msg.get_max_users();
- }
- if msg.has_name() {
- self.name = msg.take_name();
- }
- if msg.has_parent() {
- self.parent = Some(msg.get_parent());
- }
- if msg.has_position() {
- self.position = msg.get_position();
- }
- }
-
- pub fn name(&self) -> &str {
- &self.name
- }
-}
-
-#[derive(Clone, Debug)]
-pub struct User {
- channel: u32,
- comment: Option<String>,
- hash: Option<String>,
- name: String,
- priority_speaker: bool,
- recording: bool,
-
- suppress: bool, // by me
- self_mute: bool, // by self
- self_deaf: bool, // by self
- mute: bool, // by admin
- deaf: bool, // by admin
-}
-
-impl User {
- pub fn new(mut msg: msgs::UserState) -> Self {
- Self {
- channel: msg.get_channel_id(),
- comment: if msg.has_comment() {
- Some(msg.take_comment())
- } else {
- None
- },
- hash: if msg.has_hash() {
- Some(msg.take_hash())
- } else {
- None
- },
- name: msg.take_name(),
- priority_speaker: msg.has_priority_speaker() && msg.get_priority_speaker(),
- recording: msg.has_recording() && msg.get_recording(),
- suppress: msg.has_suppress() && msg.get_suppress(),
- self_mute: msg.has_self_mute() && msg.get_self_mute(),
- self_deaf: msg.has_self_deaf() && msg.get_self_deaf(),
- mute: msg.has_mute() && msg.get_mute(),
- deaf: msg.has_deaf() && msg.get_deaf(),
- }
- }
-
- pub fn parse_user_state(&mut self, mut msg: msgs::UserState) {
- if msg.has_channel_id() {
- self.channel = msg.get_channel_id();
- }
- if msg.has_comment() {
- self.comment = Some(msg.take_comment());
- }
- if msg.has_hash() {
- self.hash = Some(msg.take_hash());
- }
- if msg.has_name() {
- self.name = msg.take_name();
- }
- if msg.has_priority_speaker() {
- self.priority_speaker = msg.get_priority_speaker();
- }
- if msg.has_recording() {
- self.recording = msg.get_recording();
- }
- if msg.has_suppress() {
- self.suppress = msg.get_suppress();
- }
- if msg.has_self_mute() {
- self.self_mute = msg.get_self_mute();
- }
- if msg.has_self_deaf() {
- self.self_deaf = msg.get_self_deaf();
- }
- if msg.has_mute() {
- self.mute = msg.get_mute();
- }
- if msg.has_deaf() {
- self.deaf = msg.get_deaf();
- }
- }
-
- pub fn name(&self) -> &str {
- &self.name
- }
-
- pub fn channel(&self) -> u32 {
- self.channel
- }
-}