aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-13 17:05:22 +0200
committerGustav Sörnäs <gustav@sornas.net>2020-10-13 17:05:22 +0200
commit503f6c90395682bf5d7fd3fb8a79bfcfc3c2f329 (patch)
tree41dd58465f1afbeb583262eb383fdcb50c256337 /mumd/src
parentcd353c875c3c8bcae4f4ece597468728341362c9 (diff)
downloadmum-503f6c90395682bf5d7fd3fb8a79bfcfc3c2f329.tar.gz
wait for complete state before sending commands
Diffstat (limited to 'mumd/src')
-rw-r--r--mumd/src/audio.rs2
-rw-r--r--mumd/src/command.rs19
-rw-r--r--mumd/src/main.rs19
-rw-r--r--mumd/src/network/tcp.rs7
-rw-r--r--mumd/src/state.rs28
5 files changed, 56 insertions, 19 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 3c24f1c..e13845e 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -27,7 +27,7 @@ pub struct Audio {
pub input_config: StreamConfig,
pub input_stream: Stream,
pub input_buffer: Arc<Mutex<VecDeque<f32>>>,
- input_channel_receiver: Option<Receiver<VoicePacketPayload>>,
+ input_channel_receiver: Option<Receiver<VoicePacketPayload>>, //TODO unbounded? mbe ring buffer and drop the first packet
client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>, //TODO move to user state
}
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 322bde8..1f7a781 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,3 +1,9 @@
+use crate::state::State;
+
+use std::sync::{Arc, Mutex};
+use tokio::sync::mpsc;
+
+#[derive(Debug)]
pub enum Command {
ChannelJoin {
channel_id: u32,
@@ -12,3 +18,16 @@ pub enum Command {
ServerDisconnect,
Status,
}
+
+pub async fn handle(
+ state: Arc<Mutex<State>>,
+ mut command_receiver: mpsc::UnboundedReceiver<Command>,
+) {
+ // wait until we can send packages
+ let mut initialized_receiver = state.lock().unwrap().initialized_receiver();
+ while matches!(initialized_receiver.recv().await, Some(false)) {}
+
+ while let Some(command) = command_receiver.recv().await {
+ state.lock().unwrap().handle_command(command).await;
+ }
+}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index a08db44..6d8d9bf 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -2,7 +2,9 @@ mod audio;
mod network;
mod command;
mod state;
+
use crate::state::State;
+use crate::command::Command;
use argparse::ArgumentParser;
use argparse::Store;
@@ -15,13 +17,13 @@ use mumble_protocol::control::ControlPacket;
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::voice::Serverbound;
use std::net::ToSocketAddrs;
-use std::sync::Arc;
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// setup logger
+ //TODO? add newline before message if it contains newlines
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
@@ -74,9 +76,12 @@ async fn main() {
// Oneshot channel for setting UDP CryptState from control task
// For simplicity we don't deal with re-syncing, real applications would have to.
let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>();
- let (packet_sender, packet_receiver) = mpsc::channel::<ControlPacket<Serverbound>>(10);
+ let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
+ let (command_sender, command_receiver) = mpsc::unbounded_channel::<Command>();
- let state = Arc::new(Mutex::new(State::new(packet_sender, username)));
+ command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap();
+ let state = State::new(packet_sender, command_sender, username);
+ let state = Arc::new(Mutex::new(state));
// Run it
join!(
@@ -89,9 +94,13 @@ async fn main() {
packet_receiver,
),
network::udp::handle(
- state,
+ Arc::clone(&state),
server_addr,
crypt_state_receiver,
),
+ command::handle(
+ state,
+ command_receiver,
+ ),
);
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index fa4c4b6..72a2840 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -30,7 +30,7 @@ pub async fn handle(
server_host: String,
accept_invalid_cert: bool,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
- packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>,
+ packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
) {
let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await;
let sink = Arc::new(Mutex::new(sink));
@@ -94,7 +94,7 @@ async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
}
async fn send_packets(sink: Arc<Mutex<TcpSender>>,
- mut packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>) {
+ mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>) {
while let Some(packet) = packet_receiver.recv().await {
sink.lock().unwrap().send(packet).await.unwrap();
@@ -153,8 +153,7 @@ async fn listen(
for (_, channel) in server.channels() {
info!("Found channel {}", channel.name());
}
- //TODO start listening for packets to send here
- state.handle_command(Command::ChannelJoin{channel_id: 1}).await;
+ state.initialized();
}
ControlPacket::Reject(msg) => {
warn!("Login rejected: {:?}", msg);
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 89ae4cd..74b2037 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -6,25 +6,33 @@ use mumble_protocol::control::ControlPacket;
use mumble_protocol::voice::Serverbound;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
pub struct State {
server: Server,
audio: Audio,
- packet_sender: mpsc::Sender<ControlPacket<Serverbound>>,
+ packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
+ command_sender: mpsc::UnboundedSender<Command>,
+
+ initialized_watcher: (watch::Sender<bool>, watch::Receiver<bool>),
username: String,
session_id: Option<u32>,
}
impl State {
- pub fn new(packet_sender: mpsc::Sender<ControlPacket<Serverbound>>,
- username: String) -> Self {
+ pub fn new(
+ packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
+ command_sender: mpsc::UnboundedSender<Command>,
+ username: String,
+ ) -> Self {
Self {
server: Server::new(),
audio: Audio::new(),
packet_sender,
+ command_sender,
+ initialized_watcher: watch::channel(false),
username,
session_id: None,
}
@@ -41,7 +49,7 @@ impl State {
let mut msg = msgs::UserState::new();
msg.set_session(self.session_id.unwrap());
msg.set_channel_id(channel_id);
- self.packet_sender.send(msg.into()).await.unwrap();
+ self.packet_sender.send(msg.into()).unwrap();
}
_ => {}
}
@@ -76,12 +84,15 @@ impl State {
self.server.parse_user_state(msg);
}
+ pub fn initialized(&self) {
+ self.initialized_watcher.0.broadcast(true).unwrap();
+ }
+
pub fn audio(&self) -> &Audio { &self.audio }
pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio }
-
- pub fn username(&self) -> &str { &self.username }
-
+ pub fn initialized_receiver(&self) -> watch::Receiver<bool> { self.initialized_watcher.1.clone() }
pub fn server_mut(&mut self) -> &mut Server { &mut self.server }
+ pub fn username(&self) -> &str { &self.username }
}
pub struct Server {
@@ -147,7 +158,6 @@ impl Server {
}
}
-
pub struct Channel {
description: Option<String>,
links: Vec<u32>,