diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2020-10-14 01:48:07 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2020-10-14 01:48:07 +0200 |
| commit | 3d8009a0201fba0bdc464fae0797d3bb3bcf69f4 (patch) | |
| tree | c831804fa1e4e20d1152b4051f276feb67ed0881 /mumd/src | |
| parent | 50f5f273426d805025a9336398862529b6bb9b60 (diff) | |
| download | mum-3d8009a0201fba0bdc464fae0797d3bb3bcf69f4.tar.gz | |
wip handle more commands (panics)
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/command.rs | 20 | ||||
| -rw-r--r-- | mumd/src/main.rs | 35 | ||||
| -rw-r--r-- | mumd/src/network/mod.rs | 23 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 29 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 15 | ||||
| -rw-r--r-- | mumd/src/state.rs | 64 |
6 files changed, 142 insertions, 44 deletions
diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 1f7a781..c3b72bf 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,5 +1,6 @@ -use crate::state::State; +use crate::state::{Channel, Server, State}; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; @@ -19,14 +20,23 @@ pub enum Command { Status, } +#[derive(Debug)] +pub enum CommandResponse { + ChannelList { + channels: HashMap<u32, Channel>, + }, + Status { + username: String, + server_state: Server, + } +} + pub async fn handle( state: Arc<Mutex<State>>, mut command_receiver: mpsc::UnboundedReceiver<Command>, + command_response_sender: mpsc::UnboundedSender<Result<Option<CommandResponse>, ()>>, ) { - // wait until we can send packages - let mut initialized_receiver = state.lock().unwrap().initialized_receiver(); - while matches!(initialized_receiver.recv().await, Some(false)) {} - + //TODO err if not connected while let Some(command) = command_receiver.recv().await { state.lock().unwrap().handle_command(command).await; } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 6d8d9bf..93bb0d0 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -3,8 +3,9 @@ mod network; mod command; mod state; +use crate::network::ConnectionInfo; +use crate::command::{Command, CommandResponse}; use crate::state::State; -use crate::command::Command; use argparse::ArgumentParser; use argparse::Store; @@ -16,9 +17,8 @@ use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; -use std::net::ToSocketAddrs; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() { @@ -67,40 +67,49 @@ async fn main() { ); ap.parse_args_or_exit(); } - let server_addr = (server_host.as_ref(), server_port) - .to_socket_addrs() - .expect("Failed to parse server address") - .next() - .expect("Failed to resolve server address"); // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>(); let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::<Command>(); + let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::<Result<Option<CommandResponse>, ()>>(); + let (connection_info_sender, connection_info_receiver) = watch::channel::<Option<ConnectionInfo>>(None); + + command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); - let state = State::new(packet_sender, command_sender, username); + let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); let state = Arc::new(Mutex::new(state)); // Run it join!( network::tcp::handle( Arc::clone(&state), - server_addr, - server_host, - accept_invalid_cert, + connection_info_receiver.clone(), crypt_state_sender, packet_receiver, ), network::udp::handle( Arc::clone(&state), - server_addr, + connection_info_receiver.clone(), crypt_state_receiver, ), command::handle( state, command_receiver, + command_response_sender, + ), + send_commands( + command_sender, + command_response_receiver, ), ); } + +async fn send_commands( + command_sender: mpsc::UnboundedSender<Command>, + command_response_receiver: mpsc::UnboundedReceiver<Result<Option<CommandResponse>, ()>>, +) { + +} diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs index f7a6a76..777faad 100644 --- a/mumd/src/network/mod.rs +++ b/mumd/src/network/mod.rs @@ -1,2 +1,25 @@ pub mod tcp; pub mod udp; + +use std::net::SocketAddr; + +#[derive(Clone, Debug)] +pub struct ConnectionInfo { + socket_addr: SocketAddr, + hostname: String, + accept_invalid_cert: bool, +} + +impl ConnectionInfo { + pub fn new( + socket_addr: SocketAddr, + hostname: String, + accept_invalid_cert: bool, + ) -> Self { + Self { + socket_addr, + hostname, + accept_invalid_cert, + } + } +} diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6f60b63..9fb5ae4 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,4 +1,5 @@ -use crate::state::State; +use crate::network::ConnectionInfo; +use crate::state::{State, StatePhase}; use log::*; use futures::channel::oneshot; @@ -11,7 +12,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -25,16 +26,24 @@ type TcpReceiver = pub async fn handle( state: Arc<Mutex<State>>, - server_addr: SocketAddr, - server_host: String, - accept_invalid_cert: bool, + mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, crypt_state_sender: oneshot::Sender<ClientCryptState>, packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, ) { - let (mut sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, stream) = connect(connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert) + .await; // Handshake (omitting `Version` message for brevity) - authenticate(&mut sink, state.lock().unwrap().username().to_string()).await; + authenticate(&mut sink, state.lock().unwrap().username().unwrap().to_string()).await; info!("Logging in..."); @@ -158,10 +167,10 @@ async fn listen( let mut state = state.lock().unwrap(); let session = msg.get_session(); state.audio_mut().add_client(msg.get_session()); //TODO - if *state.initialized_receiver().borrow() { - state.server_mut().parse_user_state(msg); - } else { + if *state.phase_receiver().borrow() == StatePhase::Connecting { state.parse_initial_user_state(msg); + } else { + state.server_mut().parse_user_state(msg); } let server = state.server_mut(); let user = server.users().get(&session).unwrap(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 5f76501..cf0305b 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,3 +1,4 @@ +use crate::network::ConnectionInfo; use crate::state::State; use log::*; @@ -11,6 +12,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; +use tokio::sync::watch; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; @@ -38,20 +40,27 @@ pub async fn connect( pub async fn handle( state: Arc<Mutex<State>>, - server_addr: SocketAddr, + mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, crypt_state: oneshot::Receiver<ClientCryptState>, ) { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; let (mut sink, source) = connect(crypt_state).await; // Note: A normal application would also send periodic Ping packets, and its own audio // via UDP. We instead trick the server into accepting us by sending it one // dummy voice packet. - send_ping(&mut sink, server_addr).await; + send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); join!( listen(Arc::clone(&state), source), - send_voice(state, sink, server_addr) + send_voice(state, sink, connection_info.socket_addr), ); } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index cd266d7..8689a9a 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -1,23 +1,33 @@ use log::*; use crate::audio::Audio; -use crate::command::Command; +use crate::command::{Command, CommandResponse}; +use crate::network::ConnectionInfo; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::Serverbound; use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::net::ToSocketAddrs; use tokio::sync::{mpsc, watch}; +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum StatePhase { + Disconnected, + Connecting, + Connected, +} + pub struct State { server: Server, audio: Audio, packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, command_sender: mpsc::UnboundedSender<Command>, + connection_info_sender: watch::Sender<Option<ConnectionInfo>>, - initialized_watcher: (watch::Sender<bool>, watch::Receiver<bool>), + phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>), - username: String, + username: Option<String>, session_id: Option<u32>, } @@ -25,6 +35,7 @@ impl State { pub fn new( packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, command_sender: mpsc::UnboundedSender<Command>, + connection_info_sender: watch::Sender<Option<ConnectionInfo>>, username: String, ) -> Self { Self { @@ -32,26 +43,50 @@ impl State { audio: Audio::new(), packet_sender, command_sender, - initialized_watcher: watch::channel(false), - username, + connection_info_sender, + phase_watcher: watch::channel(StatePhase::Disconnected), + username: None, session_id: None, } } - //TODO result - pub async fn handle_command(&mut self, command: Command) { + pub async fn handle_command(&mut self, command: Command) -> Result<Option<CommandResponse>, ()> { match command { Command::ChannelJoin{channel_id} => { if self.session_id.is_none() { warn!("Tried to join channel but we don't have a session id"); - return; + return Err(()); } let mut msg = msgs::UserState::new(); msg.set_session(self.session_id.unwrap()); msg.set_channel_id(channel_id); self.packet_sender.send(msg.into()).unwrap(); + Ok(None) + } + Command::ChannelList => { + Ok(Some(CommandResponse::ChannelList{channels: self.server.channels.clone()})) + } + Command::ServerConnect{host, port, username, accept_invalid_cert} => { + if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { + warn!("Tried to connect to a server while already connected"); + return Err(()); + } + self.username = Some(username); + self.phase_watcher.0.broadcast(StatePhase::Connecting).unwrap(); + let socket_addr = (host.as_ref(), port) + .to_socket_addrs() + .expect("Failed to parse server address") + .next() + .expect("Failed to resolve server address"); + self.connection_info_sender.broadcast(Some(ConnectionInfo::new( + socket_addr, + host, + accept_invalid_cert, + ))); + while !matches!(self.phase_receiver().recv().await.unwrap(), StatePhase::Connected) {} + Ok(None) } - _ => {} + _ => { Ok(None) } } } @@ -63,7 +98,7 @@ impl State { if !msg.has_name() { warn!("Missing name in initial user state"); } else { - if msg.get_name() == self.username { + if msg.get_name() == self.username.as_ref().unwrap() { match self.session_id { None => { debug!("Found our session id: {}", msg.get_session()); @@ -85,17 +120,18 @@ impl State { } pub fn initialized(&self) { - self.initialized_watcher.0.broadcast(true).unwrap(); + self.phase_watcher.0.broadcast(StatePhase::Connected).unwrap(); } pub fn audio(&self) -> &Audio { &self.audio } pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } pub fn packet_sender(&self) -> mpsc::UnboundedSender<ControlPacket<Serverbound>> { self.packet_sender.clone() } - pub fn initialized_receiver(&self) -> watch::Receiver<bool> { self.initialized_watcher.1.clone() } + pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> { self.phase_watcher.1.clone() } pub fn server_mut(&mut self) -> &mut Server { &mut self.server } - pub fn username(&self) -> &str { &self.username } + pub fn username(&self) -> Option<&String> { self.username.as_ref() } } +#[derive(Debug)] pub struct Server { channels: HashMap<u32, Channel>, users: HashMap<u32, User>, @@ -159,6 +195,7 @@ impl Server { } } +#[derive(Clone, Debug)] pub struct Channel { description: Option<String>, links: Vec<u32>, @@ -212,6 +249,7 @@ impl Channel { } } +#[derive(Debug)] pub struct User { channel: u32, comment: Option<String>, |
