From b583f6dbe521e01e879e16605026997dfa10c3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 02:31:09 +0200 Subject: join different channels Co-authored-by: Eskil Queseth --- mumd/src/main.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 2a0fcbd..a08db44 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -2,21 +2,22 @@ mod audio; mod network; mod command; mod state; -use crate::audio::Audio; -use crate::state::Server; +use crate::state::State; use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; use colored::*; -use cpal::traits::StreamTrait; use futures::channel::oneshot; use futures::join; use log::*; +use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; +use mumble_protocol::voice::Serverbound; use std::net::ToSocketAddrs; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::mpsc; #[tokio::main] async fn main() { @@ -73,28 +74,24 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); + let (packet_sender, packet_receiver) = mpsc::channel::>(10); - let audio = Audio::new(); - audio.output_stream.play().unwrap(); - let audio = Arc::new(Mutex::new(audio)); - - let server_state = Arc::new(Mutex::new(Server::new())); + let state = Arc::new(Mutex::new(State::new(packet_sender, username))); // Run it join!( network::tcp::handle( - server_state, + Arc::clone(&state), server_addr, server_host, - username, accept_invalid_cert, crypt_state_sender, - Arc::clone(&audio), + packet_receiver, ), network::udp::handle( + state, server_addr, crypt_state_receiver, - audio, ), ); } -- cgit v1.2.1 From 503f6c90395682bf5d7fd3fb8a79bfcfc3c2f329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 17:05:22 +0200 Subject: wait for complete state before sending commands --- mumd/src/main.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index a08db44..6d8d9bf 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -2,7 +2,9 @@ mod audio; mod network; mod command; mod state; + use crate::state::State; +use crate::command::Command; use argparse::ArgumentParser; use argparse::Store; @@ -15,13 +17,13 @@ use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::net::ToSocketAddrs; -use std::sync::Arc; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; #[tokio::main] async fn main() { // setup logger + //TODO? add newline before message if it contains newlines fern::Dispatch::new() .format(|out, message, record| { out.finish(format_args!( @@ -74,9 +76,12 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); - let (packet_sender, packet_receiver) = mpsc::channel::>(10); + let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); + let (command_sender, command_receiver) = mpsc::unbounded_channel::(); - let state = Arc::new(Mutex::new(State::new(packet_sender, username))); + command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); + let state = State::new(packet_sender, command_sender, username); + let state = Arc::new(Mutex::new(state)); // Run it join!( @@ -89,9 +94,13 @@ async fn main() { packet_receiver, ), network::udp::handle( - state, + Arc::clone(&state), server_addr, crypt_state_receiver, ), + command::handle( + state, + command_receiver, + ), ); } -- cgit v1.2.1 From 3d8009a0201fba0bdc464fae0797d3bb3bcf69f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 01:48:07 +0200 Subject: wip handle more commands (panics) --- mumd/src/main.rs | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 6d8d9bf..93bb0d0 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -3,8 +3,9 @@ mod network; mod command; mod state; +use crate::network::ConnectionInfo; +use crate::command::{Command, CommandResponse}; use crate::state::State; -use crate::command::Command; use argparse::ArgumentParser; use argparse::Store; @@ -16,9 +17,8 @@ use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; -use std::net::ToSocketAddrs; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() { @@ -67,40 +67,49 @@ async fn main() { ); ap.parse_args_or_exit(); } - let server_addr = (server_host.as_ref(), server_port) - .to_socket_addrs() - .expect("Failed to parse server address") - .next() - .expect("Failed to resolve server address"); // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::(); + let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); + let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); + + command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); - let state = State::new(packet_sender, command_sender, username); + let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); let state = Arc::new(Mutex::new(state)); // Run it join!( network::tcp::handle( Arc::clone(&state), - server_addr, - server_host, - accept_invalid_cert, + connection_info_receiver.clone(), crypt_state_sender, packet_receiver, ), network::udp::handle( Arc::clone(&state), - server_addr, + connection_info_receiver.clone(), crypt_state_receiver, ), command::handle( state, command_receiver, + command_response_sender, + ), + send_commands( + command_sender, + command_response_receiver, ), ); } + +async fn send_commands( + command_sender: mpsc::UnboundedSender, + command_response_receiver: mpsc::UnboundedReceiver, ()>>, +) { + +} -- cgit v1.2.1 From b5528c2198d54028ef03d35d5aa4d7fdde6af8f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 02:33:50 +0200 Subject: some changes --- mumd/src/main.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 93bb0d0..24c2567 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -76,9 +76,10 @@ async fn main() { let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); + command_sender.send(Command::ChannelList).unwrap(); command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); - command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); + command_sender.send(Command::ChannelList).unwrap(); let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); let state = Arc::new(Mutex::new(state)); @@ -100,16 +101,16 @@ async fn main() { command_receiver, command_response_sender, ), - send_commands( - command_sender, + receive_command_responses( command_response_receiver, ), ); } -async fn send_commands( - command_sender: mpsc::UnboundedSender, - command_response_receiver: mpsc::UnboundedReceiver, ()>>, +async fn receive_command_responses( + mut command_response_receiver: mpsc::UnboundedReceiver, ()>>, ) { - + while let Some(command_response) = command_response_receiver.recv().await { + debug!("{:#?}", command_response); + } } -- cgit v1.2.1 From dcb71982eab550535298b2d879a3a83820a0798a Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 14:49:56 +0200 Subject: add newline when logging if logged message contains a newline --- mumd/src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 24c2567..a2665ba 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -23,11 +23,11 @@ use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() { // setup logger - //TODO? add newline before message if it contains newlines fern::Dispatch::new() .format(|out, message, record| { + let message = message.to_string(); out.finish(format_args!( - "{} {}:{} {}", + "{} {}:{}{}{}", //TODO runtime flag that disables color match record.level() { Level::Error => "ERROR".red(), @@ -38,6 +38,7 @@ async fn main() { }, record.file().unwrap(), record.line().unwrap(), + if message.chars().any(|e| e == '\n') { "\n" } else { " " }, message )) }) -- cgit v1.2.1 From 7fb14d648aacd398f720f60236020dab6bf9fd35 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 16:54:27 +0200 Subject: add support for disconnect command --- mumd/src/main.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index a2665ba..c923857 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -11,14 +11,18 @@ use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; use colored::*; -use futures::channel::oneshot; -use futures::join; +use tokio::sync::oneshot; +use futures::{join, select}; use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; +use std::thread; +use std::time::Duration; +use tokio::stream::StreamExt; +use futures::FutureExt; #[tokio::main] async fn main() { @@ -79,7 +83,7 @@ async fn main() { command_sender.send(Command::ChannelList).unwrap(); command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); - command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); + //command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); command_sender.send(Command::ChannelList).unwrap(); let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); let state = Arc::new(Mutex::new(state)); @@ -102,16 +106,28 @@ async fn main() { command_receiver, command_response_sender, ), + send_commands( + command_sender + ), receive_command_responses( command_response_receiver, ), ); } +async fn send_commands(command_sender: mpsc::UnboundedSender) { + tokio::time::delay_for(Duration::from_secs(5)).await; + command_sender.send(Command::ServerDisconnect); + + debug!("Finished sending commands"); +} + async fn receive_command_responses( mut command_response_receiver: mpsc::UnboundedReceiver, ()>>, ) { while let Some(command_response) = command_response_receiver.recv().await { - debug!("{:#?}", command_response); + debug!("{:?}", command_response); } + + debug!("Finished receiving commands"); } -- cgit v1.2.1 From e4406676a28f2dfb756f8f9e38a4242166f19c0e Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 17:02:49 +0200 Subject: resolve some compiler warnings --- mumd/src/main.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index c923857..812e7a1 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -12,17 +12,14 @@ use argparse::Store; use argparse::StoreTrue; use colored::*; use tokio::sync::oneshot; -use futures::{join, select}; +use futures::join; use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; -use std::thread; use std::time::Duration; -use tokio::stream::StreamExt; -use futures::FutureExt; #[tokio::main] async fn main() { @@ -85,7 +82,7 @@ async fn main() { command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); //command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); command_sender.send(Command::ChannelList).unwrap(); - let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); + let state = State::new(packet_sender, command_sender.clone(), connection_info_sender); let state = Arc::new(Mutex::new(state)); // Run it -- cgit v1.2.1 From ab0cdc240c65fdc6b764ed17f6611786d449acc3 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 17:45:04 +0200 Subject: add support for reconnecting to server --- mumd/src/main.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 812e7a1..6d435fa 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -11,7 +11,6 @@ use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; use colored::*; -use tokio::sync::oneshot; use futures::join; use log::*; use mumble_protocol::control::ControlPacket; @@ -72,16 +71,12 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. - let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); + let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::(1); // crypt state should always be consumed before sending a new one let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::(); let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); - command_sender.send(Command::ChannelList).unwrap(); - command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); - //command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); - command_sender.send(Command::ChannelList).unwrap(); let state = State::new(packet_sender, command_sender.clone(), connection_info_sender); let state = Arc::new(Mutex::new(state)); @@ -104,7 +99,8 @@ async fn main() { command_response_sender, ), send_commands( - command_sender + command_sender, + Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert} ), receive_command_responses( command_response_receiver, @@ -112,8 +108,13 @@ async fn main() { ); } -async fn send_commands(command_sender: mpsc::UnboundedSender) { - tokio::time::delay_for(Duration::from_secs(5)).await; +async fn send_commands(command_sender: mpsc::UnboundedSender, connect_command: Command) { + command_sender.send(connect_command.clone()); + tokio::time::delay_for(Duration::from_secs(2)).await; + command_sender.send(Command::ServerDisconnect); + tokio::time::delay_for(Duration::from_secs(2)).await; + command_sender.send(connect_command.clone()); + tokio::time::delay_for(Duration::from_secs(2)).await; command_sender.send(Command::ServerDisconnect); debug!("Finished sending commands"); -- cgit v1.2.1 From 8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:29:34 +0200 Subject: cargo fmt --- mumd/src/main.rs | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 6d435fa..797b71f 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -1,10 +1,10 @@ mod audio; -mod network; mod command; +mod network; mod state; -use crate::network::ConnectionInfo; use crate::command::{Command, CommandResponse}; +use crate::network::ConnectionInfo; use crate::state::State; use argparse::ArgumentParser; @@ -17,8 +17,8 @@ use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; use std::time::Duration; +use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() { @@ -31,20 +31,25 @@ async fn main() { //TODO runtime flag that disables color match record.level() { Level::Error => "ERROR".red(), - Level::Warn => "WARN ".yellow(), - Level::Info => "INFO ".normal(), + Level::Warn => "WARN ".yellow(), + Level::Info => "INFO ".normal(), Level::Debug => "DEBUG".green(), Level::Trace => "TRACE".normal(), }, record.file().unwrap(), record.line().unwrap(), - if message.chars().any(|e| e == '\n') { "\n" } else { " " }, + if message.chars().any(|e| e == '\n') { + "\n" + } else { + " " + }, message )) }) .level(log::LevelFilter::Debug) .chain(std::io::stderr()) - .apply().unwrap(); + .apply() + .unwrap(); // Handle command line arguments let mut server_host = "".to_string(); @@ -74,10 +79,16 @@ async fn main() { let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::(1); // crypt state should always be consumed before sending a new one let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::(); - let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); - let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); + let (command_response_sender, command_response_receiver) = + mpsc::unbounded_channel::, ()>>(); + let (connection_info_sender, connection_info_receiver) = + watch::channel::>(None); - let state = State::new(packet_sender, command_sender.clone(), connection_info_sender); + let state = State::new( + packet_sender, + command_sender.clone(), + connection_info_sender, + ); let state = Arc::new(Mutex::new(state)); // Run it @@ -93,18 +104,17 @@ async fn main() { connection_info_receiver.clone(), crypt_state_receiver, ), - command::handle( - state, - command_receiver, - command_response_sender, - ), + command::handle(state, command_receiver, command_response_sender,), send_commands( command_sender, - Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert} - ), - receive_command_responses( - command_response_receiver, + Command::ServerConnect { + host: server_host, + port: server_port, + username: username.clone(), + accept_invalid_cert + } ), + receive_command_responses(command_response_receiver,), ); } -- cgit v1.2.1 From afd537e085ddf2c92fb1f1879a72d290010fa570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:42:28 +0200 Subject: cargo clippy --- mumd/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'mumd/src/main.rs') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 797b71f..f837a52 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -119,13 +119,13 @@ async fn main() { } async fn send_commands(command_sender: mpsc::UnboundedSender, connect_command: Command) { - command_sender.send(connect_command.clone()); + command_sender.send(connect_command.clone()).unwrap(); tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(Command::ServerDisconnect); + command_sender.send(Command::ServerDisconnect).unwrap(); tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(connect_command.clone()); + command_sender.send(connect_command.clone()).unwrap(); tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(Command::ServerDisconnect); + command_sender.send(Command::ServerDisconnect).unwrap(); debug!("Finished sending commands"); } -- cgit v1.2.1