From b583f6dbe521e01e879e16605026997dfa10c3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 02:31:09 +0200 Subject: join different channels Co-authored-by: Eskil Queseth --- mumd/src/audio.rs | 7 +++-- mumd/src/command.rs | 2 +- mumd/src/main.rs | 21 ++++++------- mumd/src/network/tcp.rs | 57 +++++++++++++++++++--------------- mumd/src/network/udp.rs | 48 ++++++++++++++--------------- mumd/src/state.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 151 insertions(+), 65 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 9b794a6..3c24f1c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,6 +1,5 @@ use bytes::Bytes; -use cpal::traits::DeviceTrait; -use cpal::traits::HostTrait; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{ InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig, }; @@ -30,7 +29,7 @@ pub struct Audio { pub input_buffer: Arc>>, input_channel_receiver: Option>, - client_streams: Arc>>, + client_streams: Arc>>, //TODO move to user state } //TODO split into input/output @@ -129,6 +128,8 @@ impl Audio { } .unwrap(); + output_stream.play().unwrap(); + Self { output_config, output_stream, diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 5d6cca4..322bde8 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,4 +1,4 @@ -enum Command { +pub enum Command { ChannelJoin { channel_id: u32, }, diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 2a0fcbd..a08db44 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -2,21 +2,22 @@ mod audio; mod network; mod command; mod state; -use crate::audio::Audio; -use crate::state::Server; +use crate::state::State; use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; use colored::*; -use cpal::traits::StreamTrait; use futures::channel::oneshot; use futures::join; use log::*; +use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; +use mumble_protocol::voice::Serverbound; use std::net::ToSocketAddrs; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::mpsc; #[tokio::main] async fn main() { @@ -73,28 +74,24 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); + let (packet_sender, packet_receiver) = mpsc::channel::>(10); - let audio = Audio::new(); - audio.output_stream.play().unwrap(); - let audio = Arc::new(Mutex::new(audio)); - - let server_state = Arc::new(Mutex::new(Server::new())); + let state = Arc::new(Mutex::new(State::new(packet_sender, username))); // Run it join!( network::tcp::handle( - server_state, + Arc::clone(&state), server_addr, server_host, - username, accept_invalid_cert, crypt_state_sender, - Arc::clone(&audio), + packet_receiver, ), network::udp::handle( + state, server_addr, crypt_state_receiver, - audio, ), ); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index dde98aa..fa4c4b6 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,5 +1,5 @@ -use crate::audio::Audio; -use crate::state::Server; +use crate::state::State; +use crate::command::Command; use log::*; use futures::channel::oneshot; @@ -12,6 +12,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -24,25 +25,26 @@ type TcpReceiver = SplitStream, ControlCodec>>; pub async fn handle( - server: Arc>, + state: Arc>, server_addr: SocketAddr, server_host: String, - username: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender, - audio: Arc>, + packet_receiver: mpsc::Receiver>, ) { let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); + // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), username).await; + authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await; info!("Logging in..."); join!( send_pings(Arc::clone(&sink), 10), - listen(server, sink, stream, crypt_state_sender, audio), + listen(state, stream, crypt_state_sender), + send_packets(sink, packet_receiver), ); } @@ -72,6 +74,7 @@ async fn connect( ClientControlCodec::new().framed(tls_stream).split() } +//TODO &mut sink? async fn authenticate(sink: Arc>, username: String) { let mut msg = msgs::Authenticate::new(); msg.set_username(username); @@ -79,6 +82,7 @@ async fn authenticate(sink: Arc>, username: String) { sink.lock().unwrap().send(msg.into()).await.unwrap(); } +//TODO move somewhere else (main?) and send through packet_sender async fn send_pings(sink: Arc>, delay_seconds: u64) { let mut interval = time::interval(Duration::from_secs(delay_seconds)); loop { @@ -89,12 +93,18 @@ async fn send_pings(sink: Arc>, delay_seconds: u64) { } } +async fn send_packets(sink: Arc>, + mut packet_receiver: mpsc::Receiver>) { + + while let Some(packet) = packet_receiver.recv().await { + sink.lock().unwrap().send(packet).await.unwrap(); + } +} + async fn listen( - server: Arc>, - sink: Arc>, + state: Arc>, mut stream: TcpReceiver, crypt_state_sender: oneshot::Sender, - audio: Arc>, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -102,18 +112,12 @@ async fn listen( while let Some(packet) = stream.next().await { //TODO handle types separately match packet.unwrap() { - ControlPacket::TextMessage(mut msg) => { + ControlPacket::TextMessage(msg) => { info!( "Got message from user with session ID {}: {}", msg.get_actor(), msg.get_message() ); - // Send reply back to server - let mut response = msgs::TextMessage::new(); - response.mut_session().push(msg.get_actor()); - response.set_message(msg.take_message()); - let mut lock = sink.lock().unwrap(); - lock.send(response.into()).await.unwrap(); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); @@ -139,7 +143,8 @@ async fn listen( .expect("Server didn't send us any CryptSetup packet!"), ); } - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); + let server = state.server_mut(); server.parse_server_sync(msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), @@ -148,16 +153,18 @@ async fn listen( for (_, channel) in server.channels() { info!("Found channel {}", channel.name()); } - sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap(); + //TODO start listening for packets to send here + state.handle_command(Command::ChannelJoin{channel_id: 1}).await; } ControlPacket::Reject(msg) => { warn!("Login rejected: {:?}", msg); } ControlPacket::UserState(msg) => { - audio.lock().unwrap().add_client(msg.get_session()); - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); let session = msg.get_session(); - server.parse_user_state(msg); + state.audio_mut().add_client(msg.get_session()); //TODO + state.parse_initial_user_state(msg); //TODO only if actually initiating state + let server = state.server_mut(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", user.name(), @@ -165,14 +172,14 @@ async fn listen( } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - audio.lock().unwrap().remove_client(msg.get_session()); + state.lock().unwrap().audio_mut().remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - server.lock().unwrap().parse_channel_state(msg); + state.lock().unwrap().server_mut().parse_channel_state(msg); } ControlPacket::ChannelRemove(msg) => { - server.lock().unwrap().parse_channel_remove(msg); + state.lock().unwrap().server_mut().parse_channel_remove(msg); } _ => {} } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 39f16b6..5f76501 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,4 +1,4 @@ -use crate::audio::Audio; +use crate::state::State; use log::*; use bytes::Bytes; @@ -36,10 +36,28 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } +pub async fn handle( + state: Arc>, + server_addr: SocketAddr, + crypt_state: oneshot::Receiver, +) { + let (mut sink, source) = connect(crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, server_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + join!( + listen(Arc::clone(&state), source), + send_voice(state, sink, server_addr) + ); +} + async fn listen( - _sink: Arc>, + state: Arc>, mut source: UdpReceiver, - audio: Arc>, ) { while let Some(packet) = source.next().await { let (packet, _src_addr) = match packet { @@ -63,7 +81,7 @@ async fn listen( // position_info, .. } => { - audio.lock().unwrap().decode_packet(session_id, payload); + state.lock().unwrap().audio().decode_packet(session_id, payload); } } } @@ -86,11 +104,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { } async fn send_voice( + state: Arc>, sink: Arc>, server_addr: SocketAddr, - audio: Arc>, ) { - let mut receiver = audio.lock().unwrap().take_receiver().unwrap(); + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); let mut count = 0; while let Some(payload) = receiver.recv().await { @@ -111,21 +129,3 @@ async fn send_voice( } } -pub async fn handle( - server_addr: SocketAddr, - crypt_state: oneshot::Receiver, - audio: Arc>, -) { - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, server_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - join!( - listen(Arc::clone(&sink), source, Arc::clone(&audio)), - send_voice(sink, server_addr, audio) - ); -} diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 1ef8467..566adaf 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -1,7 +1,88 @@ use log::*; +use crate::audio::Audio; +use crate::command::Command; use mumble_protocol::control::msgs; +use mumble_protocol::control::ControlPacket; +use mumble_protocol::voice::Serverbound; use std::collections::HashMap; use std::collections::hash_map::Entry; +use tokio::sync::mpsc; + +pub struct State { + server: Server, + audio: Audio, + + packet_sender: mpsc::Sender>, + + username: String, + session_id: Option, //FIXME set +} + +impl State { + pub fn new(packet_sender: mpsc::Sender>, + username: String) -> Self { + Self { + server: Server::new(), + audio: Audio::new(), + packet_sender, + username, + session_id: None, + } + } + + //TODO result + pub async fn handle_command(&mut self, command: Command) { + match command { + Command::ChannelJoin{channel_id} => { + if self.session_id.is_none() { + warn!("Tried to join channel but we don't have a session id"); + return; + } + let mut msg = msgs::UserState::new(); + msg.set_session(self.session_id.unwrap()); + msg.set_channel_id(channel_id); + self.packet_sender.send(msg.into()).await.unwrap(); + } + _ => {} + } + } + + pub fn parse_initial_user_state(&mut self, msg: Box) { + if !msg.has_session() { + warn!("Can't parse user state without session"); + return; + } + if !msg.has_name() { + warn!("Missing name in initial user state"); + } else { + if msg.get_name() == self.username { + match self.session_id { + None => { + debug!("Found our session id: {}", msg.get_session()); + self.session_id = Some(msg.get_session()); + } + Some(session) => { + if session != msg.get_session() { + error!("Got two different session IDs ({} and {}) for ourselves", + session, + msg.get_session()); + } else { + debug!("Got our session ID twice"); + } + } + } + } + } + self.server.parse_user_state(msg); + } + + pub fn audio(&self) -> &Audio { &self.audio } + pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } + + pub fn username(&self) -> &str { &self.username } + + pub fn server_mut(&mut self) -> &mut Server { &mut self.server } +} pub struct Server { channels: HashMap, -- cgit v1.2.1 From cd353c875c3c8bcae4f4ece597468728341362c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 02:38:22 +0200 Subject: command todos --- mumd/src/state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'mumd/src') diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 566adaf..89ae4cd 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -15,7 +15,7 @@ pub struct State { packet_sender: mpsc::Sender>, username: String, - session_id: Option, //FIXME set + session_id: Option, } impl State { -- cgit v1.2.1 From 503f6c90395682bf5d7fd3fb8a79bfcfc3c2f329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 17:05:22 +0200 Subject: wait for complete state before sending commands --- mumd/src/audio.rs | 2 +- mumd/src/command.rs | 19 +++++++++++++++++++ mumd/src/main.rs | 19 ++++++++++++++----- mumd/src/network/tcp.rs | 7 +++---- mumd/src/state.rs | 28 +++++++++++++++++++--------- 5 files changed, 56 insertions(+), 19 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 3c24f1c..e13845e 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -27,7 +27,7 @@ pub struct Audio { pub input_config: StreamConfig, pub input_stream: Stream, pub input_buffer: Arc>>, - input_channel_receiver: Option>, + input_channel_receiver: Option>, //TODO unbounded? mbe ring buffer and drop the first packet client_streams: Arc>>, //TODO move to user state } diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 322bde8..1f7a781 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,3 +1,9 @@ +use crate::state::State; + +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; + +#[derive(Debug)] pub enum Command { ChannelJoin { channel_id: u32, @@ -12,3 +18,16 @@ pub enum Command { ServerDisconnect, Status, } + +pub async fn handle( + state: Arc>, + mut command_receiver: mpsc::UnboundedReceiver, +) { + // wait until we can send packages + let mut initialized_receiver = state.lock().unwrap().initialized_receiver(); + while matches!(initialized_receiver.recv().await, Some(false)) {} + + while let Some(command) = command_receiver.recv().await { + state.lock().unwrap().handle_command(command).await; + } +} diff --git a/mumd/src/main.rs b/mumd/src/main.rs index a08db44..6d8d9bf 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -2,7 +2,9 @@ mod audio; mod network; mod command; mod state; + use crate::state::State; +use crate::command::Command; use argparse::ArgumentParser; use argparse::Store; @@ -15,13 +17,13 @@ use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::net::ToSocketAddrs; -use std::sync::Arc; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; #[tokio::main] async fn main() { // setup logger + //TODO? add newline before message if it contains newlines fern::Dispatch::new() .format(|out, message, record| { out.finish(format_args!( @@ -74,9 +76,12 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); - let (packet_sender, packet_receiver) = mpsc::channel::>(10); + let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); + let (command_sender, command_receiver) = mpsc::unbounded_channel::(); - let state = Arc::new(Mutex::new(State::new(packet_sender, username))); + command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); + let state = State::new(packet_sender, command_sender, username); + let state = Arc::new(Mutex::new(state)); // Run it join!( @@ -89,9 +94,13 @@ async fn main() { packet_receiver, ), network::udp::handle( - state, + Arc::clone(&state), server_addr, crypt_state_receiver, ), + command::handle( + state, + command_receiver, + ), ); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index fa4c4b6..72a2840 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -30,7 +30,7 @@ pub async fn handle( server_host: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender, - packet_receiver: mpsc::Receiver>, + packet_receiver: mpsc::UnboundedReceiver>, ) { let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); @@ -94,7 +94,7 @@ async fn send_pings(sink: Arc>, delay_seconds: u64) { } async fn send_packets(sink: Arc>, - mut packet_receiver: mpsc::Receiver>) { + mut packet_receiver: mpsc::UnboundedReceiver>) { while let Some(packet) = packet_receiver.recv().await { sink.lock().unwrap().send(packet).await.unwrap(); @@ -153,8 +153,7 @@ async fn listen( for (_, channel) in server.channels() { info!("Found channel {}", channel.name()); } - //TODO start listening for packets to send here - state.handle_command(Command::ChannelJoin{channel_id: 1}).await; + state.initialized(); } ControlPacket::Reject(msg) => { warn!("Login rejected: {:?}", msg); diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 89ae4cd..74b2037 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -6,25 +6,33 @@ use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::Serverbound; use std::collections::HashMap; use std::collections::hash_map::Entry; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; pub struct State { server: Server, audio: Audio, - packet_sender: mpsc::Sender>, + packet_sender: mpsc::UnboundedSender>, + command_sender: mpsc::UnboundedSender, + + initialized_watcher: (watch::Sender, watch::Receiver), username: String, session_id: Option, } impl State { - pub fn new(packet_sender: mpsc::Sender>, - username: String) -> Self { + pub fn new( + packet_sender: mpsc::UnboundedSender>, + command_sender: mpsc::UnboundedSender, + username: String, + ) -> Self { Self { server: Server::new(), audio: Audio::new(), packet_sender, + command_sender, + initialized_watcher: watch::channel(false), username, session_id: None, } @@ -41,7 +49,7 @@ impl State { let mut msg = msgs::UserState::new(); msg.set_session(self.session_id.unwrap()); msg.set_channel_id(channel_id); - self.packet_sender.send(msg.into()).await.unwrap(); + self.packet_sender.send(msg.into()).unwrap(); } _ => {} } @@ -76,12 +84,15 @@ impl State { self.server.parse_user_state(msg); } + pub fn initialized(&self) { + self.initialized_watcher.0.broadcast(true).unwrap(); + } + pub fn audio(&self) -> &Audio { &self.audio } pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } - - pub fn username(&self) -> &str { &self.username } - + pub fn initialized_receiver(&self) -> watch::Receiver { self.initialized_watcher.1.clone() } pub fn server_mut(&mut self) -> &mut Server { &mut self.server } + pub fn username(&self) -> &str { &self.username } } pub struct Server { @@ -147,7 +158,6 @@ impl Server { } } - pub struct Channel { description: Option, links: Vec, -- cgit v1.2.1 From 321d0400bb8760ab215a602cc74f36a2a7dd6788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 17:12:46 +0200 Subject: respect if we're initializing when parsing user state --- mumd/src/network/tcp.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 72a2840..3fc36a3 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -162,7 +162,11 @@ async fn listen( let mut state = state.lock().unwrap(); let session = msg.get_session(); state.audio_mut().add_client(msg.get_session()); //TODO - state.parse_initial_user_state(msg); //TODO only if actually initiating state + if *state.initialized_receiver().borrow() { + state.server_mut().parse_user_state(msg); + } else { + state.parse_initial_user_state(msg); + } let server = state.server_mut(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", @@ -175,7 +179,7 @@ async fn listen( } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().parse_channel_state(msg); + state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { state.lock().unwrap().server_mut().parse_channel_remove(msg); -- cgit v1.2.1 From ccd7cbac5e8080240988b01cc9f2e64af9082f5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 23:58:51 +0200 Subject: send tcp pings via packet sender Co-authored-by: Eskil Queseth --- mumd/src/network/tcp.rs | 6 +++--- mumd/src/state.rs | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 3fc36a3..f86447b 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -82,14 +82,14 @@ async fn authenticate(sink: Arc>, username: String) { sink.lock().unwrap().send(msg.into()).await.unwrap(); } -//TODO move somewhere else (main?) and send through packet_sender -async fn send_pings(sink: Arc>, delay_seconds: u64) { +async fn send_pings(packet_sender: mpsc::UnboundedSender>, + delay_seconds: u64) { let mut interval = time::interval(Duration::from_secs(delay_seconds)); loop { interval.tick().await; trace!("Sending ping"); let msg = msgs::Ping::new(); - sink.lock().unwrap().send(msg.into()).await.unwrap(); + packet_sender.send(msg.into()).unwrap(); } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 74b2037..cd266d7 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -90,6 +90,7 @@ impl State { pub fn audio(&self) -> &Audio { &self.audio } pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } + pub fn packet_sender(&self) -> mpsc::UnboundedSender> { self.packet_sender.clone() } pub fn initialized_receiver(&self) -> watch::Receiver { self.initialized_watcher.1.clone() } pub fn server_mut(&mut self) -> &mut Server { &mut self.server } pub fn username(&self) -> &str { &self.username } -- cgit v1.2.1 From 6693c994f1baa6fc08787ef47759f58834f6c3c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Tue, 13 Oct 2020 23:59:16 +0200 Subject: remove shared mutability of tcp sink Co-authored-by: Eskil Queseth --- mumd/src/network/tcp.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index f86447b..6f60b63 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,5 +1,4 @@ use crate::state::State; -use crate::command::Command; use log::*; use futures::channel::oneshot; @@ -32,17 +31,15 @@ pub async fn handle( crypt_state_sender: oneshot::Sender, packet_receiver: mpsc::UnboundedReceiver>, ) { - let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; - let sink = Arc::new(Mutex::new(sink)); - + let (mut sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await; + authenticate(&mut sink, state.lock().unwrap().username().to_string()).await; info!("Logging in..."); join!( - send_pings(Arc::clone(&sink), 10), + send_pings(state.lock().unwrap().packet_sender(), 10), listen(state, stream, crypt_state_sender), send_packets(sink, packet_receiver), ); @@ -74,12 +71,11 @@ async fn connect( ClientControlCodec::new().framed(tls_stream).split() } -//TODO &mut sink? -async fn authenticate(sink: Arc>, username: String) { +async fn authenticate(sink: &mut TcpSender, username: String) { let mut msg = msgs::Authenticate::new(); msg.set_username(username); msg.set_opus(true); - sink.lock().unwrap().send(msg.into()).await.unwrap(); + sink.send(msg.into()).await.unwrap(); } async fn send_pings(packet_sender: mpsc::UnboundedSender>, @@ -93,11 +89,11 @@ async fn send_pings(packet_sender: mpsc::UnboundedSender>, +async fn send_packets(mut sink: TcpSender, mut packet_receiver: mpsc::UnboundedReceiver>) { while let Some(packet) = packet_receiver.recv().await { - sink.lock().unwrap().send(packet).await.unwrap(); + sink.send(packet).await.unwrap(); } } -- cgit v1.2.1 From 3d8009a0201fba0bdc464fae0797d3bb3bcf69f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 01:48:07 +0200 Subject: wip handle more commands (panics) --- mumd/src/command.rs | 20 ++++++++++++---- mumd/src/main.rs | 35 +++++++++++++++++---------- mumd/src/network/mod.rs | 23 ++++++++++++++++++ mumd/src/network/tcp.rs | 29 ++++++++++++++-------- mumd/src/network/udp.rs | 15 +++++++++--- mumd/src/state.rs | 64 +++++++++++++++++++++++++++++++++++++++---------- 6 files changed, 142 insertions(+), 44 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 1f7a781..c3b72bf 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,5 +1,6 @@ -use crate::state::State; +use crate::state::{Channel, Server, State}; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; @@ -19,14 +20,23 @@ pub enum Command { Status, } +#[derive(Debug)] +pub enum CommandResponse { + ChannelList { + channels: HashMap, + }, + Status { + username: String, + server_state: Server, + } +} + pub async fn handle( state: Arc>, mut command_receiver: mpsc::UnboundedReceiver, + command_response_sender: mpsc::UnboundedSender, ()>>, ) { - // wait until we can send packages - let mut initialized_receiver = state.lock().unwrap().initialized_receiver(); - while matches!(initialized_receiver.recv().await, Some(false)) {} - + //TODO err if not connected while let Some(command) = command_receiver.recv().await { state.lock().unwrap().handle_command(command).await; } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 6d8d9bf..93bb0d0 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -3,8 +3,9 @@ mod network; mod command; mod state; +use crate::network::ConnectionInfo; +use crate::command::{Command, CommandResponse}; use crate::state::State; -use crate::command::Command; use argparse::ArgumentParser; use argparse::Store; @@ -16,9 +17,8 @@ use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; -use std::net::ToSocketAddrs; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() { @@ -67,40 +67,49 @@ async fn main() { ); ap.parse_args_or_exit(); } - let server_addr = (server_host.as_ref(), server_port) - .to_socket_addrs() - .expect("Failed to parse server address") - .next() - .expect("Failed to resolve server address"); // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::(); + let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); + let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); + + command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); - let state = State::new(packet_sender, command_sender, username); + let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); let state = Arc::new(Mutex::new(state)); // Run it join!( network::tcp::handle( Arc::clone(&state), - server_addr, - server_host, - accept_invalid_cert, + connection_info_receiver.clone(), crypt_state_sender, packet_receiver, ), network::udp::handle( Arc::clone(&state), - server_addr, + connection_info_receiver.clone(), crypt_state_receiver, ), command::handle( state, command_receiver, + command_response_sender, + ), + send_commands( + command_sender, + command_response_receiver, ), ); } + +async fn send_commands( + command_sender: mpsc::UnboundedSender, + command_response_receiver: mpsc::UnboundedReceiver, ()>>, +) { + +} diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs index f7a6a76..777faad 100644 --- a/mumd/src/network/mod.rs +++ b/mumd/src/network/mod.rs @@ -1,2 +1,25 @@ pub mod tcp; pub mod udp; + +use std::net::SocketAddr; + +#[derive(Clone, Debug)] +pub struct ConnectionInfo { + socket_addr: SocketAddr, + hostname: String, + accept_invalid_cert: bool, +} + +impl ConnectionInfo { + pub fn new( + socket_addr: SocketAddr, + hostname: String, + accept_invalid_cert: bool, + ) -> Self { + Self { + socket_addr, + hostname, + accept_invalid_cert, + } + } +} diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 6f60b63..9fb5ae4 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,4 +1,5 @@ -use crate::state::State; +use crate::network::ConnectionInfo; +use crate::state::{State, StatePhase}; use log::*; use futures::channel::oneshot; @@ -11,7 +12,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -25,16 +26,24 @@ type TcpReceiver = pub async fn handle( state: Arc>, - server_addr: SocketAddr, - server_host: String, - accept_invalid_cert: bool, + mut connection_info_receiver: watch::Receiver>, crypt_state_sender: oneshot::Sender, packet_receiver: mpsc::UnboundedReceiver>, ) { - let (mut sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, stream) = connect(connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert) + .await; // Handshake (omitting `Version` message for brevity) - authenticate(&mut sink, state.lock().unwrap().username().to_string()).await; + authenticate(&mut sink, state.lock().unwrap().username().unwrap().to_string()).await; info!("Logging in..."); @@ -158,10 +167,10 @@ async fn listen( let mut state = state.lock().unwrap(); let session = msg.get_session(); state.audio_mut().add_client(msg.get_session()); //TODO - if *state.initialized_receiver().borrow() { - state.server_mut().parse_user_state(msg); - } else { + if *state.phase_receiver().borrow() == StatePhase::Connecting { state.parse_initial_user_state(msg); + } else { + state.server_mut().parse_user_state(msg); } let server = state.server_mut(); let user = server.users().get(&session).unwrap(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 5f76501..cf0305b 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,3 +1,4 @@ +use crate::network::ConnectionInfo; use crate::state::State; use log::*; @@ -11,6 +12,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; +use tokio::sync::watch; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; @@ -38,20 +40,27 @@ pub async fn connect( pub async fn handle( state: Arc>, - server_addr: SocketAddr, + mut connection_info_receiver: watch::Receiver>, crypt_state: oneshot::Receiver, ) { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; let (mut sink, source) = connect(crypt_state).await; // Note: A normal application would also send periodic Ping packets, and its own audio // via UDP. We instead trick the server into accepting us by sending it one // dummy voice packet. - send_ping(&mut sink, server_addr).await; + send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); join!( listen(Arc::clone(&state), source), - send_voice(state, sink, server_addr) + send_voice(state, sink, connection_info.socket_addr), ); } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index cd266d7..8689a9a 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -1,23 +1,33 @@ use log::*; use crate::audio::Audio; -use crate::command::Command; +use crate::command::{Command, CommandResponse}; +use crate::network::ConnectionInfo; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::Serverbound; use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::net::ToSocketAddrs; use tokio::sync::{mpsc, watch}; +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum StatePhase { + Disconnected, + Connecting, + Connected, +} + pub struct State { server: Server, audio: Audio, packet_sender: mpsc::UnboundedSender>, command_sender: mpsc::UnboundedSender, + connection_info_sender: watch::Sender>, - initialized_watcher: (watch::Sender, watch::Receiver), + phase_watcher: (watch::Sender, watch::Receiver), - username: String, + username: Option, session_id: Option, } @@ -25,6 +35,7 @@ impl State { pub fn new( packet_sender: mpsc::UnboundedSender>, command_sender: mpsc::UnboundedSender, + connection_info_sender: watch::Sender>, username: String, ) -> Self { Self { @@ -32,26 +43,50 @@ impl State { audio: Audio::new(), packet_sender, command_sender, - initialized_watcher: watch::channel(false), - username, + connection_info_sender, + phase_watcher: watch::channel(StatePhase::Disconnected), + username: None, session_id: None, } } - //TODO result - pub async fn handle_command(&mut self, command: Command) { + pub async fn handle_command(&mut self, command: Command) -> Result, ()> { match command { Command::ChannelJoin{channel_id} => { if self.session_id.is_none() { warn!("Tried to join channel but we don't have a session id"); - return; + return Err(()); } let mut msg = msgs::UserState::new(); msg.set_session(self.session_id.unwrap()); msg.set_channel_id(channel_id); self.packet_sender.send(msg.into()).unwrap(); + Ok(None) + } + Command::ChannelList => { + Ok(Some(CommandResponse::ChannelList{channels: self.server.channels.clone()})) + } + Command::ServerConnect{host, port, username, accept_invalid_cert} => { + if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { + warn!("Tried to connect to a server while already connected"); + return Err(()); + } + self.username = Some(username); + self.phase_watcher.0.broadcast(StatePhase::Connecting).unwrap(); + let socket_addr = (host.as_ref(), port) + .to_socket_addrs() + .expect("Failed to parse server address") + .next() + .expect("Failed to resolve server address"); + self.connection_info_sender.broadcast(Some(ConnectionInfo::new( + socket_addr, + host, + accept_invalid_cert, + ))); + while !matches!(self.phase_receiver().recv().await.unwrap(), StatePhase::Connected) {} + Ok(None) } - _ => {} + _ => { Ok(None) } } } @@ -63,7 +98,7 @@ impl State { if !msg.has_name() { warn!("Missing name in initial user state"); } else { - if msg.get_name() == self.username { + if msg.get_name() == self.username.as_ref().unwrap() { match self.session_id { None => { debug!("Found our session id: {}", msg.get_session()); @@ -85,17 +120,18 @@ impl State { } pub fn initialized(&self) { - self.initialized_watcher.0.broadcast(true).unwrap(); + self.phase_watcher.0.broadcast(StatePhase::Connected).unwrap(); } pub fn audio(&self) -> &Audio { &self.audio } pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } pub fn packet_sender(&self) -> mpsc::UnboundedSender> { self.packet_sender.clone() } - pub fn initialized_receiver(&self) -> watch::Receiver { self.initialized_watcher.1.clone() } + pub fn phase_receiver(&self) -> watch::Receiver { self.phase_watcher.1.clone() } pub fn server_mut(&mut self) -> &mut Server { &mut self.server } - pub fn username(&self) -> &str { &self.username } + pub fn username(&self) -> Option<&String> { self.username.as_ref() } } +#[derive(Debug)] pub struct Server { channels: HashMap, users: HashMap, @@ -159,6 +195,7 @@ impl Server { } } +#[derive(Clone, Debug)] pub struct Channel { description: Option, links: Vec, @@ -212,6 +249,7 @@ impl Channel { } } +#[derive(Debug)] pub struct User { channel: u32, comment: Option, -- cgit v1.2.1 From a3c393a711c71698ef833f1923374798cbb0d0b4 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 02:04:16 +0200 Subject: fix code so that it doesn't deadlock --- mumd/src/command.rs | 12 ++++++++++-- mumd/src/state.rs | 15 +++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index c3b72bf..8a5c715 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,8 +1,9 @@ -use crate::state::{Channel, Server, State}; +use crate::state::{Channel, Server, State, StatePhase}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; +use log::*; #[derive(Debug)] pub enum Command { @@ -38,6 +39,13 @@ pub async fn handle( ) { //TODO err if not connected while let Some(command) = command_receiver.recv().await { - state.lock().unwrap().handle_command(command).await; + debug!("Parsing command {:?}", command); + let mut state = state.lock().unwrap(); + let (wait_for_connected, _) = state.handle_command(command).await; + if wait_for_connected { + let mut watcher = state.phase_receiver(); + drop(state); + while !matches!(watcher.recv().await.unwrap(), StatePhase::Connected) {} + } } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 8689a9a..84e78f0 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -50,26 +50,26 @@ impl State { } } - pub async fn handle_command(&mut self, command: Command) -> Result, ()> { + pub async fn handle_command(&mut self, command: Command) -> (bool, Result, ()>) { match command { Command::ChannelJoin{channel_id} => { if self.session_id.is_none() { warn!("Tried to join channel but we don't have a session id"); - return Err(()); + return (false, Err(())); } let mut msg = msgs::UserState::new(); msg.set_session(self.session_id.unwrap()); msg.set_channel_id(channel_id); self.packet_sender.send(msg.into()).unwrap(); - Ok(None) + (false, Ok(None)) } Command::ChannelList => { - Ok(Some(CommandResponse::ChannelList{channels: self.server.channels.clone()})) + (false, Ok(Some(CommandResponse::ChannelList{channels: self.server.channels.clone()}))) } Command::ServerConnect{host, port, username, accept_invalid_cert} => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { warn!("Tried to connect to a server while already connected"); - return Err(()); + return (false, Err(())); } self.username = Some(username); self.phase_watcher.0.broadcast(StatePhase::Connecting).unwrap(); @@ -83,10 +83,9 @@ impl State { host, accept_invalid_cert, ))); - while !matches!(self.phase_receiver().recv().await.unwrap(), StatePhase::Connected) {} - Ok(None) + (true, Ok(None)) } - _ => { Ok(None) } + _ => { (true, Ok(None)) } } } -- cgit v1.2.1 From b5528c2198d54028ef03d35d5aa4d7fdde6af8f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 02:33:50 +0200 Subject: some changes --- mumd/src/command.rs | 5 +++-- mumd/src/main.rs | 15 ++++++++------- mumd/src/state.rs | 25 ++++++++++++++++++++----- 3 files changed, 31 insertions(+), 14 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 8a5c715..0e5bdc7 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -27,7 +27,7 @@ pub enum CommandResponse { channels: HashMap, }, Status { - username: String, + username: Option, server_state: Server, } } @@ -41,11 +41,12 @@ pub async fn handle( while let Some(command) = command_receiver.recv().await { debug!("Parsing command {:?}", command); let mut state = state.lock().unwrap(); - let (wait_for_connected, _) = state.handle_command(command).await; + let (wait_for_connected, command_response) = state.handle_command(command).await; if wait_for_connected { let mut watcher = state.phase_receiver(); drop(state); while !matches!(watcher.recv().await.unwrap(), StatePhase::Connected) {} } + command_response_sender.send(command_response).unwrap(); } } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 93bb0d0..24c2567 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -76,9 +76,10 @@ async fn main() { let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); + command_sender.send(Command::ChannelList).unwrap(); command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); - command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); + command_sender.send(Command::ChannelList).unwrap(); let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); let state = Arc::new(Mutex::new(state)); @@ -100,16 +101,16 @@ async fn main() { command_receiver, command_response_sender, ), - send_commands( - command_sender, + receive_command_responses( command_response_receiver, ), ); } -async fn send_commands( - command_sender: mpsc::UnboundedSender, - command_response_receiver: mpsc::UnboundedReceiver, ()>>, +async fn receive_command_responses( + mut command_response_receiver: mpsc::UnboundedReceiver, ()>>, ) { - + while let Some(command_response) = command_response_receiver.recv().await { + debug!("{:#?}", command_response); + } } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 84e78f0..ef1cd6d 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -50,11 +50,12 @@ impl State { } } + //TODO? move bool inside Result pub async fn handle_command(&mut self, command: Command) -> (bool, Result, ()>) { match command { Command::ChannelJoin{channel_id} => { - if self.session_id.is_none() { - warn!("Tried to join channel but we don't have a session id"); + if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { + warn!("Not connected"); return (false, Err(())); } let mut msg = msgs::UserState::new(); @@ -64,6 +65,10 @@ impl State { (false, Ok(None)) } Command::ChannelList => { + if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { + warn!("Not connected"); + return (false, Err(())); + } (false, Ok(Some(CommandResponse::ChannelList{channels: self.server.channels.clone()}))) } Command::ServerConnect{host, port, username, accept_invalid_cert} => { @@ -85,7 +90,17 @@ impl State { ))); (true, Ok(None)) } - _ => { (true, Ok(None)) } + Command::Status => { + if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { + warn!("Not connected"); + return (false, Err(())); + } + (false, Ok(Some(CommandResponse::Status{ + username: self.username.clone(), + server_state: self.server.clone(), + }))) + } + _ => { (false, Ok(None)) } } } @@ -130,7 +145,7 @@ impl State { pub fn username(&self) -> Option<&String> { self.username.as_ref() } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Server { channels: HashMap, users: HashMap, @@ -248,7 +263,7 @@ impl Channel { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct User { channel: u32, comment: Option, -- cgit v1.2.1 From dcb71982eab550535298b2d879a3a83820a0798a Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 14:49:56 +0200 Subject: add newline when logging if logged message contains a newline --- mumd/src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 24c2567..a2665ba 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -23,11 +23,11 @@ use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() { // setup logger - //TODO? add newline before message if it contains newlines fern::Dispatch::new() .format(|out, message, record| { + let message = message.to_string(); out.finish(format_args!( - "{} {}:{} {}", + "{} {}:{}{}{}", //TODO runtime flag that disables color match record.level() { Level::Error => "ERROR".red(), @@ -38,6 +38,7 @@ async fn main() { }, record.file().unwrap(), record.line().unwrap(), + if message.chars().any(|e| e == '\n') { "\n" } else { " " }, message )) }) -- cgit v1.2.1 From 7fb14d648aacd398f720f60236020dab6bf9fd35 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 16:54:27 +0200 Subject: add support for disconnect command --- mumd/src/command.rs | 2 + mumd/src/main.rs | 24 +++- mumd/src/network/tcp.rs | 291 +++++++++++++++++++++++++++++++++--------------- mumd/src/network/udp.rs | 144 +++++++++++++++++++----- mumd/src/state.rs | 5 +- 5 files changed, 344 insertions(+), 122 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 0e5bdc7..bfdb7dd 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -49,4 +49,6 @@ pub async fn handle( } command_response_sender.send(command_response).unwrap(); } + + debug!("Finished handling commands"); } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index a2665ba..c923857 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -11,14 +11,18 @@ use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; use colored::*; -use futures::channel::oneshot; -use futures::join; +use tokio::sync::oneshot; +use futures::{join, select}; use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; +use std::thread; +use std::time::Duration; +use tokio::stream::StreamExt; +use futures::FutureExt; #[tokio::main] async fn main() { @@ -79,7 +83,7 @@ async fn main() { command_sender.send(Command::ChannelList).unwrap(); command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); - command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); + //command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); command_sender.send(Command::ChannelList).unwrap(); let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); let state = Arc::new(Mutex::new(state)); @@ -102,16 +106,28 @@ async fn main() { command_receiver, command_response_sender, ), + send_commands( + command_sender + ), receive_command_responses( command_response_receiver, ), ); } +async fn send_commands(command_sender: mpsc::UnboundedSender) { + tokio::time::delay_for(Duration::from_secs(5)).await; + command_sender.send(Command::ServerDisconnect); + + debug!("Finished sending commands"); +} + async fn receive_command_responses( mut command_response_receiver: mpsc::UnboundedReceiver, ()>>, ) { while let Some(command_response) = command_response_receiver.recv().await { - debug!("{:#?}", command_response); + debug!("{:?}", command_response); } + + debug!("Finished receiving commands"); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 9fb5ae4..0aca19e 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,8 +2,8 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; +use tokio::sync::oneshot; +use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; @@ -16,6 +16,7 @@ use tokio::sync::{mpsc, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; +use futures_util::core_reexport::cell::RefCell; type TcpSender = SplitSink< Framed, ControlCodec>, @@ -43,15 +44,21 @@ pub async fn handle( .await; // Handshake (omitting `Version` message for brevity) - authenticate(&mut sink, state.lock().unwrap().username().unwrap().to_string()).await; + let mut state_lock = state.lock().unwrap(); + authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; + let phase_watcher = state_lock.phase_receiver(); + let packet_sender = state_lock.packet_sender(); + drop(state_lock); info!("Logging in..."); join!( - send_pings(state.lock().unwrap().packet_sender(), 10), - listen(state, stream, crypt_state_sender), - send_packets(sink, packet_receiver), + send_pings(packet_sender, 10, phase_watcher.clone()), + listen(state, stream, crypt_state_sender, phase_watcher.clone()), + send_packets(sink, packet_receiver, phase_watcher), ); + + debug!("Fully disconnected TCP stream"); } async fn connect( @@ -87,109 +94,209 @@ async fn authenticate(sink: &mut TcpSender, username: String) { sink.send(msg.into()).await.unwrap(); } -async fn send_pings(packet_sender: mpsc::UnboundedSender>, - delay_seconds: u64) { +async fn send_pings( + packet_sender: mpsc::UnboundedSender>, + delay_seconds: u64, + mut phase_watcher: watch::Receiver, +) { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + let mut interval = time::interval(Duration::from_secs(delay_seconds)); - loop { - interval.tick().await; - trace!("Sending ping"); - let msg = msgs::Ping::new(); - packet_sender.send(msg.into()).unwrap(); - } + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let interval_waiter = interval.tick().fuse(); + pin_mut!(interval_waiter); + let exitor = select! { + data = interval_waiter => Some(data), + _ = rx => None + }; + + match exitor { + Some(_) => { + trace!("Sending ping"); + let msg = msgs::Ping::new(); + packet_sender.send(msg.into()).unwrap(); + } + None => break, + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("Ping sender process killed"); } -async fn send_packets(mut sink: TcpSender, - mut packet_receiver: mpsc::UnboundedReceiver>) { +async fn send_packets( + mut sink: TcpSender, + mut packet_receiver: mpsc::UnboundedReceiver>, + mut phase_watcher: watch::Receiver, +) { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = packet_receiver.recv().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(packet)) => { + sink.send(packet).await.unwrap(); + } + } + } - while let Some(packet) = packet_receiver.recv().await { - sink.send(packet).await.unwrap(); - } + //clears queue of remaining packets + while let Ok(_) = packet_receiver.try_recv() {} + + sink.close().await.unwrap(); + }; + + join!(main_block, phase_transition_block); + + debug!("TCP packet sender killed"); } async fn listen( state: Arc>, mut stream: TcpReceiver, crypt_state_sender: oneshot::Sender, + mut phase_watcher: watch::Receiver, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); - while let Some(packet) = stream.next().await { - //TODO handle types separately - match packet.unwrap() { - ControlPacket::TextMessage(msg) => { - info!( - "Got message from user with session ID {}: {}", - msg.get_actor(), - msg.get_message() - ); - } - ControlPacket::CryptSetup(msg) => { - debug!("Crypt setup"); - // Wait until we're fully connected before initiating UDP voice - crypt_state = Some(ClientCryptState::new_from( - msg.get_key() - .try_into() - .expect("Server sent private key with incorrect size"), - msg.get_client_nonce() - .try_into() - .expect("Server sent client_nonce with incorrect size"), - msg.get_server_nonce() - .try_into() - .expect("Server sent server_nonce with incorrect size"), - )); - } - ControlPacket::ServerSync(msg) => { - info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ); - } - let mut state = state.lock().unwrap(); - let server = state.server_mut(); - server.parse_server_sync(msg); - match &server.welcome_text { - Some(s) => info!("Welcome: {}", s), - None => info!("No welcome received"), + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let listener_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = stream.next().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; } - for (_, channel) in server.channels() { - info!("Found channel {}", channel.name()); + Some(None) => { + warn!("Channel closed before disconnect command"); + break; } - state.initialized(); - } - ControlPacket::Reject(msg) => { - warn!("Login rejected: {:?}", msg); - } - ControlPacket::UserState(msg) => { - let mut state = state.lock().unwrap(); - let session = msg.get_session(); - state.audio_mut().add_client(msg.get_session()); //TODO - if *state.phase_receiver().borrow() == StatePhase::Connecting { - state.parse_initial_user_state(msg); - } else { - state.server_mut().parse_user_state(msg); + Some(Some(packet)) => { + //TODO handle types separately + match packet.unwrap() { + ControlPacket::TextMessage(msg) => { + info!( + "Got message from user with session ID {}: {}", + msg.get_actor(), + msg.get_message() + ); + } + ControlPacket::CryptSetup(msg) => { + debug!("Crypt setup"); + // Wait until we're fully connected before initiating UDP voice + crypt_state = Some(ClientCryptState::new_from( + msg.get_key() + .try_into() + .expect("Server sent private key with incorrect size"), + msg.get_client_nonce() + .try_into() + .expect("Server sent client_nonce with incorrect size"), + msg.get_server_nonce() + .try_into() + .expect("Server sent server_nonce with incorrect size"), + )); + } + ControlPacket::ServerSync(msg) => { + info!("Logged in"); + if let Some(sender) = crypt_state_sender.take() { + let _ = sender.send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ); + } + let mut state = state.lock().unwrap(); + let server = state.server_mut(); + server.parse_server_sync(msg); + match &server.welcome_text { + Some(s) => info!("Welcome: {}", s), + None => info!("No welcome received"), + } + for (_, channel) in server.channels() { + info!("Found channel {}", channel.name()); + } + state.initialized(); + } + ControlPacket::Reject(msg) => { + warn!("Login rejected: {:?}", msg); + } + ControlPacket::UserState(msg) => { + let mut state = state.lock().unwrap(); + let session = msg.get_session(); + state.audio_mut().add_client(msg.get_session()); //TODO + if *state.phase_receiver().borrow() == StatePhase::Connecting { + state.parse_initial_user_state(msg); + } else { + state.server_mut().parse_user_state(msg); + } + let server = state.server_mut(); + let user = server.users().get(&session).unwrap(); + info!("User {} connected to {}", + user.name(), + user.channel()); + } + ControlPacket::UserRemove(msg) => { + info!("User {} left", msg.get_session()); + state.lock().unwrap().audio_mut().remove_client(msg.get_session()); + } + ControlPacket::ChannelState(msg) => { + debug!("Channel state received"); + state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial + } + ControlPacket::ChannelRemove(msg) => { + state.lock().unwrap().server_mut().parse_channel_remove(msg); + } + _ => {} + } } - let server = state.server_mut(); - let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); - } - ControlPacket::UserRemove(msg) => { - info!("User {} left", msg.get_session()); - state.lock().unwrap().audio_mut().remove_client(msg.get_session()); - } - ControlPacket::ChannelState(msg) => { - debug!("Channel state received"); - state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial - } - ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().parse_channel_remove(msg); } - _ => {} } - } + + //TODO? clean up stream + }; + + join!(phase_transition_block, listener_block); + + debug!("Killing TCP listener block"); } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index cf0305b..ab4ca1d 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,10 +1,9 @@ use crate::network::ConnectionInfo; -use crate::state::State; +use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::channel::oneshot; -use futures::{join, SinkExt, StreamExt}; +use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -12,7 +11,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::watch; +use tokio::sync::{watch, oneshot}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; @@ -58,17 +57,80 @@ pub async fn handle( send_ping(&mut sink, connection_info.socket_addr).await; let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); join!( - listen(Arc::clone(&state), source), - send_voice(state, sink, connection_info.socket_addr), + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice(state, sink, connection_info.socket_addr, phase_watcher), ); + + debug!("Fully disconnected UPD stream"); } async fn listen( state: Arc>, mut source: UdpReceiver, + mut phase_watcher: watch::Receiver, ) { - while let Some(packet) = source.next().await { + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + loop { + let packet_recv = source.next().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(packet)) => { + let (packet, _src_addr) = match packet { + Ok(packet) => packet, + Err(err) => { + warn!("Got an invalid UDP packet: {}", err); + // To be expected, considering this is the internet, just ignore it + continue; + } + }; + match packet { + VoicePacket::Ping { .. } => { + // Note: A normal application would handle these and only use UDP for voice + // once it has received one. + continue; + } + VoicePacket::Audio { + session_id, + // seq_num, + payload, + // position_info, + .. + } => { + state.lock().unwrap().audio().decode_packet(session_id, payload); + } + } + } + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("UDP listener process killed"); + + /*while let Some(packet) = source.next().await { let (packet, _src_addr) = match packet { Ok(packet) => packet, Err(err) => { @@ -93,7 +155,7 @@ async fn listen( state.lock().unwrap().audio().decode_packet(session_id, payload); } } - } + }*/ } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -116,25 +178,57 @@ async fn send_voice( state: Arc>, sink: Arc>, server_addr: SocketAddr, + mut phase_watcher: watch::Receiver, ) { let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); - let mut count = 0; - while let Some(payload) = receiver.recv().await { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload, - position_info: None, - }; - count += 1; - sink.lock() - .unwrap() - .send((reply, server_addr)) - .await - .unwrap(); - } + let (tx, rx) = oneshot::channel(); + let phase_transition_block = async { + while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + tx.send(true); + }; + + let main_block = async { + let rx = rx.fuse(); + pin_mut!(rx); + let mut count = 0; + loop { + let packet_recv = receiver.recv().fuse(); + pin_mut!(packet_recv); + let exitor = select! { + data = packet_recv => Some(data), + _ = rx => None + }; + match exitor { + None => { + break; + } + Some(None) => { + warn!("Channel closed before disconnect command"); + break; + } + Some(Some(payload)) => { + let reply = VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: count, + payload, + position_info: None, + }; + count += 1; + sink.lock() + .unwrap() + .send((reply, server_addr)) + .await + .unwrap(); + } + } + } + }; + + join!(main_block, phase_transition_block); + + debug!("UDP listener process killed"); } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index ef1cd6d..0de29f1 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -100,7 +100,10 @@ impl State { server_state: self.server.clone(), }))) } - _ => { (false, Ok(None)) } + Command::ServerDisconnect => { + self.phase_watcher.0.broadcast(StatePhase::Disconnected).unwrap(); + (false, Ok(None)) + } } } -- cgit v1.2.1 From e4406676a28f2dfb756f8f9e38a4242166f19c0e Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 17:02:49 +0200 Subject: resolve some compiler warnings --- mumd/src/main.rs | 7 ++----- mumd/src/network/tcp.rs | 9 ++++----- mumd/src/network/udp.rs | 4 ++-- mumd/src/state.rs | 3 +-- 4 files changed, 9 insertions(+), 14 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index c923857..812e7a1 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -12,17 +12,14 @@ use argparse::Store; use argparse::StoreTrue; use colored::*; use tokio::sync::oneshot; -use futures::{join, select}; +use futures::join; use log::*; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; -use std::thread; use std::time::Duration; -use tokio::stream::StreamExt; -use futures::FutureExt; #[tokio::main] async fn main() { @@ -85,7 +82,7 @@ async fn main() { command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); //command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); command_sender.send(Command::ChannelList).unwrap(); - let state = State::new(packet_sender, command_sender.clone(), connection_info_sender, username); + let state = State::new(packet_sender, command_sender.clone(), connection_info_sender); let state = Arc::new(Mutex::new(state)); // Run it diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 0aca19e..1e0feee 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -16,7 +16,6 @@ use tokio::sync::{mpsc, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; -use futures_util::core_reexport::cell::RefCell; type TcpSender = SplitSink< Framed, ControlCodec>, @@ -44,7 +43,7 @@ pub async fn handle( .await; // Handshake (omitting `Version` message for brevity) - let mut state_lock = state.lock().unwrap(); + let state_lock = state.lock().unwrap(); authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; let phase_watcher = state_lock.phase_receiver(); let packet_sender = state_lock.packet_sender(); @@ -102,7 +101,7 @@ async fn send_pings( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let mut interval = time::interval(Duration::from_secs(delay_seconds)); @@ -141,7 +140,7 @@ async fn send_packets( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let main_block = async { @@ -191,7 +190,7 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let listener_block = async { diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index ab4ca1d..a757a2b 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -75,7 +75,7 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let main_block = async { @@ -185,7 +185,7 @@ async fn send_voice( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} - tx.send(true); + tx.send(true).unwrap(); }; let main_block = async { diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 0de29f1..016783b 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -36,7 +36,6 @@ impl State { packet_sender: mpsc::UnboundedSender>, command_sender: mpsc::UnboundedSender, connection_info_sender: watch::Sender>, - username: String, ) -> Self { Self { server: Server::new(), @@ -87,7 +86,7 @@ impl State { socket_addr, host, accept_invalid_cert, - ))); + ))).unwrap(); (true, Ok(None)) } Command::Status => { -- cgit v1.2.1 From ab0cdc240c65fdc6b764ed17f6611786d449acc3 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 17:45:04 +0200 Subject: add support for reconnecting to server --- mumd/src/audio.rs | 10 +++-- mumd/src/command.rs | 2 +- mumd/src/main.rs | 19 ++++----- mumd/src/network/tcp.rs | 65 +++++++++++++++--------------- mumd/src/network/udp.rs | 105 ++++++++++++++++++------------------------------ 5 files changed, 91 insertions(+), 110 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index e13845e..aa06a9d 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -293,9 +293,13 @@ fn input_callback( .unwrap(); opus_buf.truncate(result); let bytes = Bytes::copy_from_slice(&opus_buf); - input_sender - .try_send(VoicePacketPayload::Opus(bytes, false)) - .unwrap(); //TODO handle full buffer / disconnect + match input_sender + .try_send(VoicePacketPayload::Opus(bytes, false)) { //TODO handle full buffer / disconnect + Ok(_) => {}, + Err(_e) => { + //warn!("Error sending audio packet: {:?}", e); + } + } *buf = tail; } } diff --git a/mumd/src/command.rs b/mumd/src/command.rs index bfdb7dd..1104671 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; use log::*; -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Command { ChannelJoin { channel_id: u32, diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 812e7a1..6d435fa 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -11,7 +11,6 @@ use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; use colored::*; -use tokio::sync::oneshot; use futures::join; use log::*; use mumble_protocol::control::ControlPacket; @@ -72,16 +71,12 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. - let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::(); + let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::(1); // crypt state should always be consumed before sending a new one let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::(); let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); - command_sender.send(Command::ChannelList).unwrap(); - command_sender.send(Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert}); - //command_sender.send(Command::ChannelJoin{channel_id: 1}).unwrap(); - command_sender.send(Command::ChannelList).unwrap(); let state = State::new(packet_sender, command_sender.clone(), connection_info_sender); let state = Arc::new(Mutex::new(state)); @@ -104,7 +99,8 @@ async fn main() { command_response_sender, ), send_commands( - command_sender + command_sender, + Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert} ), receive_command_responses( command_response_receiver, @@ -112,8 +108,13 @@ async fn main() { ); } -async fn send_commands(command_sender: mpsc::UnboundedSender) { - tokio::time::delay_for(Duration::from_secs(5)).await; +async fn send_commands(command_sender: mpsc::UnboundedSender, connect_command: Command) { + command_sender.send(connect_command.clone()); + tokio::time::delay_for(Duration::from_secs(2)).await; + command_sender.send(Command::ServerDisconnect); + tokio::time::delay_for(Duration::from_secs(2)).await; + command_sender.send(connect_command.clone()); + tokio::time::delay_for(Duration::from_secs(2)).await; command_sender.send(Command::ServerDisconnect); debug!("Finished sending commands"); diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 1e0feee..d45b49d 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,7 +2,6 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use tokio::sync::oneshot; use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; @@ -12,7 +11,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, oneshot}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -27,37 +26,39 @@ type TcpReceiver = pub async fn handle( state: Arc>, mut connection_info_receiver: watch::Receiver>, - crypt_state_sender: oneshot::Sender, - packet_receiver: mpsc::UnboundedReceiver>, + crypt_state_sender: mpsc::Sender, + mut packet_receiver: mpsc::UnboundedReceiver>, ) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, stream) = connect(connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert) + .await; - // Handshake (omitting `Version` message for brevity) - let state_lock = state.lock().unwrap(); - authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; - let phase_watcher = state_lock.phase_receiver(); - let packet_sender = state_lock.packet_sender(); - drop(state_lock); + // Handshake (omitting `Version` message for brevity) + let state_lock = state.lock().unwrap(); + authenticate(&mut sink, state_lock.username().unwrap().to_string()).await; + let phase_watcher = state_lock.phase_receiver(); + let packet_sender = state_lock.packet_sender(); + drop(state_lock); - info!("Logging in..."); + info!("Logging in..."); - join!( - send_pings(packet_sender, 10, phase_watcher.clone()), - listen(state, stream, crypt_state_sender, phase_watcher.clone()), - send_packets(sink, packet_receiver, phase_watcher), - ); + join!( + send_pings(packet_sender, 10, phase_watcher.clone()), + listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + send_packets(sink, &mut packet_receiver, phase_watcher), + ); - debug!("Fully disconnected TCP stream"); + debug!("Fully disconnected TCP stream, waiting for new connection info"); + } } async fn connect( @@ -134,7 +135,7 @@ async fn send_pings( async fn send_packets( mut sink: TcpSender, - mut packet_receiver: mpsc::UnboundedReceiver>, + packet_receiver: &mut mpsc::UnboundedReceiver>, mut phase_watcher: watch::Receiver, ) { let (tx, rx) = oneshot::channel(); @@ -181,7 +182,7 @@ async fn send_packets( async fn listen( state: Arc>, mut stream: TcpReceiver, - crypt_state_sender: oneshot::Sender, + crypt_state_sender: mpsc::Sender, mut phase_watcher: watch::Receiver, ) { let mut crypt_state = None; @@ -238,12 +239,12 @@ async fn listen( } ControlPacket::ServerSync(msg) => { info!("Logged in"); - if let Some(sender) = crypt_state_sender.take() { + if let Some(mut sender) = crypt_state_sender.take() { let _ = sender.send( crypt_state .take() .expect("Server didn't send us any CryptSetup packet!"), - ); + ).await; } let mut state = state.lock().unwrap(); let server = state.server_mut(); diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index a757a2b..45e6e80 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -11,14 +11,48 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::{watch, oneshot}; +use tokio::sync::{watch, oneshot, mpsc}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; type UdpReceiver = SplitStream>; +pub async fn handle( + state: Arc>, + mut connection_info_receiver: watch::Receiver>, + mut crypt_state: mpsc::Receiver, +) { + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + + loop { + let connection_info = loop { + match connection_info_receiver.recv().await { + None => { return; } + Some(None) => {} + Some(Some(connection_info)) => { break connection_info; } + } + }; + let (mut sink, source) = connect(&mut crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, connection_info.socket_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + + let phase_watcher = state.lock().unwrap().phase_receiver(); + join!( + listen(Arc::clone(&state), source, phase_watcher.clone()), + send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver), + ); + + debug!("Fully disconnected UDP stream, waiting for new connection info"); + } +} + pub async fn connect( - crypt_state: oneshot::Receiver, + crypt_state: &mut mpsc::Receiver, ) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) @@ -26,10 +60,10 @@ pub async fn connect( .expect("Failed to bind UDP socket"); // Wait for initial CryptState - let crypt_state = match crypt_state.await { - Ok(crypt_state) => crypt_state, + let crypt_state = match crypt_state.recv().await { + Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well - Err(_) => panic!("disconnect before crypt packet received"), //TODO exit gracefully + None => panic!("disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); @@ -37,36 +71,6 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } -pub async fn handle( - state: Arc>, - mut connection_info_receiver: watch::Receiver>, - crypt_state: oneshot::Receiver, -) { - let connection_info = loop { - match connection_info_receiver.recv().await { - None => { return; } - Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } - } - }; - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, connection_info.socket_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - - let phase_watcher = state.lock().unwrap().phase_receiver(); - join!( - listen(Arc::clone(&state), source, phase_watcher.clone()), - send_voice(state, sink, connection_info.socket_addr, phase_watcher), - ); - - debug!("Fully disconnected UPD stream"); -} - async fn listen( state: Arc>, mut source: UdpReceiver, @@ -129,33 +133,6 @@ async fn listen( join!(main_block, phase_transition_block); debug!("UDP listener process killed"); - - /*while let Some(packet) = source.next().await { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - warn!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue; - } - }; - match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue; - } - VoicePacket::Audio { - session_id, - // seq_num, - payload, - // position_info, - .. - } => { - state.lock().unwrap().audio().decode_packet(session_id, payload); - } - } - }*/ } async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { @@ -175,13 +152,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { } async fn send_voice( - state: Arc>, sink: Arc>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver, + receiver: &mut mpsc::Receiver, ) { - let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); - let (tx, rx) = oneshot::channel(); let phase_transition_block = async { while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} -- cgit v1.2.1 From 17b84132b72cb45785738270ba5982889d447222 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 14 Oct 2020 19:04:11 +0200 Subject: update server so that we clear information once we leave a server --- mumd/src/network/tcp.rs | 10 +++++----- mumd/src/state.rs | 17 +++++++++++------ 2 files changed, 16 insertions(+), 11 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index d45b49d..0a53266 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -247,7 +247,7 @@ async fn listen( ).await; } let mut state = state.lock().unwrap(); - let server = state.server_mut(); + let server = state.server_mut().unwrap(); server.parse_server_sync(msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), @@ -268,9 +268,9 @@ async fn listen( if *state.phase_receiver().borrow() == StatePhase::Connecting { state.parse_initial_user_state(msg); } else { - state.server_mut().parse_user_state(msg); + state.server_mut().unwrap().parse_user_state(msg); } - let server = state.server_mut(); + let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", user.name(), @@ -282,10 +282,10 @@ async fn listen( } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().parse_channel_state(msg); //TODO parse initial if initial + state.lock().unwrap().server_mut().unwrap().parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().parse_channel_remove(msg); + state.lock().unwrap().server_mut().unwrap().parse_channel_remove(msg); } _ => {} } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 016783b..8371be9 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -18,7 +18,7 @@ pub enum StatePhase { } pub struct State { - server: Server, + server: Option, audio: Audio, packet_sender: mpsc::UnboundedSender>, @@ -38,7 +38,7 @@ impl State { connection_info_sender: watch::Sender>, ) -> Self { Self { - server: Server::new(), + server: None, audio: Audio::new(), packet_sender, command_sender, @@ -68,13 +68,14 @@ impl State { warn!("Not connected"); return (false, Err(())); } - (false, Ok(Some(CommandResponse::ChannelList{channels: self.server.channels.clone()}))) + (false, Ok(Some(CommandResponse::ChannelList{channels: self.server.as_ref().unwrap().channels.clone()}))) } Command::ServerConnect{host, port, username, accept_invalid_cert} => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { warn!("Tried to connect to a server while already connected"); return (false, Err(())); } + self.server = Some(Server::new()); self.username = Some(username); self.phase_watcher.0.broadcast(StatePhase::Connecting).unwrap(); let socket_addr = (host.as_ref(), port) @@ -96,10 +97,14 @@ impl State { } (false, Ok(Some(CommandResponse::Status{ username: self.username.clone(), - server_state: self.server.clone(), + server_state: self.server.clone().unwrap(), }))) } Command::ServerDisconnect => { + self.session_id = None; + self.username = None; + self.server = None; + self.phase_watcher.0.broadcast(StatePhase::Disconnected).unwrap(); (false, Ok(None)) } @@ -132,7 +137,7 @@ impl State { } } } - self.server.parse_user_state(msg); + self.server.as_mut().unwrap().parse_user_state(msg); } pub fn initialized(&self) { @@ -143,7 +148,7 @@ impl State { pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } pub fn packet_sender(&self) -> mpsc::UnboundedSender> { self.packet_sender.clone() } pub fn phase_receiver(&self) -> watch::Receiver { self.phase_watcher.1.clone() } - pub fn server_mut(&mut self) -> &mut Server { &mut self.server } + pub fn server_mut(&mut self) -> Option<&mut Server> { self.server.as_mut() } pub fn username(&self) -> Option<&String> { self.username.as_ref() } } -- cgit v1.2.1 From af272afbcd9e0e283b88f37f2bf3d7b4da604321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:18:13 +0200 Subject: update debug message --- mumd/src/network/udp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 45e6e80..31e33e3 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -63,7 +63,7 @@ pub async fn connect( let crypt_state = match crypt_state.recv().await { Some(crypt_state) => crypt_state, // disconnected before we received the CryptSetup packet, oh well - None => panic!("disconnect before crypt packet received"), //TODO exit gracefully + None => panic!("Disconnect before crypt packet received"), //TODO exit gracefully }; debug!("UDP connected"); @@ -204,6 +204,6 @@ async fn send_voice( join!(main_block, phase_transition_block); - debug!("UDP listener process killed"); + debug!("UDP sender process killed"); } -- cgit v1.2.1 From 8f32d34f1cf31cfd10d07e623842dd3f7fc86e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:29:34 +0200 Subject: cargo fmt --- mumd/src/audio.rs | 49 +++++++++-------- mumd/src/command.rs | 4 +- mumd/src/main.rs | 48 ++++++++++------- mumd/src/network/mod.rs | 6 +-- mumd/src/network/tcp.rs | 82 ++++++++++++++++++++--------- mumd/src/network/udp.rs | 36 +++++++++---- mumd/src/state.rs | 136 +++++++++++++++++++++++++++++++----------------- 7 files changed, 234 insertions(+), 127 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index aa06a9d..58424b6 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -103,26 +103,32 @@ impl Audio { let input_stream = match input_supported_sample_format { SampleFormat::F32 => input_device.build_input_stream( &input_config, - input_callback::(input_encoder, - input_sender, - input_config.sample_rate.0, - 10.0), + input_callback::( + input_encoder, + input_sender, + input_config.sample_rate.0, + 10.0, + ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, - input_callback::(input_encoder, - input_sender, - input_config.sample_rate.0, - 10.0), + input_callback::( + input_encoder, + input_sender, + input_config.sample_rate.0, + 10.0, + ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, - input_callback::(input_encoder, - input_sender, - input_config.sample_rate.0, - 10.0), + input_callback::( + input_encoder, + input_sender, + input_config.sample_rate.0, + 10.0, + ), err_fn, ), } @@ -207,7 +213,8 @@ impl ClientStream { match payload { VoicePacketPayload::Opus(bytes, _eot) => { let mut out: Vec = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode - let parsed = self.opus_decoder + let parsed = self + .opus_decoder .decode_float(&bytes, &mut out, false) .expect("Error decoding"); out.truncate(parsed); @@ -271,15 +278,15 @@ fn input_callback( sample_rate: u32, opus_frame_size_ms: f32, ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { - if ! ( opus_frame_size_ms == 2.5 - || opus_frame_size_ms == 5.0 - || opus_frame_size_ms == 10.0 - || opus_frame_size_ms == 20.0) { + if !(opus_frame_size_ms == 2.5 + || opus_frame_size_ms == 5.0 + || opus_frame_size_ms == 10.0 + || opus_frame_size_ms == 20.0) + { panic!("Unsupported opus frame size {}", opus_frame_size_ms); } let opus_frame_size = (opus_frame_size_ms * sample_rate as f32) as u32 / 1000; - let buf = Arc::new(Mutex::new(VecDeque::new())); move |data: &[T], _info: &InputCallbackInfo| { let mut buf = buf.lock().unwrap(); @@ -293,9 +300,9 @@ fn input_callback( .unwrap(); opus_buf.truncate(result); let bytes = Bytes::copy_from_slice(&opus_buf); - match input_sender - .try_send(VoicePacketPayload::Opus(bytes, false)) { //TODO handle full buffer / disconnect - Ok(_) => {}, + match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) { + //TODO handle full buffer / disconnect + Ok(_) => {} Err(_e) => { //warn!("Error sending audio packet: {:?}", e); } diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 1104671..b4bd1b7 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,9 +1,9 @@ use crate::state::{Channel, Server, State, StatePhase}; +use log::*; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; -use log::*; #[derive(Clone, Debug)] pub enum Command { @@ -29,7 +29,7 @@ pub enum CommandResponse { Status { username: Option, server_state: Server, - } + }, } pub async fn handle( diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 6d435fa..797b71f 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -1,10 +1,10 @@ mod audio; -mod network; mod command; +mod network; mod state; -use crate::network::ConnectionInfo; use crate::command::{Command, CommandResponse}; +use crate::network::ConnectionInfo; use crate::state::State; use argparse::ArgumentParser; @@ -17,8 +17,8 @@ use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::Serverbound; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; use std::time::Duration; +use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() { @@ -31,20 +31,25 @@ async fn main() { //TODO runtime flag that disables color match record.level() { Level::Error => "ERROR".red(), - Level::Warn => "WARN ".yellow(), - Level::Info => "INFO ".normal(), + Level::Warn => "WARN ".yellow(), + Level::Info => "INFO ".normal(), Level::Debug => "DEBUG".green(), Level::Trace => "TRACE".normal(), }, record.file().unwrap(), record.line().unwrap(), - if message.chars().any(|e| e == '\n') { "\n" } else { " " }, + if message.chars().any(|e| e == '\n') { + "\n" + } else { + " " + }, message )) }) .level(log::LevelFilter::Debug) .chain(std::io::stderr()) - .apply().unwrap(); + .apply() + .unwrap(); // Handle command line arguments let mut server_host = "".to_string(); @@ -74,10 +79,16 @@ async fn main() { let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::(1); // crypt state should always be consumed before sending a new one let (packet_sender, packet_receiver) = mpsc::unbounded_channel::>(); let (command_sender, command_receiver) = mpsc::unbounded_channel::(); - let (command_response_sender, command_response_receiver) = mpsc::unbounded_channel::, ()>>(); - let (connection_info_sender, connection_info_receiver) = watch::channel::>(None); + let (command_response_sender, command_response_receiver) = + mpsc::unbounded_channel::, ()>>(); + let (connection_info_sender, connection_info_receiver) = + watch::channel::>(None); - let state = State::new(packet_sender, command_sender.clone(), connection_info_sender); + let state = State::new( + packet_sender, + command_sender.clone(), + connection_info_sender, + ); let state = Arc::new(Mutex::new(state)); // Run it @@ -93,18 +104,17 @@ async fn main() { connection_info_receiver.clone(), crypt_state_receiver, ), - command::handle( - state, - command_receiver, - command_response_sender, - ), + command::handle(state, command_receiver, command_response_sender,), send_commands( command_sender, - Command::ServerConnect{host: server_host, port: server_port, username: username.clone(), accept_invalid_cert} - ), - receive_command_responses( - command_response_receiver, + Command::ServerConnect { + host: server_host, + port: server_port, + username: username.clone(), + accept_invalid_cert + } ), + receive_command_responses(command_response_receiver,), ); } diff --git a/mumd/src/network/mod.rs b/mumd/src/network/mod.rs index 777faad..1a31ee2 100644 --- a/mumd/src/network/mod.rs +++ b/mumd/src/network/mod.rs @@ -11,11 +11,7 @@ pub struct ConnectionInfo { } impl ConnectionInfo { - pub fn new( - socket_addr: SocketAddr, - hostname: String, - accept_invalid_cert: bool, - ) -> Self { + pub fn new(socket_addr: SocketAddr, hostname: String, accept_invalid_cert: bool) -> Self { Self { socket_addr, hostname, diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index 0a53266..e096843 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -2,16 +2,16 @@ use crate::network::ConnectionInfo; use crate::state::{State, StatePhase}; use log::*; -use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::{Clientbound, Serverbound}; use std::convert::{Into, TryInto}; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -32,15 +32,21 @@ pub async fn handle( loop { let connection_info = loop { match connection_info_receiver.recv().await { - None => { return; } + None => { + return; + } Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } + Some(Some(connection_info)) => { + break connection_info; + } } }; - let (mut sink, stream) = connect(connection_info.socket_addr, - connection_info.hostname, - connection_info.accept_invalid_cert) - .await; + let (mut sink, stream) = connect( + connection_info.socket_addr, + connection_info.hostname, + connection_info.accept_invalid_cert, + ) + .await; // Handshake (omitting `Version` message for brevity) let state_lock = state.lock().unwrap(); @@ -53,7 +59,12 @@ pub async fn handle( join!( send_pings(packet_sender, 10, phase_watcher.clone()), - listen(Arc::clone(&state), stream, crypt_state_sender.clone(), phase_watcher.clone()), + listen( + Arc::clone(&state), + stream, + crypt_state_sender.clone(), + phase_watcher.clone() + ), send_packets(sink, &mut packet_receiver, phase_watcher), ); @@ -101,7 +112,10 @@ async fn send_pings( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -140,7 +154,10 @@ async fn send_packets( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -190,7 +207,10 @@ async fn listen( let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -240,11 +260,13 @@ async fn listen( ControlPacket::ServerSync(msg) => { info!("Logged in"); if let Some(mut sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ).await; + let _ = sender + .send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ) + .await; } let mut state = state.lock().unwrap(); let server = state.server_mut().unwrap(); @@ -272,20 +294,32 @@ async fn listen( } let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); - info!("User {} connected to {}", - user.name(), - user.channel()); + info!("User {} connected to {}", user.name(), user.channel()); } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - state.lock().unwrap().audio_mut().remove_client(msg.get_session()); + state + .lock() + .unwrap() + .audio_mut() + .remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - state.lock().unwrap().server_mut().unwrap().parse_channel_state(msg); //TODO parse initial if initial + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_state(msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { - state.lock().unwrap().server_mut().unwrap().parse_channel_remove(msg); + state + .lock() + .unwrap() + .server_mut() + .unwrap() + .parse_channel_remove(msg); } _ => {} } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 31e33e3..4f96c4c 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -3,7 +3,7 @@ use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::{join, select, pin_mut, SinkExt, StreamExt, FutureExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::{VoicePacket, VoicePacketPayload}; @@ -11,7 +11,7 @@ use mumble_protocol::Serverbound; use std::net::{Ipv6Addr, SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::{watch, oneshot, mpsc}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::udp::UdpFramed; type UdpSender = SplitSink, (VoicePacket, SocketAddr)>; @@ -27,9 +27,13 @@ pub async fn handle( loop { let connection_info = loop { match connection_info_receiver.recv().await { - None => { return; } + None => { + return; + } Some(None) => {} - Some(Some(connection_info)) => { break connection_info; } + Some(Some(connection_info)) => { + break connection_info; + } } }; let (mut sink, source) = connect(&mut crypt_state).await; @@ -44,7 +48,12 @@ pub async fn handle( let phase_watcher = state.lock().unwrap().phase_receiver(); join!( listen(Arc::clone(&state), source, phase_watcher.clone()), - send_voice(sink, connection_info.socket_addr, phase_watcher, &mut receiver), + send_voice( + sink, + connection_info.socket_addr, + phase_watcher, + &mut receiver + ), ); debug!("Fully disconnected UDP stream, waiting for new connection info"); @@ -78,7 +87,10 @@ async fn listen( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -122,7 +134,11 @@ async fn listen( // position_info, .. } => { - state.lock().unwrap().audio().decode_packet(session_id, payload); + state + .lock() + .unwrap() + .audio() + .decode_packet(session_id, payload); } } } @@ -159,7 +175,10 @@ async fn send_voice( ) { let (tx, rx) = oneshot::channel(); let phase_transition_block = async { - while !matches!(phase_watcher.recv().await.unwrap(), StatePhase::Disconnected) {} + while !matches!( + phase_watcher.recv().await.unwrap(), + StatePhase::Disconnected + ) {} tx.send(true).unwrap(); }; @@ -206,4 +225,3 @@ async fn send_voice( debug!("UDP sender process killed"); } - diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 8371be9..69a462d 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -1,12 +1,12 @@ -use log::*; use crate::audio::Audio; use crate::command::{Command, CommandResponse}; use crate::network::ConnectionInfo; +use log::*; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::Serverbound; -use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::net::ToSocketAddrs; use tokio::sync::{mpsc, watch}; @@ -50,9 +50,12 @@ impl State { } //TODO? move bool inside Result - pub async fn handle_command(&mut self, command: Command) -> (bool, Result, ()>) { + pub async fn handle_command( + &mut self, + command: Command, + ) -> (bool, Result, ()>) { match command { - Command::ChannelJoin{channel_id} => { + Command::ChannelJoin { channel_id } => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) { warn!("Not connected"); return (false, Err(())); @@ -68,26 +71,41 @@ impl State { warn!("Not connected"); return (false, Err(())); } - (false, Ok(Some(CommandResponse::ChannelList{channels: self.server.as_ref().unwrap().channels.clone()}))) + ( + false, + Ok(Some(CommandResponse::ChannelList { + channels: self.server.as_ref().unwrap().channels.clone(), + })), + ) } - Command::ServerConnect{host, port, username, accept_invalid_cert} => { + Command::ServerConnect { + host, + port, + username, + accept_invalid_cert, + } => { if !matches!(*self.phase_receiver().borrow(), StatePhase::Disconnected) { warn!("Tried to connect to a server while already connected"); return (false, Err(())); } self.server = Some(Server::new()); self.username = Some(username); - self.phase_watcher.0.broadcast(StatePhase::Connecting).unwrap(); + self.phase_watcher + .0 + .broadcast(StatePhase::Connecting) + .unwrap(); let socket_addr = (host.as_ref(), port) .to_socket_addrs() .expect("Failed to parse server address") .next() .expect("Failed to resolve server address"); - self.connection_info_sender.broadcast(Some(ConnectionInfo::new( - socket_addr, - host, - accept_invalid_cert, - ))).unwrap(); + self.connection_info_sender + .broadcast(Some(ConnectionInfo::new( + socket_addr, + host, + accept_invalid_cert, + ))) + .unwrap(); (true, Ok(None)) } Command::Status => { @@ -95,17 +113,23 @@ impl State { warn!("Not connected"); return (false, Err(())); } - (false, Ok(Some(CommandResponse::Status{ - username: self.username.clone(), - server_state: self.server.clone().unwrap(), - }))) + ( + false, + Ok(Some(CommandResponse::Status { + username: self.username.clone(), + server_state: self.server.clone().unwrap(), + })), + ) } Command::ServerDisconnect => { self.session_id = None; self.username = None; self.server = None; - self.phase_watcher.0.broadcast(StatePhase::Disconnected).unwrap(); + self.phase_watcher + .0 + .broadcast(StatePhase::Disconnected) + .unwrap(); (false, Ok(None)) } } @@ -127,9 +151,11 @@ impl State { } Some(session) => { if session != msg.get_session() { - error!("Got two different session IDs ({} and {}) for ourselves", + error!( + "Got two different session IDs ({} and {}) for ourselves", session, - msg.get_session()); + msg.get_session() + ); } else { debug!("Got our session ID twice"); } @@ -141,15 +167,30 @@ impl State { } pub fn initialized(&self) { - self.phase_watcher.0.broadcast(StatePhase::Connected).unwrap(); + self.phase_watcher + .0 + .broadcast(StatePhase::Connected) + .unwrap(); } - pub fn audio(&self) -> &Audio { &self.audio } - pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } - pub fn packet_sender(&self) -> mpsc::UnboundedSender> { self.packet_sender.clone() } - pub fn phase_receiver(&self) -> watch::Receiver { self.phase_watcher.1.clone() } - pub fn server_mut(&mut self) -> Option<&mut Server> { self.server.as_mut() } - pub fn username(&self) -> Option<&String> { self.username.as_ref() } + pub fn audio(&self) -> &Audio { + &self.audio + } + pub fn audio_mut(&mut self) -> &mut Audio { + &mut self.audio + } + pub fn packet_sender(&self) -> mpsc::UnboundedSender> { + self.packet_sender.clone() + } + pub fn phase_receiver(&self) -> watch::Receiver { + self.phase_watcher.1.clone() + } + pub fn server_mut(&mut self) -> Option<&mut Server> { + self.server.as_mut() + } + pub fn username(&self) -> Option<&String> { + self.username.as_ref() + } } #[derive(Clone, Debug)] @@ -180,7 +221,9 @@ impl Server { return; } match self.channels.entry(msg.get_channel_id()) { - Entry::Vacant(e) => { e.insert(Channel::new(msg)); }, + Entry::Vacant(e) => { + e.insert(Channel::new(msg)); + } Entry::Occupied(mut e) => e.get_mut().parse_channel_state(msg), } } @@ -191,8 +234,12 @@ impl Server { return; } match self.channels.entry(msg.get_channel_id()) { - Entry::Vacant(_) => { warn!("Attempted to remove channel that doesn't exist"); } - Entry::Occupied(e) => { e.remove(); } + Entry::Vacant(_) => { + warn!("Attempted to remove channel that doesn't exist"); + } + Entry::Occupied(e) => { + e.remove(); + } } } @@ -202,7 +249,9 @@ impl Server { return; } match self.users.entry(msg.get_session()) { - Entry::Vacant(e) => { e.insert(User::new(msg)); }, + Entry::Vacant(e) => { + e.insert(User::new(msg)); + } Entry::Occupied(mut e) => e.get_mut().parse_user_state(msg), } } @@ -279,11 +328,11 @@ pub struct User { priority_speaker: bool, recording: bool, - suppress: bool, // by me + suppress: bool, // by me self_mute: bool, // by self self_deaf: bool, // by self - mute: bool, // by admin - deaf: bool, // by admin + mute: bool, // by admin + deaf: bool, // by admin } impl User { @@ -301,20 +350,13 @@ impl User { None }, name: msg.take_name(), - priority_speaker: msg.has_priority_speaker() - && msg.get_priority_speaker(), - recording: msg.has_recording() - && msg.get_recording(), - suppress: msg.has_suppress() - && msg.get_suppress(), - self_mute: msg.has_self_mute() - && msg.get_self_mute(), - self_deaf: msg.has_self_deaf() - && msg.get_self_deaf(), - mute: msg.has_mute() - && msg.get_mute(), - deaf: msg.has_deaf() - && msg.get_deaf(), + priority_speaker: msg.has_priority_speaker() && msg.get_priority_speaker(), + recording: msg.has_recording() && msg.get_recording(), + suppress: msg.has_suppress() && msg.get_suppress(), + self_mute: msg.has_self_mute() && msg.get_self_mute(), + self_deaf: msg.has_self_deaf() && msg.get_self_deaf(), + mute: msg.has_mute() && msg.get_mute(), + deaf: msg.has_deaf() && msg.get_deaf(), } } -- cgit v1.2.1 From afd537e085ddf2c92fb1f1879a72d290010fa570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:42:28 +0200 Subject: cargo clippy --- mumd/src/audio.rs | 20 ++++++++++---------- mumd/src/main.rs | 8 ++++---- mumd/src/network/tcp.rs | 14 +++++++------- mumd/src/state.rs | 50 ++++++++++++++++++++++++------------------------- 4 files changed, 45 insertions(+), 47 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 58424b6..d8a37a8 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -107,7 +107,7 @@ impl Audio { input_encoder, input_sender, input_config.sample_rate.0, - 10.0, + 4, // 10 ms ), err_fn, ), @@ -117,7 +117,7 @@ impl Audio { input_encoder, input_sender, input_config.sample_rate.0, - 10.0, + 4, // 10 ms ), err_fn, ), @@ -127,7 +127,7 @@ impl Audio { input_encoder, input_sender, input_config.sample_rate.0, - 10.0, + 4, // 10 ms ), err_fn, ), @@ -276,16 +276,16 @@ fn input_callback( mut opus_encoder: opus::Encoder, mut input_sender: Sender, sample_rate: u32, - opus_frame_size_ms: f32, + opus_frame_size_blocks: u32, // blocks of 2.5ms ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { - if !(opus_frame_size_ms == 2.5 - || opus_frame_size_ms == 5.0 - || opus_frame_size_ms == 10.0 - || opus_frame_size_ms == 20.0) + if !(opus_frame_size_blocks == 1 + || opus_frame_size_blocks == 2 + || opus_frame_size_blocks == 4 + || opus_frame_size_blocks == 8) { - panic!("Unsupported opus frame size {}", opus_frame_size_ms); + panic!("Unsupported amount of opus frame blocks {}", opus_frame_size_blocks); } - let opus_frame_size = (opus_frame_size_ms * sample_rate as f32) as u32 / 1000; + let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; let buf = Arc::new(Mutex::new(VecDeque::new())); move |data: &[T], _info: &InputCallbackInfo| { diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 797b71f..f837a52 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -119,13 +119,13 @@ async fn main() { } async fn send_commands(command_sender: mpsc::UnboundedSender, connect_command: Command) { - command_sender.send(connect_command.clone()); + command_sender.send(connect_command.clone()).unwrap(); tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(Command::ServerDisconnect); + command_sender.send(Command::ServerDisconnect).unwrap(); tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(connect_command.clone()); + command_sender.send(connect_command.clone()).unwrap(); tokio::time::delay_for(Duration::from_secs(2)).await; - command_sender.send(Command::ServerDisconnect); + command_sender.send(Command::ServerDisconnect).unwrap(); debug!("Finished sending commands"); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index e096843..6a369e5 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -186,7 +186,7 @@ async fn send_packets( } //clears queue of remaining packets - while let Ok(_) = packet_receiver.try_recv() {} + while packet_receiver.try_recv().is_ok() {} sink.close().await.unwrap(); }; @@ -270,12 +270,12 @@ async fn listen( } let mut state = state.lock().unwrap(); let server = state.server_mut().unwrap(); - server.parse_server_sync(msg); + server.parse_server_sync(*msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), None => info!("No welcome received"), } - for (_, channel) in server.channels() { + for channel in server.channels().values() { info!("Found channel {}", channel.name()); } state.initialized(); @@ -288,9 +288,9 @@ async fn listen( let session = msg.get_session(); state.audio_mut().add_client(msg.get_session()); //TODO if *state.phase_receiver().borrow() == StatePhase::Connecting { - state.parse_initial_user_state(msg); + state.parse_initial_user_state(*msg); } else { - state.server_mut().unwrap().parse_user_state(msg); + state.server_mut().unwrap().parse_user_state(*msg); } let server = state.server_mut().unwrap(); let user = server.users().get(&session).unwrap(); @@ -311,7 +311,7 @@ async fn listen( .unwrap() .server_mut() .unwrap() - .parse_channel_state(msg); //TODO parse initial if initial + .parse_channel_state(*msg); //TODO parse initial if initial } ControlPacket::ChannelRemove(msg) => { state @@ -319,7 +319,7 @@ async fn listen( .unwrap() .server_mut() .unwrap() - .parse_channel_remove(msg); + .parse_channel_remove(*msg); } _ => {} } diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 69a462d..b6fe780 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -135,30 +135,28 @@ impl State { } } - pub fn parse_initial_user_state(&mut self, msg: Box) { + pub fn parse_initial_user_state(&mut self, msg: msgs::UserState) { if !msg.has_session() { warn!("Can't parse user state without session"); return; } if !msg.has_name() { warn!("Missing name in initial user state"); - } else { - if msg.get_name() == self.username.as_ref().unwrap() { - match self.session_id { - None => { - debug!("Found our session id: {}", msg.get_session()); - self.session_id = Some(msg.get_session()); - } - Some(session) => { - if session != msg.get_session() { - error!( - "Got two different session IDs ({} and {}) for ourselves", - session, - msg.get_session() - ); - } else { - debug!("Got our session ID twice"); - } + } else if msg.get_name() == self.username.as_ref().unwrap() { + match self.session_id { + None => { + debug!("Found our session id: {}", msg.get_session()); + self.session_id = Some(msg.get_session()); + } + Some(session) => { + if session != msg.get_session() { + error!( + "Got two different session IDs ({} and {}) for ourselves", + session, + msg.get_session() + ); + } else { + debug!("Got our session ID twice"); } } } @@ -209,13 +207,13 @@ impl Server { } } - pub fn parse_server_sync(&mut self, mut msg: Box) { + pub fn parse_server_sync(&mut self, mut msg: msgs::ServerSync) { if msg.has_welcome_text() { self.welcome_text = Some(msg.take_welcome_text()); } } - pub fn parse_channel_state(&mut self, msg: Box) { + pub fn parse_channel_state(&mut self, msg: msgs::ChannelState) { if !msg.has_channel_id() { warn!("Can't parse channel state without channel id"); return; @@ -228,7 +226,7 @@ impl Server { } } - pub fn parse_channel_remove(&mut self, msg: Box) { + pub fn parse_channel_remove(&mut self, msg: msgs::ChannelRemove) { if !msg.has_channel_id() { warn!("Can't parse channel remove without channel id"); return; @@ -243,7 +241,7 @@ impl Server { } } - pub fn parse_user_state(&mut self, msg: Box) { + pub fn parse_user_state(&mut self, msg: msgs::UserState) { if !msg.has_session() { warn!("Can't parse user state without session"); return; @@ -276,7 +274,7 @@ pub struct Channel { } impl Channel { - pub fn new(mut msg: Box) -> Self { + pub fn new(mut msg: msgs::ChannelState) -> Self { Self { description: if msg.has_description() { Some(msg.take_description()) @@ -295,7 +293,7 @@ impl Channel { } } - pub fn parse_channel_state(&mut self, mut msg: Box) { + pub fn parse_channel_state(&mut self, mut msg: msgs::ChannelState) { if msg.has_description() { self.description = Some(msg.take_description()); } @@ -336,7 +334,7 @@ pub struct User { } impl User { - pub fn new(mut msg: Box) -> Self { + pub fn new(mut msg: msgs::UserState) -> Self { Self { channel: msg.get_channel_id(), comment: if msg.has_comment() { @@ -360,7 +358,7 @@ impl User { } } - pub fn parse_user_state(&mut self, mut msg: Box) { + pub fn parse_user_state(&mut self, mut msg: msgs::UserState) { if msg.has_channel_id() { self.channel = msg.get_channel_id(); } -- cgit v1.2.1 From 6ac72067a75d5e1904226efb5c45bcf0e54a0ae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Wed, 14 Oct 2020 19:42:46 +0200 Subject: cargo fmt 2 eletric boogaloo --- mumd/src/audio.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'mumd/src') diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index d8a37a8..1445415 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -283,7 +283,10 @@ fn input_callback( || opus_frame_size_blocks == 4 || opus_frame_size_blocks == 8) { - panic!("Unsupported amount of opus frame blocks {}", opus_frame_size_blocks); + panic!( + "Unsupported amount of opus frame blocks {}", + opus_frame_size_blocks + ); } let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; -- cgit v1.2.1