aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network
diff options
context:
space:
mode:
authorEskil Q <eskilq@kth.se>2020-12-25 15:24:19 +0100
committerEskil Q <eskilq@kth.se>2020-12-25 15:24:19 +0100
commit48ce14064d355ad0ed89e59b1d4b10256c85be6a (patch)
tree20a1d9010d424010e78173c76ca73e648be6113d /mumd/src/network
parent58947a7a3acaa1ae04887723643a49db76479f00 (diff)
downloadmum-48ce14064d355ad0ed89e59b1d4b10256c85be6a.tar.gz
initial tokio 1.0 commit
Diffstat (limited to 'mumd/src/network')
-rw-r--r--mumd/src/network/tcp.rs41
-rw-r--r--mumd/src/network/udp.rs60
2 files changed, 66 insertions, 35 deletions
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index b1743c8..9024ef3 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -17,8 +17,9 @@ use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{self, Duration};
-use tokio_tls::{TlsConnector, TlsStream};
+use tokio_native_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
+// use tokio_util::codec::decoder::Decoder;
type TcpSender = SplitSink<
Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>,
@@ -48,16 +49,22 @@ pub async fn handle(
mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>,
) {
loop {
- let connection_info = loop {
- match connection_info_receiver.recv().await {
- None => {
- return;
- }
- Some(None) => {}
- Some(Some(connection_info)) => {
- break connection_info;
+ let connection_info = 'data: loop {
+ while let Ok(()) = connection_info_receiver.changed().await {
+ if let Some(data) = connection_info_receiver.borrow().clone() {
+ break 'data data;
}
}
+ return;
+ // match connection_info_receiver.changed().await {
+ // None => {
+ // return;
+ // }
+ // Some(None) => {}
+ // Some(Some(connection_info)) => {
+ // break connection_info;
+ // }
+ // }
};
let (mut sink, stream) = connect(
connection_info.socket_addr,
@@ -165,7 +172,7 @@ async fn send_packets(
},
|| async {
//clears queue of remaining packets
- while packet_receiver.borrow_mut().try_recv().is_ok() {}
+ // while packet_receiver.borrow_mut().try_recv().is_ok() {}
sink.borrow_mut().close().await.unwrap();
},
@@ -323,10 +330,16 @@ async fn run_until_disconnection<T, F, G, H>(
{
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(
- phase_watcher.recv().await.unwrap(),
- StatePhase::Disconnected
- ) {}
+ loop {
+ phase_watcher.changed().await.unwrap();
+ if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
+ break;
+ }
+ }
+ // while !matches!(
+ // phase_watcher.recv().await.unwrap(),
+ // StatePhase::Disconnected
+ // ) {}
tx.send(true).unwrap();
};
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index b1c202a..55a47a3 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -29,16 +29,22 @@ pub async fn handle(
let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap();
loop {
- let connection_info = loop {
- match connection_info_receiver.recv().await {
- None => {
- return;
- }
- Some(None) => {}
- Some(Some(connection_info)) => {
- break connection_info;
+ let connection_info = 'data: loop {
+ while let Ok(()) = connection_info_receiver.changed().await {
+ if let Some(data) = connection_info_receiver.borrow().clone() {
+ break 'data data;
}
}
+ return;
+ // match connection_info_receiver.recv().await {
+ // None => {
+ // return;
+ // }
+ // Some(None) => {}
+ // Some(Some(connection_info)) => {
+ // break connection_info;
+ // }
+ // }
};
let (mut sink, source) = connect(&mut crypt_state_receiver).await;
@@ -114,10 +120,16 @@ async fn listen(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(
- phase_watcher.recv().await.unwrap(),
- StatePhase::Disconnected
- ) {}
+ loop {
+ phase_watcher.changed().await.unwrap();
+ if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
+ break;
+ }
+ }
+ // while !matches!(
+ // phase_watcher.recv().await.unwrap(),
+ // StatePhase::Disconnected
+ // ) {}
tx.send(true).unwrap();
};
@@ -203,10 +215,16 @@ async fn send_voice(
) {
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
- while !matches!(
- phase_watcher.recv().await.unwrap(),
- StatePhase::Disconnected
- ) {}
+ loop {
+ phase_watcher.changed().await.unwrap();
+ if matches!(*phase_watcher.borrow(), StatePhase::Disconnected) {
+ break;
+ }
+ }
+ // while !matches!(
+ // phase_watcher.recv().await.unwrap(),
+ // StatePhase::Disconnected
+ // ) {}
tx.send(true).unwrap();
};
@@ -261,11 +279,11 @@ pub async fn handle_pings(
Box<dyn FnOnce(PongPacket)>,
)>,
) {
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
+ let udp_socket = Arc::new(UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
.await
- .expect("Failed to bind UDP socket");
+ .expect("Failed to bind UDP socket"));
- let (mut receiver, mut sender) = udp_socket.split();
+ // let (mut receiver, mut sender) = udp_socket.split();
let pending = Rc::new(Mutex::new(HashMap::new()));
@@ -273,14 +291,14 @@ pub async fn handle_pings(
while let Some((id, socket_addr, handle)) = ping_request_receiver.recv().await {
let packet = PingPacket { id };
let packet: [u8; 12] = packet.into();
- sender.send_to(&packet, &socket_addr).await.unwrap();
+ udp_socket.send_to(&packet, &socket_addr).await.unwrap();
pending.lock().unwrap().insert(id, handle);
}
};
let receiver_handle = async {
let mut buf = vec![0; 24];
- while let Ok(read) = receiver.recv(&mut buf).await {
+ while let Ok(read) = udp_socket.recv(&mut buf).await {
assert_eq!(read, 24);
let packet = PongPacket::try_from(buf.as_slice()).unwrap();