aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/tcp.rs
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-13 00:40:38 +0200
committerGitHub <noreply@github.com>2020-10-13 00:40:38 +0200
commit63c2b18a558dc9c22be61c0068fd2ae62b188dea (patch)
treeb29faeac040627d851627b5ef547d102fade5bee /mumd/src/network/tcp.rs
parent57c258398831b180d5678be526854bc43f521e42 (diff)
downloadmum-63c2b18a558dc9c22be61c0068fd2ae62b188dea.tar.gz
split network.rs to tcp.rs and udp.rs (#4)
Diffstat (limited to 'mumd/src/network/tcp.rs')
-rw-r--r--mumd/src/network/tcp.rs180
1 files changed, 180 insertions, 0 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
new file mode 100644
index 0000000..dde98aa
--- /dev/null
+++ b/mumd/src/network/tcp.rs
@@ -0,0 +1,180 @@
+use crate::audio::Audio;
+use crate::state::Server;
+use log::*;
+
+use futures::channel::oneshot;
+use futures::{join, SinkExt, StreamExt};
+use futures_util::stream::{SplitSink, SplitStream};
+use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
+use mumble_protocol::crypt::ClientCryptState;
+use mumble_protocol::{Clientbound, Serverbound};
+use std::convert::{Into, TryInto};
+use std::net::{SocketAddr};
+use std::sync::{Arc, Mutex};
+use tokio::net::TcpStream;
+use tokio::time::{self, Duration};
+use tokio_tls::{TlsConnector, TlsStream};
+use tokio_util::codec::{Decoder, Framed};
+
+type TcpSender = SplitSink<
+ Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
+ ControlPacket<Serverbound>,
+>;
+type TcpReceiver =
+ SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>;
+
+pub async fn handle(
+ server: Arc<Mutex<Server>>,
+ server_addr: SocketAddr,
+ server_host: String,
+ username: String,
+ accept_invalid_cert: bool,
+ crypt_state_sender: oneshot::Sender<ClientCryptState>,
+ audio: Arc<Mutex<Audio>>,
+) {
+ let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await;
+ let sink = Arc::new(Mutex::new(sink));
+
+ // Handshake (omitting `Version` message for brevity)
+ authenticate(Arc::clone(&sink), username).await;
+
+ info!("Logging in...");
+
+ join!(
+ send_pings(Arc::clone(&sink), 10),
+ listen(server, sink, stream, crypt_state_sender, audio),
+ );
+}
+
+async fn connect(
+ server_addr: SocketAddr,
+ server_host: String,
+ accept_invalid_cert: bool,
+) -> (TcpSender, TcpReceiver) {
+ let stream = TcpStream::connect(&server_addr)
+ .await
+ .expect("failed to connect to server:");
+ debug!("TCP connected");
+
+ let mut builder = native_tls::TlsConnector::builder();
+ builder.danger_accept_invalid_certs(accept_invalid_cert);
+ let connector: TlsConnector = builder
+ .build()
+ .expect("failed to create TLS connector")
+ .into();
+ let tls_stream = connector
+ .connect(&server_host, stream)
+ .await
+ .expect("failed to connect TLS: {}");
+ debug!("TLS connected");
+
+ // Wrap the TLS stream with Mumble's client-side control-channel codec
+ ClientControlCodec::new().framed(tls_stream).split()
+}
+
+async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
+ let mut msg = msgs::Authenticate::new();
+ msg.set_username(username);
+ msg.set_opus(true);
+ sink.lock().unwrap().send(msg.into()).await.unwrap();
+}
+
+async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
+ let mut interval = time::interval(Duration::from_secs(delay_seconds));
+ loop {
+ interval.tick().await;
+ trace!("Sending ping");
+ let msg = msgs::Ping::new();
+ sink.lock().unwrap().send(msg.into()).await.unwrap();
+ }
+}
+
+async fn listen(
+ server: Arc<Mutex<Server>>,
+ sink: Arc<Mutex<TcpSender>>,
+ mut stream: TcpReceiver,
+ crypt_state_sender: oneshot::Sender<ClientCryptState>,
+ audio: Arc<Mutex<Audio>>,
+) {
+ let mut crypt_state = None;
+ let mut crypt_state_sender = Some(crypt_state_sender);
+
+ while let Some(packet) = stream.next().await {
+ //TODO handle types separately
+ match packet.unwrap() {
+ ControlPacket::TextMessage(mut msg) => {
+ info!(
+ "Got message from user with session ID {}: {}",
+ msg.get_actor(),
+ msg.get_message()
+ );
+ // Send reply back to server
+ let mut response = msgs::TextMessage::new();
+ response.mut_session().push(msg.get_actor());
+ response.set_message(msg.take_message());
+ let mut lock = sink.lock().unwrap();
+ lock.send(response.into()).await.unwrap();
+ }
+ ControlPacket::CryptSetup(msg) => {
+ debug!("Crypt setup");
+ // Wait until we're fully connected before initiating UDP voice
+ crypt_state = Some(ClientCryptState::new_from(
+ msg.get_key()
+ .try_into()
+ .expect("Server sent private key with incorrect size"),
+ msg.get_client_nonce()
+ .try_into()
+ .expect("Server sent client_nonce with incorrect size"),
+ msg.get_server_nonce()
+ .try_into()
+ .expect("Server sent server_nonce with incorrect size"),
+ ));
+ }
+ ControlPacket::ServerSync(msg) => {
+ info!("Logged in");
+ if let Some(sender) = crypt_state_sender.take() {
+ let _ = sender.send(
+ crypt_state
+ .take()
+ .expect("Server didn't send us any CryptSetup packet!"),
+ );
+ }
+ let mut server = server.lock().unwrap();
+ server.parse_server_sync(msg);
+ match &server.welcome_text {
+ Some(s) => info!("Welcome: {}", s),
+ None => info!("No welcome received"),
+ }
+ for (_, channel) in server.channels() {
+ info!("Found channel {}", channel.name());
+ }
+ sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap();
+ }
+ ControlPacket::Reject(msg) => {
+ warn!("Login rejected: {:?}", msg);
+ }
+ ControlPacket::UserState(msg) => {
+ audio.lock().unwrap().add_client(msg.get_session());
+ let mut server = server.lock().unwrap();
+ let session = msg.get_session();
+ server.parse_user_state(msg);
+ let user = server.users().get(&session).unwrap();
+ info!("User {} connected to {}",
+ user.name(),
+ user.channel());
+ }
+ ControlPacket::UserRemove(msg) => {
+ info!("User {} left", msg.get_session());
+ audio.lock().unwrap().remove_client(msg.get_session());
+ }
+ ControlPacket::ChannelState(msg) => {
+ debug!("Channel state received");
+ server.lock().unwrap().parse_channel_state(msg);
+ }
+ ControlPacket::ChannelRemove(msg) => {
+ server.lock().unwrap().parse_channel_remove(msg);
+ }
+ _ => {}
+ }
+ }
+}