aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
authorEskil Q <eskilq@kth.se>2021-01-04 17:22:07 +0100
committerEskil Q <eskilq@kth.se>2021-01-04 17:22:07 +0100
commit0179caa3f696ab88710b86943bc697fa1c6cf158 (patch)
treea13277f75f188a8e38c23a6c79a00d0cdfadee73 /mumd
parent49f4b7a7158f768e5bd04047b44b759f84529036 (diff)
downloadmum-0179caa3f696ab88710b86943bc697fa1c6cf158.tar.gz
change audio stream to send packages
Diffstat (limited to 'mumd')
-rw-r--r--mumd/src/audio.rs37
-rw-r--r--mumd/src/network/udp.rs19
2 files changed, 28 insertions, 28 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index d7b2060..bf3f7f4 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -5,22 +5,24 @@ use crate::audio::output::SaturatingAdd;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{SampleFormat, SampleRate, StreamConfig};
+use dasp_frame::Frame;
use dasp_interpolate::linear::Linear;
+use dasp_sample::{SignedSample, ToSample, Sample};
use dasp_signal::{self as signal, Signal};
+use futures::Stream;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
use log::*;
-use mumble_protocol::voice::VoicePacketPayload;
+use mumble_protocol::voice::{VoicePacketPayload, VoicePacket};
use opus::Channels;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
+use std::fmt::Debug;
+use std::future::{Future};
+use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
-use dasp_frame::Frame;
-use dasp_sample::{SignedSample, ToSample, Sample};
-use futures::Stream;
-use futures::task::{Context, Poll};
-use std::pin::Pin;
-use std::future::{Future};
-use std::fmt::Debug;
+use mumble_protocol::Serverbound;
//TODO? move to mumlib
pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[
@@ -72,7 +74,7 @@ pub struct Audio {
_output_stream: cpal::Stream,
_input_stream: cpal::Stream,
- input_channel_receiver: Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>>,
+ input_channel_receiver: Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
input_volume_sender: watch::Sender<f32>,
output_volume_sender: watch::Sender<f32>,
@@ -207,7 +209,14 @@ impl Audio {
StreamingNoiseGate::new(
from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly
0.09,
- 10_000)));
+ 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: i as u64,
+ payload: VoicePacketPayload::Opus(e.into(), false),
+ position_info: None,
+ });
output_stream.play().unwrap();
@@ -248,7 +257,7 @@ impl Audio {
_output_stream: output_stream,
_input_stream: input_stream,
input_volume_sender,
- input_channel_receiver: Some(Box::new(opus_stream)),
+ input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))),
client_streams,
sounds,
output_volume_sender,
@@ -298,8 +307,8 @@ impl Audio {
}
}
- pub fn take_receiver(&mut self) -> Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>> {
- self.input_channel_receiver.take()
+ pub fn take_receiver(&mut self) -> Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
+ Arc::clone(&self.input_channel_receiver)
}
pub fn clear_clients(&mut self) {
@@ -668,4 +677,4 @@ impl<S, I> Stream for OpusEncoder<S>
s.input_buffer.clear();
Poll::Ready(Some(encoded))
}
-} \ No newline at end of file
+}
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index d412d55..4dde268 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -26,7 +26,7 @@ pub async fn handle(
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) {
- let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap();
+ let receiver = state.lock().unwrap().audio_mut().take_receiver();
loop {
let connection_info = 'data: loop {
@@ -48,13 +48,14 @@ pub async fn handle(
let source = Arc::new(Mutex::new(source));
let phase_watcher = state.lock().unwrap().phase_receiver();
+ let mut audio_receiver_lock = receiver.lock().unwrap();
join!(
listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()),
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
phase_watcher,
- &mut *receiver
+ &mut *audio_receiver_lock
),
new_crypt_state(&mut crypt_state_receiver, sink, source)
);
@@ -198,7 +199,7 @@ async fn send_voice(
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
mut phase_watcher: watch::Receiver<StatePhase>,
- receiver: &mut (dyn Stream<Item = Vec<u8>> + Unpin),
+ receiver: &mut (dyn Stream<Item = VoicePacket<Serverbound>> + Unpin),
) {
pin_mut!(receiver);
let (tx, rx) = oneshot::channel();
@@ -215,7 +216,6 @@ async fn send_voice(
let main_block = async {
let rx = rx.fuse();
pin_mut!(rx);
- let mut count = 0;
loop {
let packet_recv = receiver.next().fuse();
pin_mut!(packet_recv);
@@ -231,16 +231,7 @@ async fn send_voice(
warn!("Channel closed before disconnect command");
break;
}
- Some(Some(payload)) => {
- let reply = VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: count,
- payload: VoicePacketPayload::Opus(payload.into(), false),
- position_info: None,
- };
- count += 1;
+ Some(Some(reply)) => {
sink.lock()
.unwrap()
.send((reply, server_addr))