diff options
| author | Eskil Q <eskilq@kth.se> | 2021-01-04 17:22:07 +0100 |
|---|---|---|
| committer | Eskil Q <eskilq@kth.se> | 2021-01-04 17:22:07 +0100 |
| commit | 0179caa3f696ab88710b86943bc697fa1c6cf158 (patch) | |
| tree | a13277f75f188a8e38c23a6c79a00d0cdfadee73 /mumd/src | |
| parent | 49f4b7a7158f768e5bd04047b44b759f84529036 (diff) | |
| download | mum-0179caa3f696ab88710b86943bc697fa1c6cf158.tar.gz | |
change audio stream to send packages
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/audio.rs | 37 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 19 |
2 files changed, 28 insertions, 28 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index d7b2060..bf3f7f4 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -5,22 +5,24 @@ use crate::audio::output::SaturatingAdd; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{SampleFormat, SampleRate, StreamConfig}; +use dasp_frame::Frame; use dasp_interpolate::linear::Linear; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_signal::{self as signal, Signal}; +use futures::Stream; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; use log::*; -use mumble_protocol::voice::VoicePacketPayload; +use mumble_protocol::voice::{VoicePacketPayload, VoicePacket}; use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; +use std::future::{Future}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::sync::watch; -use dasp_frame::Frame; -use dasp_sample::{SignedSample, ToSample, Sample}; -use futures::Stream; -use futures::task::{Context, Poll}; -use std::pin::Pin; -use std::future::{Future}; -use std::fmt::Debug; +use mumble_protocol::Serverbound; //TODO? move to mumlib pub const EVENT_SOUNDS: &[(&'static [u8], NotificationEvents)] = &[ @@ -72,7 +74,7 @@ pub struct Audio { _output_stream: cpal::Stream, _input_stream: cpal::Stream, - input_channel_receiver: Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>>, + input_channel_receiver: Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>, input_volume_sender: watch::Sender<f32>, output_volume_sender: watch::Sender<f32>, @@ -207,7 +209,14 @@ impl Audio { StreamingNoiseGate::new( from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly 0.09, - 10_000))); + 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, // normal speech + session_id: (), // unused for server-bound packets + seq_num: i as u64, + payload: VoicePacketPayload::Opus(e.into(), false), + position_info: None, + }); output_stream.play().unwrap(); @@ -248,7 +257,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(Box::new(opus_stream)), + input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))), client_streams, sounds, output_volume_sender, @@ -298,8 +307,8 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>> { - self.input_channel_receiver.take() + pub fn take_receiver(&mut self) -> Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> { + Arc::clone(&self.input_channel_receiver) } pub fn clear_clients(&mut self) { @@ -668,4 +677,4 @@ impl<S, I> Stream for OpusEncoder<S> s.input_buffer.clear(); Poll::Ready(Some(encoded)) } -}
\ No newline at end of file +} diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index d412d55..4dde268 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -26,7 +26,7 @@ pub async fn handle( mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>, mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>, ) { - let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap(); + let receiver = state.lock().unwrap().audio_mut().take_receiver(); loop { let connection_info = 'data: loop { @@ -48,13 +48,14 @@ pub async fn handle( let source = Arc::new(Mutex::new(source)); let phase_watcher = state.lock().unwrap().phase_receiver(); + let mut audio_receiver_lock = receiver.lock().unwrap(); join!( listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()), send_voice( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, - &mut *receiver + &mut *audio_receiver_lock ), new_crypt_state(&mut crypt_state_receiver, sink, source) ); @@ -198,7 +199,7 @@ async fn send_voice( sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver<StatePhase>, - receiver: &mut (dyn Stream<Item = Vec<u8>> + Unpin), + receiver: &mut (dyn Stream<Item = VoicePacket<Serverbound>> + Unpin), ) { pin_mut!(receiver); let (tx, rx) = oneshot::channel(); @@ -215,7 +216,6 @@ async fn send_voice( let main_block = async { let rx = rx.fuse(); pin_mut!(rx); - let mut count = 0; loop { let packet_recv = receiver.next().fuse(); pin_mut!(packet_recv); @@ -231,16 +231,7 @@ async fn send_voice( warn!("Channel closed before disconnect command"); break; } - Some(Some(payload)) => { - let reply = VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, // normal speech - session_id: (), // unused for server-bound packets - seq_num: count, - payload: VoicePacketPayload::Opus(payload.into(), false), - position_info: None, - }; - count += 1; + Some(Some(reply)) => { sink.lock() .unwrap() .send((reply, server_addr)) |
