aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/network/udp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'mumd/src/network/udp.rs')
-rw-r--r--mumd/src/network/udp.rs24
1 files changed, 8 insertions, 16 deletions
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index f7eeb62..0c00029 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -2,7 +2,7 @@ use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
use bytes::Bytes;
-use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt};
+use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream};
use futures_util::stream::{SplitSink, SplitStream};
use log::*;
use mumble_protocol::crypt::ClientCryptState;
@@ -28,7 +28,7 @@ pub async fn handle(
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) {
- let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap();
+ let receiver = state.lock().unwrap().audio_mut().take_receiver();
loop {
let connection_info = 'data: loop {
@@ -50,13 +50,14 @@ pub async fn handle(
let source = Arc::new(Mutex::new(source));
let phase_watcher = state.lock().unwrap().phase_receiver();
+ let mut audio_receiver_lock = receiver.lock().unwrap();
join!(
listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()),
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
phase_watcher,
- &mut receiver
+ &mut *audio_receiver_lock
),
new_crypt_state(&mut crypt_state_receiver, sink, source)
);
@@ -197,8 +198,9 @@ async fn send_voice(
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
mut phase_watcher: watch::Receiver<StatePhase>,
- receiver: &mut mpsc::Receiver<VoicePacketPayload>,
+ receiver: &mut (dyn Stream<Item = VoicePacket<Serverbound>> + Unpin),
) {
+ pin_mut!(receiver);
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
loop {
@@ -213,9 +215,8 @@ async fn send_voice(
let main_block = async {
let rx = rx.fuse();
pin_mut!(rx);
- let mut count = 0;
loop {
- let packet_recv = receiver.recv().fuse();
+ let packet_recv = receiver.next().fuse();
pin_mut!(packet_recv);
let exitor = select! {
data = packet_recv => Some(data),
@@ -229,16 +230,7 @@ async fn send_voice(
warn!("Channel closed before disconnect command");
break;
}
- Some(Some(payload)) => {
- let reply = VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: count,
- payload,
- position_info: None,
- };
- count += 1;
+ Some(Some(reply)) => {
sink.lock()
.unwrap()
.send((reply, server_addr))