diff options
| -rw-r--r-- | mumd/src/audio.rs | 7 | ||||
| -rw-r--r-- | mumd/src/command.rs | 2 | ||||
| -rw-r--r-- | mumd/src/main.rs | 21 | ||||
| -rw-r--r-- | mumd/src/network/tcp.rs | 57 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 48 | ||||
| -rw-r--r-- | mumd/src/state.rs | 81 |
6 files changed, 151 insertions, 65 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 9b794a6..3c24f1c 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,6 +1,5 @@ use bytes::Bytes; -use cpal::traits::DeviceTrait; -use cpal::traits::HostTrait; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{ InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig, }; @@ -30,7 +29,7 @@ pub struct Audio { pub input_buffer: Arc<Mutex<VecDeque<f32>>>, input_channel_receiver: Option<Receiver<VoicePacketPayload>>, - client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>, + client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>, //TODO move to user state } //TODO split into input/output @@ -129,6 +128,8 @@ impl Audio { } .unwrap(); + output_stream.play().unwrap(); + Self { output_config, output_stream, diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 5d6cca4..322bde8 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -1,4 +1,4 @@ -enum Command { +pub enum Command { ChannelJoin { channel_id: u32, }, diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 2a0fcbd..a08db44 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -2,21 +2,22 @@ mod audio; mod network; mod command; mod state; -use crate::audio::Audio; -use crate::state::Server; +use crate::state::State; use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; use colored::*; -use cpal::traits::StreamTrait; use futures::channel::oneshot; use futures::join; use log::*; +use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; +use mumble_protocol::voice::Serverbound; use std::net::ToSocketAddrs; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::mpsc; #[tokio::main] async fn main() { @@ -73,28 +74,24 @@ async fn main() { // Oneshot channel for setting UDP CryptState from control task // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>(); + let (packet_sender, packet_receiver) = mpsc::channel::<ControlPacket<Serverbound>>(10); - let audio = Audio::new(); - audio.output_stream.play().unwrap(); - let audio = Arc::new(Mutex::new(audio)); - - let server_state = Arc::new(Mutex::new(Server::new())); + let state = Arc::new(Mutex::new(State::new(packet_sender, username))); // Run it join!( network::tcp::handle( - server_state, + Arc::clone(&state), server_addr, server_host, - username, accept_invalid_cert, crypt_state_sender, - Arc::clone(&audio), + packet_receiver, ), network::udp::handle( + state, server_addr, crypt_state_receiver, - audio, ), ); } diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index dde98aa..fa4c4b6 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,5 +1,5 @@ -use crate::audio::Audio; -use crate::state::Server; +use crate::state::State; +use crate::command::Command; use log::*; use futures::channel::oneshot; @@ -12,6 +12,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -24,25 +25,26 @@ type TcpReceiver = SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>; pub async fn handle( - server: Arc<Mutex<Server>>, + state: Arc<Mutex<State>>, server_addr: SocketAddr, server_host: String, - username: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender<ClientCryptState>, - audio: Arc<Mutex<Audio>>, + packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>, ) { let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); + // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), username).await; + authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await; info!("Logging in..."); join!( send_pings(Arc::clone(&sink), 10), - listen(server, sink, stream, crypt_state_sender, audio), + listen(state, stream, crypt_state_sender), + send_packets(sink, packet_receiver), ); } @@ -72,6 +74,7 @@ async fn connect( ClientControlCodec::new().framed(tls_stream).split() } +//TODO &mut sink? async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) { let mut msg = msgs::Authenticate::new(); msg.set_username(username); @@ -79,6 +82,7 @@ async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) { sink.lock().unwrap().send(msg.into()).await.unwrap(); } +//TODO move somewhere else (main?) and send through packet_sender async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) { let mut interval = time::interval(Duration::from_secs(delay_seconds)); loop { @@ -89,12 +93,18 @@ async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) { } } +async fn send_packets(sink: Arc<Mutex<TcpSender>>, + mut packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>) { + + while let Some(packet) = packet_receiver.recv().await { + sink.lock().unwrap().send(packet).await.unwrap(); + } +} + async fn listen( - server: Arc<Mutex<Server>>, - sink: Arc<Mutex<TcpSender>>, + state: Arc<Mutex<State>>, mut stream: TcpReceiver, crypt_state_sender: oneshot::Sender<ClientCryptState>, - audio: Arc<Mutex<Audio>>, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -102,18 +112,12 @@ async fn listen( while let Some(packet) = stream.next().await { //TODO handle types separately match packet.unwrap() { - ControlPacket::TextMessage(mut msg) => { + ControlPacket::TextMessage(msg) => { info!( "Got message from user with session ID {}: {}", msg.get_actor(), msg.get_message() ); - // Send reply back to server - let mut response = msgs::TextMessage::new(); - response.mut_session().push(msg.get_actor()); - response.set_message(msg.take_message()); - let mut lock = sink.lock().unwrap(); - lock.send(response.into()).await.unwrap(); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); @@ -139,7 +143,8 @@ async fn listen( .expect("Server didn't send us any CryptSetup packet!"), ); } - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); + let server = state.server_mut(); server.parse_server_sync(msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), @@ -148,16 +153,18 @@ async fn listen( for (_, channel) in server.channels() { info!("Found channel {}", channel.name()); } - sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap(); + //TODO start listening for packets to send here + state.handle_command(Command::ChannelJoin{channel_id: 1}).await; } ControlPacket::Reject(msg) => { warn!("Login rejected: {:?}", msg); } ControlPacket::UserState(msg) => { - audio.lock().unwrap().add_client(msg.get_session()); - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); let session = msg.get_session(); - server.parse_user_state(msg); + state.audio_mut().add_client(msg.get_session()); //TODO + state.parse_initial_user_state(msg); //TODO only if actually initiating state + let server = state.server_mut(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", user.name(), @@ -165,14 +172,14 @@ async fn listen( } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - audio.lock().unwrap().remove_client(msg.get_session()); + state.lock().unwrap().audio_mut().remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - server.lock().unwrap().parse_channel_state(msg); + state.lock().unwrap().server_mut().parse_channel_state(msg); } ControlPacket::ChannelRemove(msg) => { - server.lock().unwrap().parse_channel_remove(msg); + state.lock().unwrap().server_mut().parse_channel_remove(msg); } _ => {} } diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index 39f16b6..5f76501 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -1,4 +1,4 @@ -use crate::audio::Audio; +use crate::state::State; use log::*; use bytes::Bytes; @@ -36,10 +36,28 @@ pub async fn connect( UdpFramed::new(udp_socket, crypt_state).split() } +pub async fn handle( + state: Arc<Mutex<State>>, + server_addr: SocketAddr, + crypt_state: oneshot::Receiver<ClientCryptState>, +) { + let (mut sink, source) = connect(crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping(&mut sink, server_addr).await; + + let sink = Arc::new(Mutex::new(sink)); + join!( + listen(Arc::clone(&state), source), + send_voice(state, sink, server_addr) + ); +} + async fn listen( - _sink: Arc<Mutex<UdpSender>>, + state: Arc<Mutex<State>>, mut source: UdpReceiver, - audio: Arc<Mutex<Audio>>, ) { while let Some(packet) = source.next().await { let (packet, _src_addr) = match packet { @@ -63,7 +81,7 @@ async fn listen( // position_info, .. } => { - audio.lock().unwrap().decode_packet(session_id, payload); + state.lock().unwrap().audio().decode_packet(session_id, payload); } } } @@ -86,11 +104,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) { } async fn send_voice( + state: Arc<Mutex<State>>, sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, - audio: Arc<Mutex<Audio>>, ) { - let mut receiver = audio.lock().unwrap().take_receiver().unwrap(); + let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); let mut count = 0; while let Some(payload) = receiver.recv().await { @@ -111,21 +129,3 @@ async fn send_voice( } } -pub async fn handle( - server_addr: SocketAddr, - crypt_state: oneshot::Receiver<ClientCryptState>, - audio: Arc<Mutex<Audio>>, -) { - let (mut sink, source) = connect(crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - send_ping(&mut sink, server_addr).await; - - let sink = Arc::new(Mutex::new(sink)); - join!( - listen(Arc::clone(&sink), source, Arc::clone(&audio)), - send_voice(sink, server_addr, audio) - ); -} diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 1ef8467..566adaf 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -1,7 +1,88 @@ use log::*; +use crate::audio::Audio; +use crate::command::Command; use mumble_protocol::control::msgs; +use mumble_protocol::control::ControlPacket; +use mumble_protocol::voice::Serverbound; use std::collections::HashMap; use std::collections::hash_map::Entry; +use tokio::sync::mpsc; + +pub struct State { + server: Server, + audio: Audio, + + packet_sender: mpsc::Sender<ControlPacket<Serverbound>>, + + username: String, + session_id: Option<u32>, //FIXME set +} + +impl State { + pub fn new(packet_sender: mpsc::Sender<ControlPacket<Serverbound>>, + username: String) -> Self { + Self { + server: Server::new(), + audio: Audio::new(), + packet_sender, + username, + session_id: None, + } + } + + //TODO result + pub async fn handle_command(&mut self, command: Command) { + match command { + Command::ChannelJoin{channel_id} => { + if self.session_id.is_none() { + warn!("Tried to join channel but we don't have a session id"); + return; + } + let mut msg = msgs::UserState::new(); + msg.set_session(self.session_id.unwrap()); + msg.set_channel_id(channel_id); + self.packet_sender.send(msg.into()).await.unwrap(); + } + _ => {} + } + } + + pub fn parse_initial_user_state(&mut self, msg: Box<msgs::UserState>) { + if !msg.has_session() { + warn!("Can't parse user state without session"); + return; + } + if !msg.has_name() { + warn!("Missing name in initial user state"); + } else { + if msg.get_name() == self.username { + match self.session_id { + None => { + debug!("Found our session id: {}", msg.get_session()); + self.session_id = Some(msg.get_session()); + } + Some(session) => { + if session != msg.get_session() { + error!("Got two different session IDs ({} and {}) for ourselves", + session, + msg.get_session()); + } else { + debug!("Got our session ID twice"); + } + } + } + } + } + self.server.parse_user_state(msg); + } + + pub fn audio(&self) -> &Audio { &self.audio } + pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio } + + pub fn username(&self) -> &str { &self.username } + + pub fn server_mut(&mut self) -> &mut Server { &mut self.server } +} pub struct Server { channels: HashMap<u32, Channel>, |
