diff options
Diffstat (limited to 'mumd/src/network/tcp.rs')
| -rw-r--r-- | mumd/src/network/tcp.rs | 57 |
1 files changed, 32 insertions, 25 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs index dde98aa..fa4c4b6 100644 --- a/mumd/src/network/tcp.rs +++ b/mumd/src/network/tcp.rs @@ -1,5 +1,5 @@ -use crate::audio::Audio; -use crate::state::Server; +use crate::state::State; +use crate::command::Command; use log::*; use futures::channel::oneshot; @@ -12,6 +12,7 @@ use std::convert::{Into, TryInto}; use std::net::{SocketAddr}; use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio::time::{self, Duration}; use tokio_tls::{TlsConnector, TlsStream}; use tokio_util::codec::{Decoder, Framed}; @@ -24,25 +25,26 @@ type TcpReceiver = SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>; pub async fn handle( - server: Arc<Mutex<Server>>, + state: Arc<Mutex<State>>, server_addr: SocketAddr, server_host: String, - username: String, accept_invalid_cert: bool, crypt_state_sender: oneshot::Sender<ClientCryptState>, - audio: Arc<Mutex<Audio>>, + packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>, ) { let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await; let sink = Arc::new(Mutex::new(sink)); + // Handshake (omitting `Version` message for brevity) - authenticate(Arc::clone(&sink), username).await; + authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await; info!("Logging in..."); join!( send_pings(Arc::clone(&sink), 10), - listen(server, sink, stream, crypt_state_sender, audio), + listen(state, stream, crypt_state_sender), + send_packets(sink, packet_receiver), ); } @@ -72,6 +74,7 @@ async fn connect( ClientControlCodec::new().framed(tls_stream).split() } +//TODO &mut sink? async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) { let mut msg = msgs::Authenticate::new(); msg.set_username(username); @@ -79,6 +82,7 @@ async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) { sink.lock().unwrap().send(msg.into()).await.unwrap(); } +//TODO move somewhere else (main?) and send through packet_sender async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) { let mut interval = time::interval(Duration::from_secs(delay_seconds)); loop { @@ -89,12 +93,18 @@ async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) { } } +async fn send_packets(sink: Arc<Mutex<TcpSender>>, + mut packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>) { + + while let Some(packet) = packet_receiver.recv().await { + sink.lock().unwrap().send(packet).await.unwrap(); + } +} + async fn listen( - server: Arc<Mutex<Server>>, - sink: Arc<Mutex<TcpSender>>, + state: Arc<Mutex<State>>, mut stream: TcpReceiver, crypt_state_sender: oneshot::Sender<ClientCryptState>, - audio: Arc<Mutex<Audio>>, ) { let mut crypt_state = None; let mut crypt_state_sender = Some(crypt_state_sender); @@ -102,18 +112,12 @@ async fn listen( while let Some(packet) = stream.next().await { //TODO handle types separately match packet.unwrap() { - ControlPacket::TextMessage(mut msg) => { + ControlPacket::TextMessage(msg) => { info!( "Got message from user with session ID {}: {}", msg.get_actor(), msg.get_message() ); - // Send reply back to server - let mut response = msgs::TextMessage::new(); - response.mut_session().push(msg.get_actor()); - response.set_message(msg.take_message()); - let mut lock = sink.lock().unwrap(); - lock.send(response.into()).await.unwrap(); } ControlPacket::CryptSetup(msg) => { debug!("Crypt setup"); @@ -139,7 +143,8 @@ async fn listen( .expect("Server didn't send us any CryptSetup packet!"), ); } - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); + let server = state.server_mut(); server.parse_server_sync(msg); match &server.welcome_text { Some(s) => info!("Welcome: {}", s), @@ -148,16 +153,18 @@ async fn listen( for (_, channel) in server.channels() { info!("Found channel {}", channel.name()); } - sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap(); + //TODO start listening for packets to send here + state.handle_command(Command::ChannelJoin{channel_id: 1}).await; } ControlPacket::Reject(msg) => { warn!("Login rejected: {:?}", msg); } ControlPacket::UserState(msg) => { - audio.lock().unwrap().add_client(msg.get_session()); - let mut server = server.lock().unwrap(); + let mut state = state.lock().unwrap(); let session = msg.get_session(); - server.parse_user_state(msg); + state.audio_mut().add_client(msg.get_session()); //TODO + state.parse_initial_user_state(msg); //TODO only if actually initiating state + let server = state.server_mut(); let user = server.users().get(&session).unwrap(); info!("User {} connected to {}", user.name(), @@ -165,14 +172,14 @@ async fn listen( } ControlPacket::UserRemove(msg) => { info!("User {} left", msg.get_session()); - audio.lock().unwrap().remove_client(msg.get_session()); + state.lock().unwrap().audio_mut().remove_client(msg.get_session()); } ControlPacket::ChannelState(msg) => { debug!("Channel state received"); - server.lock().unwrap().parse_channel_state(msg); + state.lock().unwrap().server_mut().parse_channel_state(msg); } ControlPacket::ChannelRemove(msg) => { - server.lock().unwrap().parse_channel_remove(msg); + state.lock().unwrap().server_mut().parse_channel_remove(msg); } _ => {} } |
