aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src/network/tcp.rs')
-rw-r--r--mumd/src/network/tcp.rs113
1 files changed, 55 insertions, 58 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 47ea311..f767446 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -1,25 +1,28 @@
use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
+use futures::Stream;
use log::*;
-use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt};
+use futures::{join, SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::control::{msgs, ClientControlCodec, ControlCodec, ControlPacket};
use mumble_protocol::crypt::ClientCryptState;
+use mumble_protocol::voice::VoicePacket;
use mumble_protocol::{Clientbound, Serverbound};
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::{Into, TryInto};
-use std::future::Future;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
-use tokio::sync::{mpsc, oneshot, watch};
+use tokio::sync::{mpsc, watch};
use tokio::time::{self, Duration};
use tokio_native_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
+use super::{run_until, VoiceStreamType};
+
type TcpSender = SplitSink<
Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
ControlPacket<Serverbound>,
@@ -68,11 +71,13 @@ pub async fn handle(
let state_lock = state.lock().unwrap();
authenticate(&mut sink, state_lock.username().unwrap().to_string()).await;
let phase_watcher = state_lock.phase_receiver();
+ let input_receiver = state_lock.audio().input_receiver();
drop(state_lock);
let event_queue = Arc::new(Mutex::new(HashMap::new()));
info!("Logging in...");
+ //TODO force exit all futures on disconnection
join!(
send_pings(packet_sender.clone(), 10, phase_watcher.clone()),
listen(
@@ -82,6 +87,11 @@ pub async fn handle(
Arc::clone(&event_queue),
phase_watcher.clone(),
),
+ send_voice(
+ packet_sender.clone(),
+ Arc::clone(&input_receiver),
+ phase_watcher.clone(),
+ ),
send_packets(sink, &mut packet_receiver, phase_watcher.clone()),
register_events(&mut tcp_event_register_receiver, event_queue, phase_watcher),
);
@@ -133,7 +143,8 @@ async fn send_pings(
))));
let packet_sender = Rc::new(RefCell::new(packet_sender));
- run_until_disconnection(
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
|| async { Some(interval.borrow_mut().tick().await) },
|_| async {
trace!("Sending ping");
@@ -155,7 +166,8 @@ async fn send_packets(
) {
let sink = Rc::new(RefCell::new(sink));
let packet_receiver = Rc::new(RefCell::new(packet_receiver));
- run_until_disconnection(
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
|| async { packet_receiver.borrow_mut().recv().await },
|packet| async {
sink.borrow_mut().send(packet).await.unwrap();
@@ -170,6 +182,40 @@ async fn send_packets(
debug!("TCP packet sender killed");
}
+async fn send_voice(
+ packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
+ receiver: Arc<Mutex<Box<(dyn Stream<Item = VoicePacket<Serverbound>> + Unpin)>>>,
+ phase_watcher: watch::Receiver<StatePhase>,
+) {
+ let inner_phase_watcher = phase_watcher.clone();
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
+ || async {
+ run_until(
+ |phase| !matches!(phase, StatePhase::Connected(VoiceStreamType::TCP)),
+ || async {
+ packet_sender.send(receiver
+ .lock()
+ .unwrap()
+ .next()
+ .await
+ .unwrap()
+ .into())
+ .unwrap();
+ Some(Some(()))
+ },
+ |_| async {},
+ || async {},
+ inner_phase_watcher.clone(),
+ ).await;
+ Some(Some(()))
+ },
+ |_| async {},
+ || async {},
+ phase_watcher,
+ ).await;
+}
+
async fn listen(
state: Arc<Mutex<State>>,
stream: TcpReceiver,
@@ -181,7 +227,8 @@ async fn listen(
let crypt_state_sender = Rc::new(RefCell::new(Some(crypt_state_sender)));
let stream = Rc::new(RefCell::new(stream));
- run_until_disconnection(
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
|| async { stream.borrow_mut().next().await },
|packet| async {
match packet.unwrap() {
@@ -289,7 +336,8 @@ async fn register_events(
phase_watcher: watch::Receiver<StatePhase>,
) {
let tcp_event_register_receiver = Rc::new(RefCell::new(tcp_event_register_receiver));
- run_until_disconnection(
+ run_until(
+ |phase| matches!(phase, StatePhase::Disconnected),
|| async { tcp_event_register_receiver.borrow_mut().recv().await },
|(event, handler)| async {
event_data
@@ -304,54 +352,3 @@ async fn register_events(
)
.await;
}
-
-async fn run_until_disconnection<T, F, G, H>(
- mut generator: impl FnMut() -> F,
- mut handler: impl FnMut(T) -> G,
- mut shutdown: impl FnMut() -> H,
- mut phase_watcher: watch::Receiver<StatePhase>,
-) where
- F: Future<Output = Option<T>>,
- G: Future<Output = ()>,
- H: Future<Output = ()>,
-{
- let (tx, rx) = oneshot::channel();
- let phase_transition_block = async {
- loop {
- phase_watcher.changed().await.unwrap();
- if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
- break;
- }
- }
- tx.send(true).unwrap();
- };
-
- let main_block = async {
- let rx = rx.fuse();
- pin_mut!(rx);
- loop {
- let packet_recv = generator().fuse();
- pin_mut!(packet_recv);
- let exitor = select! {
- data = packet_recv => Some(data),
- _ = rx => None
- };
- match exitor {
- None => {
- break;
- }
- Some(None) => {
- //warn!("Channel closed before disconnect command"); //TODO make me informative
- break;
- }
- Some(Some(data)) => {
- handler(data).await;
- }
- }
- }
-
- shutdown().await;
- };
-
- join!(main_block, phase_transition_block);
-}