aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-14 01:48:07 +0200
committerGustav Sörnäs <gustav@sornas.net>2020-10-14 01:48:07 +0200
commit3d8009a0201fba0bdc464fae0797d3bb3bcf69f4 (patch)
treec831804fa1e4e20d1152b4051f276feb67ed0881 /mumd/src
parent50f5f273426d805025a9336398862529b6bb9b60 (diff)
downloadmum-3d8009a0201fba0bdc464fae0797d3bb3bcf69f4.tar.gz
wip handle more commands (panics)
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/command.rs20
-rw-r--r--mumd/src/main.rs35
-rw-r--r--mumd/src/network/mod.rs23
-rw-r--r--mumd/src/network/tcp.rs29
-rw-r--r--mumd/src/network/udp.rs15
-rw-r--r--mumd/src/state.rs64
6 files changed, 142 insertions, 44 deletions
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 1f7a781..c3b72bf 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,5 +1,6 @@
-use crate::state::State;
+use crate::state::{Channel, Server, State};
+use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
@@ -19,14 +20,23 @@ pub enum Command {
Status,
}
+#[derive(Debug)]
+pub enum CommandResponse {
+ ChannelList {
+ channels: HashMap<u32, Channel>,
+ },
+ Status {
+ username: String,
+ server_state: Server,
+ }
+}
+
pub async fn handle(
state: Arc<Mutex<State>>,
mut command_receiver: mpsc::UnboundedReceiver<Command>,
+ command_response_sender: mpsc::UnboundedSender<Result<Option<CommandResponse>, ()>>,
) {
- // wait until we can send packages
- let mut initialized_receiver = state.lock().unwrap().initialized_receiver();
- while matches!(initialized_receiver.recv().await, Some(false)) {}
-
+ //TODO err if not connected
while let Some(command) = command_receiver.recv().await {
state.lock().unwrap().handle_command(command).await;
}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 6d8d9bf..93bb0d0 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -3,8 +3,9 @@ mod network;
mod command;
mod state;
+use crate::network::ConnectionInfo;
+use crate::command::{Command, CommandResponse};
use crate::state::State;
-use crate::command::Command;
use argparse::ArgumentParser;
use argparse::Store;
@@ -16,9 +17,8 @@ use log::*;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::Serverbound;
-use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex};
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
#[tokio::main]
async fn main() {
@@ -67,40 +67,49 @@ async fn main() {
);
ap.parse_args_or_exit();
}
- let server_addr = (server_host.as_ref(), server_port)
- .to_socket_addrs()
- .expect("Failed to parse server address")
- .next()
- .expect("Failed to resolve server address");
// Oneshot channel for setting UDP CryptState from control task
// For simplicity we don't deal with re-syncing, real applications would have to.
let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>();
let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
let (command_sender, command_receiver) = mpsc::unbounded_channel::<Command>();
+ let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::<Result<Option<CommandResponse>, ()>>();
+ let (connection_info_sender, connection_info_receiver) = watch::channel::<Option<ConnectionInfo>>(None);
+
+ command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert});
command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap();
- let state = State::new(packet_sender, command_sender, username);
+ let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username);
let state = Arc::new(Mutex::new(state));
// Run it
join!(
network::tcp::handle(
Arc::clone(&state),
- server_addr,
- server_host,
- accept_invalid_cert,
+ connection_info_receiver.clone(),
crypt_state_sender,
packet_receiver,
),
network::udp::handle(
Arc::clone(&state),
- server_addr,
+ connection_info_receiver.clone(),
crypt_state_receiver,
),
command::handle(
state,
command_receiver,
+ command_response_sender,
+ ),
+ send_commands(
+ command_sender,
+ command_response_receiver,
),
);
}
+
+async fn send_commands(
+ command_sender: mpsc::UnboundedSender<Command>,
+ command_response_receiver: mpsc::UnboundedReceiver<Result<Option<CommandResponse>, ()>>,
+) {
+
+}
diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs
index f7a6a76..777faad 100644
--- a/mumd/src/network/mod.rs
+++ b/mumd/src/network/mod.rs
@@ -1,2 +1,25 @@
pub mod tcp;
pub mod udp;
+
+use std::net::SocketAddr;
+
+#[derive(Clone, Debug)]
+pub struct ConnectionInfo {
+ socket_addr: SocketAddr,
+ hostname: String,
+ accept_invalid_cert: bool,
+}
+
+impl ConnectionInfo {
+ pub fn new(
+ socket_addr: SocketAddr,
+ hostname: String,
+ accept_invalid_cert: bool,
+ ) -> Self {
+ Self {
+ socket_addr,
+ hostname,
+ accept_invalid_cert,
+ }
+ }
+}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 6f60b63..9fb5ae4 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -1,4 +1,5 @@
-use crate::state::State;
+use crate::network::ConnectionInfo;
+use crate::state::{State, StatePhase};
use log::*;
use futures::channel::oneshot;
@@ -11,7 +12,7 @@ use std::convert::{Into, TryInto};
use std::net::{SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
@@ -25,16 +26,24 @@ type TcpReceiver =
pub async fn handle(
state: Arc<Mutex<State>>,
- server_addr: SocketAddr,
- server_host: String,
- accept_invalid_cert: bool,
+ mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
) {
- let (mut sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await;
+ let connection_info = loop {
+ match connection_info_receiver.recv().await {
+ None => { return; }
+ Some(None) => {}
+ Some(Some(connection_info)) => { break connection_info; }
+ }
+ };
+ let (mut sink, stream) = connect(connection_info.socket_addr,
+ connection_info.hostname,
+ connection_info.accept_invalid_cert)
+ .await;
// Handshake (omitting `Version` message for brevity)
- authenticate(&mut sink, state.lock().unwrap().username().to_string()).await;
+ authenticate(&mut sink, state.lock().unwrap().username().unwrap().to_string()).await;
info!("Logging in...");
@@ -158,10 +167,10 @@ async fn listen(
let mut state = state.lock().unwrap();
let session = msg.get_session();
state.audio_mut().add_client(msg.get_session()); //TODO
- if *state.initialized_receiver().borrow() {
- state.server_mut().parse_user_state(msg);
- } else {
+ if *state.phase_receiver().borrow() == StatePhase::Connecting {
state.parse_initial_user_state(msg);
+ } else {
+ state.server_mut().parse_user_state(msg);
}
let server = state.server_mut();
let user = server.users().get(&session).unwrap();
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 5f76501..cf0305b 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -1,3 +1,4 @@
+use crate::network::ConnectionInfo;
use crate::state::State;
use log::*;
@@ -11,6 +12,7 @@ use mumble_protocol::Serverbound;
use std::net::{Ipv6Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
+use tokio::sync::watch;
use tokio_util::udp::UdpFramed;
type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>;
@@ -38,20 +40,27 @@ pub async fn connect(
pub async fn handle(
state: Arc<Mutex<State>>,
- server_addr: SocketAddr,
+ mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
crypt_state: oneshot::Receiver<ClientCryptState>,
) {
+ let connection_info = loop {
+ match connection_info_receiver.recv().await {
+ None => { return; }
+ Some(None) => {}
+ Some(Some(connection_info)) => { break connection_info; }
+ }
+ };
let (mut sink, source) = connect(crypt_state).await;
// Note: A normal application would also send periodic Ping packets, and its own audio
// via UDP. We instead trick the server into accepting us by sending it one
// dummy voice packet.
- send_ping(&mut sink, server_addr).await;
+ send_ping(&mut sink, connection_info.socket_addr).await;
let sink = Arc::new(Mutex::new(sink));
join!(
listen(Arc::clone(&state), source),
- send_voice(state, sink, server_addr)
+ send_voice(state, sink, connection_info.socket_addr),
);
}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index cd266d7..8689a9a 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -1,23 +1,33 @@
use log::*;
use crate::audio::Audio;
-use crate::command::Command;
+use crate::command::{Command, CommandResponse};
+use crate::network::ConnectionInfo;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::voice::Serverbound;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
+use std::net::ToSocketAddrs;
use tokio::sync::{mpsc, watch};
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum StatePhase {
+ Disconnected,
+ Connecting,
+ Connected,
+}
+
pub struct State {
server: Server,
audio: Audio,
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
command_sender: mpsc::UnboundedSender<Command>,
+ connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
- initialized_watcher: (watch::Sender<bool>, watch::Receiver<bool>),
+ phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>),
- username: String,
+ username: Option<String>,
session_id: Option<u32>,
}
@@ -25,6 +35,7 @@ impl State {
pub fn new(
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
command_sender: mpsc::UnboundedSender<Command>,
+ connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
username: String,
) -> Self {
Self {
@@ -32,26 +43,50 @@ impl State {
audio: Audio::new(),
packet_sender,
command_sender,
- initialized_watcher: watch::channel(false),
- username,
+ connection_info_sender,
+ phase_watcher: watch::channel(StatePhase::Disconnected),
+ username: None,
session_id: None,
}
}
- //TODO result
- pub async fn handle_command(&mut self, command: Command) {
+ pub async fn handle_command(&mut self, command: Command) -> Result<Option<CommandResponse>, ()> {
match command {
Command::ChannelJoin{channel_id} => {
if self.session_id.is_none() {
warn!("Tried to join channel but we don't have a session id");
- return;
+ return Err(());
}
let mut msg = msgs::UserState::new();
msg.set_session(self.session_id.unwrap());
msg.set_channel_id(channel_id);
self.packet_sender.send(msg.into()).unwrap();
+ Ok(None)
+ }
+ Command::ChannelList => {
+ Ok(Some(CommandResponse::ChannelList{channels: self.server.channels.clone()}))
+ }
+ Command::ServerConnect{host, port, username, accept_invalid_cert} => {
+ if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) {
+ warn!("Tried to connect to a server while already connected");
+ return Err(());
+ }
+ self.username = Some(username);
+ self.phase_watcher.0.broadcast(StatePhase::Connecting).unwrap();
+ let socket_addr = (host.as_ref(), port)
+ .to_socket_addrs()
+ .expect("Failed to parse server address")
+ .next()
+ .expect("Failed to resolve server address");
+ self.connection_info_sender.broadcast(Some(ConnectionInfo::new(
+ socket_addr,
+ host,
+ accept_invalid_cert,
+ )));
+ while !matches!(self.phase_receiver().recv().await.unwrap(), StatePhase::Connected) {}
+ Ok(None)
}
- _ => {}
+ _ => { Ok(None) }
}
}
@@ -63,7 +98,7 @@ impl State {
if !msg.has_name() {
warn!("Missing name in initial user state");
} else {
- if msg.get_name() == self.username {
+ if msg.get_name() == self.username.as_ref().unwrap() {
match self.session_id {
None => {
debug!("Found our session id: {}", msg.get_session());
@@ -85,17 +120,18 @@ impl State {
}
pub fn initialized(&self) {
- self.initialized_watcher.0.broadcast(true).unwrap();
+ self.phase_watcher.0.broadcast(StatePhase::Connected).unwrap();
}
pub fn audio(&self) -> &Audio { &self.audio }
pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio }
pub fn packet_sender(&self) -> mpsc::UnboundedSender<ControlPacket<Serverbound>> { self.packet_sender.clone() }
- pub fn initialized_receiver(&self) -> watch::Receiver<bool> { self.initialized_watcher.1.clone() }
+ pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> { self.phase_watcher.1.clone() }
pub fn server_mut(&mut self) -> &mut Server { &mut self.server }
- pub fn username(&self) -> &str { &self.username }
+ pub fn username(&self) -> Option<&String> { self.username.as_ref() }
}
+#[derive(Debug)]
pub struct Server {
channels: HashMap<u32, Channel>,
users: HashMap<u32, User>,
@@ -159,6 +195,7 @@ impl Server {
}
}
+#[derive(Clone, Debug)]
pub struct Channel {
description: Option<String>,
links: Vec<u32>,
@@ -212,6 +249,7 @@ impl Channel {
}
}
+#[derive(Debug)]
pub struct User {
channel: u32,
comment: Option<String>,