aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/tcp.rs
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-13 02:31:09 +0200
committerGustav Sörnäs <gustav@sornas.net>2020-10-13 02:31:35 +0200
commitb583f6dbe521e01e879e16605026997dfa10c3d3 (patch)
treef318fbf00a47a30787b891094a522f313fd1d00b /mumd/src/network/tcp.rs
parent39c1ff5be55ade710bbe0fbe4701a070dadbb8e7 (diff)
downloadmum-b583f6dbe521e01e879e16605026997dfa10c3d3.tar.gz
join different channels
Co-authored-by: Eskil Queseth <eskilq@kth.se>
Diffstat (limited to 'mumd/src/network/tcp.rs')
-rw-r--r--mumd/src/network/tcp.rs57
1 files changed, 32 insertions, 25 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index dde98aa..fa4c4b6 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -1,5 +1,5 @@
-use crate::audio::Audio;
-use crate::state::Server;
+use crate::state::State;
+use crate::command::Command;
use log::*;
use futures::channel::oneshot;
@@ -12,6 +12,7 @@ use std::convert::{Into, TryInto};
use std::net::{SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
+use tokio::sync::mpsc;
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
@@ -24,25 +25,26 @@ type TcpReceiver =
SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>;
pub async fn handle(
- server: Arc<Mutex<Server>>,
+ state: Arc<Mutex<State>>,
server_addr: SocketAddr,
server_host: String,
- username: String,
accept_invalid_cert: bool,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
- audio: Arc<Mutex<Audio>>,
+ packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>,
) {
let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await;
let sink = Arc::new(Mutex::new(sink));
+
// Handshake (omitting `Version` message for brevity)
- authenticate(Arc::clone(&sink), username).await;
+ authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await;
info!("Logging in...");
join!(
send_pings(Arc::clone(&sink), 10),
- listen(server, sink, stream, crypt_state_sender, audio),
+ listen(state, stream, crypt_state_sender),
+ send_packets(sink, packet_receiver),
);
}
@@ -72,6 +74,7 @@ async fn connect(
ClientControlCodec::new().framed(tls_stream).split()
}
+//TODO &mut sink?
async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
@@ -79,6 +82,7 @@ async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
sink.lock().unwrap().send(msg.into()).await.unwrap();
}
+//TODO move somewhere else (main?) and send through packet_sender
async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
let mut interval = time::interval(Duration::from_secs(delay_seconds));
loop {
@@ -89,12 +93,18 @@ async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
}
}
+async fn send_packets(sink: Arc<Mutex<TcpSender>>,
+ mut packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>) {
+
+ while let Some(packet) = packet_receiver.recv().await {
+ sink.lock().unwrap().send(packet).await.unwrap();
+ }
+}
+
async fn listen(
- server: Arc<Mutex<Server>>,
- sink: Arc<Mutex<TcpSender>>,
+ state: Arc<Mutex<State>>,
mut stream: TcpReceiver,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
- audio: Arc<Mutex<Audio>>,
) {
let mut crypt_state = None;
let mut crypt_state_sender = Some(crypt_state_sender);
@@ -102,18 +112,12 @@ async fn listen(
while let Some(packet) = stream.next().await {
//TODO handle types separately
match packet.unwrap() {
- ControlPacket::TextMessage(mut msg) => {
+ ControlPacket::TextMessage(msg) => {
info!(
"Got message from user with session ID {}: {}",
msg.get_actor(),
msg.get_message()
);
- // Send reply back to server
- let mut response = msgs::TextMessage::new();
- response.mut_session().push(msg.get_actor());
- response.set_message(msg.take_message());
- let mut lock = sink.lock().unwrap();
- lock.send(response.into()).await.unwrap();
}
ControlPacket::CryptSetup(msg) => {
debug!("Crypt setup");
@@ -139,7 +143,8 @@ async fn listen(
.expect("Server didn't send us any CryptSetup packet!"),
);
}
- let mut server = server.lock().unwrap();
+ let mut state = state.lock().unwrap();
+ let server = state.server_mut();
server.parse_server_sync(msg);
match &server.welcome_text {
Some(s) => info!("Welcome: {}", s),
@@ -148,16 +153,18 @@ async fn listen(
for (_, channel) in server.channels() {
info!("Found channel {}", channel.name());
}
- sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap();
+ //TODO start listening for packets to send here
+ state.handle_command(Command::ChannelJoin{channel_id: 1}).await;
}
ControlPacket::Reject(msg) => {
warn!("Login rejected: {:?}", msg);
}
ControlPacket::UserState(msg) => {
- audio.lock().unwrap().add_client(msg.get_session());
- let mut server = server.lock().unwrap();
+ let mut state = state.lock().unwrap();
let session = msg.get_session();
- server.parse_user_state(msg);
+ state.audio_mut().add_client(msg.get_session()); //TODO
+ state.parse_initial_user_state(msg); //TODO only if actually initiating state
+ let server = state.server_mut();
let user = server.users().get(&session).unwrap();
info!("User {} connected to {}",
user.name(),
@@ -165,14 +172,14 @@ async fn listen(
}
ControlPacket::UserRemove(msg) => {
info!("User {} left", msg.get_session());
- audio.lock().unwrap().remove_client(msg.get_session());
+ state.lock().unwrap().audio_mut().remove_client(msg.get_session());
}
ControlPacket::ChannelState(msg) => {
debug!("Channel state received");
- server.lock().unwrap().parse_channel_state(msg);
+ state.lock().unwrap().server_mut().parse_channel_state(msg);
}
ControlPacket::ChannelRemove(msg) => {
- server.lock().unwrap().parse_channel_remove(msg);
+ state.lock().unwrap().server_mut().parse_channel_remove(msg);
}
_ => {}
}