diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2020-10-15 20:52:27 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2020-10-15 20:52:27 +0200 |
| commit | de856d5e43ecadcd876bdf03800ecc5421347872 (patch) | |
| tree | c4ab144f6f9d4172d4654015fc73b76e5e13850a /mumd | |
| parent | a40d365aacf118b33c07f3353f277eb96c4536a8 (diff) | |
| download | mum-de856d5e43ecadcd876bdf03800ecc5421347872.tar.gz | |
initial cli
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/Cargo.toml | 7 | ||||
| -rw-r--r-- | mumd/src/command.rs | 53 | ||||
| -rw-r--r-- | mumd/src/main.rs | 117 | ||||
| -rw-r--r-- | mumd/src/state.rs | 223 |
4 files changed, 51 insertions, 349 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index 72f9167..9101b43 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -9,18 +9,20 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +mumlib = { path = "../mumlib" } + argparse = "0.2" bytes = "0.5" -colored = "2.0" cpal = { git = "https://github.com/RustAudio/cpal" } -fern = "0.5" futures = "0.3" futures-util = "0.3" +ipc-channel = "0.14" log = "0.4" mumble-protocol = "0.3" native-tls = "0.2" openssl = { version = "0.10", optional = true } opus = "0.2" +serde = { version = "1.0", features = ["derive"] } tokio = { version = "0.2", features = ["full"] } tokio-tls = "0.3" tokio-util = { version = "0.3", features = ["codec", "udp"] } @@ -28,4 +30,3 @@ tokio-util = { version = "0.3", features = ["codec", "udp"] } #clap = "2.33" #compressor = "0.3" #daemonize = "0.4" -#ipc-channel = "0.14" diff --git a/mumd/src/command.rs b/mumd/src/command.rs index b4bd1b7..9adf7d8 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,54 +1,33 @@ -use crate::state::{Channel, Server, State, StatePhase}; +use crate::state::{State, StatePhase}; +use ipc_channel::ipc::IpcSender; use log::*; -use std::collections::HashMap; +use mumlib::command::{Command, CommandResponse}; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; -#[derive(Clone, Debug)] -pub enum Command { - ChannelJoin { - channel_id: u32, - }, - ChannelList, - ServerConnect { - host: String, - port: u16, - username: String, - accept_invalid_cert: bool, //TODO ask when connecting - }, - ServerDisconnect, - Status, -} - -#[derive(Debug)] -pub enum CommandResponse { - ChannelList { - channels: HashMap<u32, Channel>, - }, - Status { - username: Option<String>, - server_state: Server, - }, -} - pub async fn handle( state: Arc<Mutex<State>>, - mut command_receiver: mpsc::UnboundedReceiver<Command>, - command_response_sender: mpsc::UnboundedSender<Result<Option<CommandResponse>, ()>>, + mut command_receiver: mpsc::UnboundedReceiver<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>, ) { - //TODO err if not connected - while let Some(command) = command_receiver.recv().await { - debug!("Parsing command {:?}", command); + debug!("Begin listening for commands"); + loop { + debug!("Enter loop"); + let command = command_receiver.recv().await.unwrap(); + debug!("Received command {:?}", command.0); let mut state = state.lock().unwrap(); - let (wait_for_connected, command_response) = state.handle_command(command).await; + let (wait_for_connected, command_response) = state.handle_command(command.0).await; if wait_for_connected { let mut watcher = state.phase_receiver(); drop(state); while !matches!(watcher.recv().await.unwrap(), StatePhase::Connected) {} } - command_response_sender.send(command_response).unwrap(); + command.1.send(command_response).unwrap(); } + //TODO err if not connected + //while let Some(command) = command_receiver.recv().await { + // debug!("Parsing command {:?}", command); + //} - debug!("Finished handling commands"); + //debug!("Finished handling commands"); } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index f837a52..8639c35 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -3,96 +3,42 @@ mod command; mod network; mod state; -use crate::command::{Command, CommandResponse}; use crate::network::ConnectionInfo; use crate::state::State; -use argparse::ArgumentParser; -use argparse::Store; -use argparse::StoreTrue; -use colored::*; use futures::join; +use ipc_channel::ipc::{IpcSender, IpcOneShotServer}; use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; +use mumlib::command::{Command, CommandResponse}; +use mumlib::setup_logger; +use std::fs; use std::sync::{Arc, Mutex}; -use std::time::Duration; use tokio::sync::{mpsc, watch}; +use tokio::task::spawn_blocking; #[tokio::main] async fn main() { - // setup logger - fern::Dispatch::new() - .format(|out, message, record| { - let message = message.to_string(); - out.finish(format_args!( - "{} {}:{}{}{}", - //TODO runtime flag that disables color - match record.level() { - Level::Error => "ERROR".red(), - Level::Warn => "WARN ".yellow(), - Level::Info => "INFO ".normal(), - Level::Debug => "DEBUG".green(), - Level::Trace => "TRACE".normal(), - }, - record.file().unwrap(), - record.line().unwrap(), - if message.chars().any(|e| e == '\n') { - "\n" - } else { - " " - }, - message - )) - }) - .level(log::LevelFilter::Debug) - .chain(std::io::stderr()) - .apply() - .unwrap(); - - // Handle command line arguments - let mut server_host = "".to_string(); - let mut server_port = 64738u16; - let mut username = "EchoBot".to_string(); - let mut accept_invalid_cert = false; - { - let mut ap = ArgumentParser::new(); - ap.set_description("Run the echo client example"); - ap.refer(&mut server_host) - .add_option(&["--host"], Store, "Hostname of mumble server") - .required(); - ap.refer(&mut server_port) - .add_option(&["--port"], Store, "Port of mumble server"); - ap.refer(&mut username) - .add_option(&["--username"], Store, "User name used to connect"); - ap.refer(&mut accept_invalid_cert).add_option( - &["--accept-invalid-cert"], - StoreTrue, - "Accept invalid TLS certificates", - ); - ap.parse_args_or_exit(); - } + setup_logger(); // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::<ClientCryptState>(1); // crypt state should always be consumed before sending a new one let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>(); - let (command_sender, command_receiver) = mpsc::unbounded_channel::<Command>(); - let (command_response_sender, command_response_receiver) = - mpsc::unbounded_channel::<Result<Option<CommandResponse>, ()>>(); + let (command_sender, command_receiver) = mpsc::unbounded_channel::<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>(); let (connection_info_sender, connection_info_receiver) = watch::channel::<Option<ConnectionInfo>>(None); let state = State::new( packet_sender, - command_sender.clone(), connection_info_sender, ); let state = Arc::new(Mutex::new(state)); // Run it - join!( + let (_, _, _, e) = join!( network::tcp::handle( Arc::clone(&state), connection_info_receiver.clone(), @@ -104,38 +50,29 @@ async fn main() { connection_info_receiver.clone(), crypt_state_receiver, ), - command::handle(state, command_receiver, command_response_sender,), - send_commands( - command_sender, - Command::ServerConnect { - host: server_host, - port: server_port, - username: username.clone(), - accept_invalid_cert - } + command::handle( + state, + command_receiver, ), - receive_command_responses(command_response_receiver,), + spawn_blocking(move || { + receive_oneshot_commands(command_sender); + }), ); + e.unwrap(); } -async fn send_commands(command_sender: mpsc::UnboundedSender<Command>, connect_command: Command) { - command_sender.send(connect_command.clone()).unwrap(); - tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(Command::ServerDisconnect).unwrap(); - tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(connect_command.clone()).unwrap(); - tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(Command::ServerDisconnect).unwrap(); - - debug!("Finished sending commands"); -} - -async fn receive_command_responses( - mut command_response_receiver: mpsc::UnboundedReceiver<Result<Option<CommandResponse>, ()>>, +fn receive_oneshot_commands( + command_sender: mpsc::UnboundedSender<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>, ) { - while let Some(command_response) = command_response_receiver.recv().await { - debug!("{:?}", command_response); - } + loop { + // create listener + let (server, server_name): (IpcOneShotServer<(Command, IpcSender<Result<Option<CommandResponse>, ()>>)>, String) = IpcOneShotServer::new().unwrap(); + fs::write("/var/tmp/mumd-oneshot", &server_name).unwrap(); + debug!("Listening for command at {}...", server_name); - debug!("Finished receiving commands"); + // receive command and response channel + let (_, conn): (_, (Command, IpcSender<Result<Option<CommandResponse>, ()>>)) = server.accept().unwrap(); + debug!("Sending command {:?} to command handler", conn.0); + command_sender.send(conn).unwrap(); + } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index b6fe780..0fd814c 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -1,12 +1,12 @@ use crate::audio::Audio; -use crate::command::{Command, CommandResponse}; use crate::network::ConnectionInfo; + use log::*; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::Serverbound; -use std::collections::hash_map::Entry; -use std::collections::HashMap; +use mumlib::command::{Command, CommandResponse}; +use mumlib::state::Server; use std::net::ToSocketAddrs; use tokio::sync::{mpsc, watch}; @@ -22,7 +22,6 @@ pub struct State { audio: Audio, packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, - command_sender: mpsc::UnboundedSender<Command>, connection_info_sender: watch::Sender<Option<ConnectionInfo>>, phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>), @@ -34,14 +33,12 @@ pub struct State { impl State { pub fn new( packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, - command_sender: mpsc::UnboundedSender<Command>, connection_info_sender: watch::Sender<Option<ConnectionInfo>>, ) -> Self { Self { server: None, audio: Audio::new(), packet_sender, - command_sender, connection_info_sender, phase_watcher: watch::channel(StatePhase::Disconnected), username: None, @@ -74,7 +71,7 @@ impl State { ( false, Ok(Some(CommandResponse::ChannelList { - channels: self.server.as_ref().unwrap().channels.clone(), + channels: self.server.as_ref().unwrap().channels().clone(), })), ) } @@ -190,215 +187,3 @@ impl State { self.username.as_ref() } } - -#[derive(Clone, Debug)] -pub struct Server { - channels: HashMap<u32, Channel>, - users: HashMap<u32, User>, - pub welcome_text: Option<String>, -} - -impl Server { - pub fn new() -> Self { - Self { - channels: HashMap::new(), - users: HashMap::new(), - welcome_text: None, - } - } - - pub fn parse_server_sync(&mut self, mut msg: msgs::ServerSync) { - if msg.has_welcome_text() { - self.welcome_text = Some(msg.take_welcome_text()); - } - } - - pub fn parse_channel_state(&mut self, msg: msgs::ChannelState) { - if !msg.has_channel_id() { - warn!("Can't parse channel state without channel id"); - return; - } - match self.channels.entry(msg.get_channel_id()) { - Entry::Vacant(e) => { - e.insert(Channel::new(msg)); - } - Entry::Occupied(mut e) => e.get_mut().parse_channel_state(msg), - } - } - - pub fn parse_channel_remove(&mut self, msg: msgs::ChannelRemove) { - if !msg.has_channel_id() { - warn!("Can't parse channel remove without channel id"); - return; - } - match self.channels.entry(msg.get_channel_id()) { - Entry::Vacant(_) => { - warn!("Attempted to remove channel that doesn't exist"); - } - Entry::Occupied(e) => { - e.remove(); - } - } - } - - pub fn parse_user_state(&mut self, msg: msgs::UserState) { - if !msg.has_session() { - warn!("Can't parse user state without session"); - return; - } - match self.users.entry(msg.get_session()) { - Entry::Vacant(e) => { - e.insert(User::new(msg)); - } - Entry::Occupied(mut e) => e.get_mut().parse_user_state(msg), - } - } - - pub fn channels(&self) -> &HashMap<u32, Channel> { - &self.channels - } - - pub fn users(&self) -> &HashMap<u32, User> { - &self.users - } -} - -#[derive(Clone, Debug)] -pub struct Channel { - description: Option<String>, - links: Vec<u32>, - max_users: u32, - name: String, - parent: Option<u32>, - position: i32, -} - -impl Channel { - pub fn new(mut msg: msgs::ChannelState) -> Self { - Self { - description: if msg.has_description() { - Some(msg.take_description()) - } else { - None - }, - links: Vec::new(), - max_users: msg.get_max_users(), - name: msg.take_name(), - parent: if msg.has_parent() { - Some(msg.get_parent()) - } else { - None - }, - position: msg.get_position(), - } - } - - pub fn parse_channel_state(&mut self, mut msg: msgs::ChannelState) { - if msg.has_description() { - self.description = Some(msg.take_description()); - } - self.links = msg.take_links(); - if msg.has_max_users() { - self.max_users = msg.get_max_users(); - } - if msg.has_name() { - self.name = msg.take_name(); - } - if msg.has_parent() { - self.parent = Some(msg.get_parent()); - } - if msg.has_position() { - self.position = msg.get_position(); - } - } - - pub fn name(&self) -> &str { - &self.name - } -} - -#[derive(Clone, Debug)] -pub struct User { - channel: u32, - comment: Option<String>, - hash: Option<String>, - name: String, - priority_speaker: bool, - recording: bool, - - suppress: bool, // by me - self_mute: bool, // by self - self_deaf: bool, // by self - mute: bool, // by admin - deaf: bool, // by admin -} - -impl User { - pub fn new(mut msg: msgs::UserState) -> Self { - Self { - channel: msg.get_channel_id(), - comment: if msg.has_comment() { - Some(msg.take_comment()) - } else { - None - }, - hash: if msg.has_hash() { - Some(msg.take_hash()) - } else { - None - }, - name: msg.take_name(), - priority_speaker: msg.has_priority_speaker() && msg.get_priority_speaker(), - recording: msg.has_recording() && msg.get_recording(), - suppress: msg.has_suppress() && msg.get_suppress(), - self_mute: msg.has_self_mute() && msg.get_self_mute(), - self_deaf: msg.has_self_deaf() && msg.get_self_deaf(), - mute: msg.has_mute() && msg.get_mute(), - deaf: msg.has_deaf() && msg.get_deaf(), - } - } - - pub fn parse_user_state(&mut self, mut msg: msgs::UserState) { - if msg.has_channel_id() { - self.channel = msg.get_channel_id(); - } - if msg.has_comment() { - self.comment = Some(msg.take_comment()); - } - if msg.has_hash() { - self.hash = Some(msg.take_hash()); - } - if msg.has_name() { - self.name = msg.take_name(); - } - if msg.has_priority_speaker() { - self.priority_speaker = msg.get_priority_speaker(); - } - if msg.has_recording() { - self.recording = msg.get_recording(); - } - if msg.has_suppress() { - self.suppress = msg.get_suppress(); - } - if msg.has_self_mute() { - self.self_mute = msg.get_self_mute(); - } - if msg.has_self_deaf() { - self.self_deaf = msg.get_self_deaf(); - } - if msg.has_mute() { - self.mute = msg.get_mute(); - } - if msg.has_deaf() { - self.deaf = msg.get_deaf(); - } - } - - pub fn name(&self) -> &str { - &self.name - } - - pub fn channel(&self) -> u32 { - self.channel - } -} |
