diff options
Diffstat (limited to 'mumd')
| -rw-r--r-- | mumd/src/audio.rs | 2 | ||||
| -rw-r--r-- | mumd/src/command.rs | 19 | ||||
| -rw-r--r-- | mumd/src/main.rs | 19 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 7 | ||||
| -rw-r--r-- | mumd/src/state.rs | 28 |
5 files changed, 56 insertions, 19 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 3c24f1c..e13845e 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -27,7 +27,7 @@ pub struct Audio { pub input_config: StreamConfig, pub input_stream: Stream, pub input_buffer: Arc<Mutex<VecDeque<f32>>>, - input_channel_receiver: Option<Receiver<VoicePacketPayload>>, + input_channel_receiver: Option<Receiver<VoicePacketPayload>>, //TODO unbounded? mbe ring buffer and drop the first packet client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>, //TODO move to user state } diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 322bde8..1f7a781 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,3 +1,9 @@ +use crate::state::State; + +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; + +#[derive(Debug)] pub enum Command { ChannelJoin { channel_id: u32, @@ -12,3 +18,16 @@ pub enum Command { ServerDisconnect, Status, } + +pub async fn handle( + state: Arc<Mutex<State>>, + mut command_receiver: mpsc::UnboundedReceiver<Command>, +) { + // wait until we can send packages + let mut initialized_receiver = state.lock().unwrap().initialized_receiver(); + while matches!(initialized_receiver.recv().await, Some(false)) {} + + while let Some(command) = command_receiver.recv().await { + state.lock().unwrap().handle_command(command).await; + } +} diff --git a/mumd/src/main.rs b/mumd/src/main.rs index a08db44..6d8d9bf 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -2,7 +2,9 @@ mod audio; mod network; mod command; mod state; + use crate::state::State; +use crate::command::Command; use argparse::ArgumentParser; use argparse::Store; @@ -15,13 +17,13 @@ use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::net::ToSocketAddrs; -use std::sync::Arc; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; #[tokio::main] async fn main() { // setup logger + //TODO? add newline before message if it contains newlines fern::Dispatch::new() .format(|out, message, record| { out.finish(format_args!( @@ -74,9 +76,12 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>(); - let (packet_sender, packet_receiver) = mpsc::channel::<ControlPacket<Serverbound>>(10); + let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>(); + let (command_sender, command_receiver) = mpsc::unbounded_channel::<Command>(); - let state = Arc::new(Mutex::new(State::new(packet_sender, username))); + command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); + let state = State::new(packet_sender, command_sender, username); + let state = Arc::new(Mutex::new(state)); // Run it join!( @@ -89,9 +94,13 @@ async fn main() { packet_receiver, ), network::udp::handle( - state, + Arc::clone(&state), server_addr, crypt_state_receiver, ), + command::handle( + state, + command_receiver, + ), ); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index fa4c4b6..72a2840 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -30,7 +30,7 @@ pub async fn handle( server_host: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender<ClientCryptState>, - packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>, + packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>, ) { let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); @@ -94,7 +94,7 @@ async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) { } async fn send_packets(sink: Arc<Mutex<TcpSender>>, - mut packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>) { + mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>) { while let Some(packet) = packet_receiver.recv().await { sink.lock().unwrap().send(packet).await.unwrap(); @@ -153,8 +153,7 @@ async fn listen( for (_, channel) in server.channels() { info!("Found channel {}", channel.name()); } - //TODO start listening for packets to send here - state.handle_command(Command::ChannelJoin{channel_id: 1}).await; + state.initialized(); } ControlPacket::Reject(msg) => { warn!("Login rejected: {:?}", msg); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 89ae4cd..74b2037 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -6,25 +6,33 @@ use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::Serverbound; use std::collections::HashMap; use std::collections::hash_map::Entry; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; pub struct State { server: Server, audio: Audio, - packet_sender: mpsc::Sender<ControlPacket<Serverbound>>, + packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, + command_sender: mpsc::UnboundedSender<Command>, + + initialized_watcher: (watch::Sender<bool>, watch::Receiver<bool>), username: String, session_id: Option<u32>, } impl State { - pub fn new(packet_sender: mpsc::Sender<ControlPacket<Serverbound>>, - username: String) -> Self { + pub fn new( + packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, + command_sender: mpsc::UnboundedSender<Command>, + username: String, + ) -> Self { Self { server: Server::new(), audio: Audio::new(), packet_sender, + command_sender, + initialized_watcher: watch::channel(false), username, session_id: None, } @@ -41,7 +49,7 @@ impl State { let mut msg = msgs::UserState::new(); msg.set_session(self.session_id.unwrap()); msg.set_channel_id(channel_id); - self.packet_sender.send(msg.into()).await.unwrap(); + self.packet_sender.send(msg.into()).unwrap(); } _ => {} } @@ -76,12 +84,15 @@ impl State { self.server.parse_user_state(msg); } + pub fn initialized(&self) { + self.initialized_watcher.0.broadcast(true).unwrap(); + } + pub fn audio(&self) -> &Audio { &self.audio } pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } - - pub fn username(&self) -> &str { &self.username } - + pub fn initialized_receiver(&self) -> watch::Receiver<bool> { self.initialized_watcher.1.clone() } pub fn server_mut(&mut self) -> &mut Server { &mut self.server } + pub fn username(&self) -> &str { &self.username } } pub struct Server { @@ -147,7 +158,6 @@ impl Server { } } - pub struct Channel { description: Option<String>, links: Vec<u32>, |
