diff options
| -rw-r--r-- | mumd/src/audio.rs | 71 | ||||
| -rw-r--r-- | mumd/src/audio/input.rs | 46 | ||||
| -rw-r--r-- | mumd/src/main.rs | 2 | ||||
| -rw-r--r-- | mumd/src/network/udp.rs | 11 | ||||
| -rw-r--r-- | mumd/src/state.rs | 4 |
5 files changed, 45 insertions, 89 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index 0a2465e..a8af82d 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -13,9 +13,9 @@ use opus::Channels; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample, ToSample}; +use dasp_sample::{SignedSample, ToSample, Sample}; use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; @@ -74,7 +74,7 @@ pub struct Audio { _output_stream: cpal::Stream, _input_stream: cpal::Stream, - input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>, + input_channel_receiver: Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>>, input_volume_sender: watch::Sender<f32>, output_volume_sender: watch::Sender<f32>, @@ -88,7 +88,7 @@ pub struct Audio { } impl Audio { - pub fn new(input_volume: f32, output_volume: f32) -> Self { + pub async fn new(input_volume: f32, output_volume: f32) -> Self { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); @@ -169,20 +169,7 @@ impl Audio { } .unwrap(); - let input_encoder = opus::Encoder::new( - input_config.sample_rate.0, - match input_config.channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!( - "Only 1 or 2 channels supported, got {})", - input_config.channels - ), - }, - opus::Application::Voip, - ) - .unwrap(); - let (input_sender, input_receiver) = mpsc::channel(100); + let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000); let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume); @@ -190,39 +177,37 @@ impl Audio { SampleFormat::F32 => input_device.build_input_stream( &input_config, input::callback::<f32>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, input::callback::<i16>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, input::callback::<u16>( - input_encoder, - input_sender, - input_config.sample_rate.0, + sample_sender, input_volume_receiver, - 4, // 10 ms ), err_fn, ), } .unwrap(); + let opus_stream = OpusEncoder::new( + 4, + input_config.sample_rate.0, + input_config.channels as usize, + StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate + //TODO group frames correctly + output_stream.play().unwrap(); let sounds = EVENT_SOUNDS @@ -246,7 +231,7 @@ impl Audio { _ => unimplemented!() // TODO handle gracefully (this might not even happen) }; let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter); - let interp = Linear::new(signal.next(), signal.next()); + let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal)); let samples = signal .from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64) .until_exhausted() @@ -262,7 +247,7 @@ impl Audio { _output_stream: output_stream, _input_stream: input_stream, input_volume_sender, - input_channel_receiver: Some(input_receiver), + input_channel_receiver: Some(Box::new(opus_stream)), client_streams, sounds, output_volume_sender, @@ -312,7 +297,7 @@ impl Audio { } } - pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> { + pub fn take_receiver(&mut self) -> Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>> { self.input_channel_receiver.take() } @@ -530,6 +515,9 @@ trait StreamingSignalExt: StreamingSignal { } } +impl<S> StreamingSignalExt for S + where S: StreamingSignal {} + struct Next<'a, S: ?Sized> { stream: &'a mut S } @@ -561,16 +549,16 @@ impl<S> Stream for IntoInterleavedSamples<S> fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let s = self.get_mut(); loop { - if s.current_frame.is_none() { - match S::poll_next(Pin::new(&mut s.signal), cx) { - Poll::Ready(val) => { - s.current_frame = Some(val.channels()); - } - Poll::Pending => return Poll::Pending, + if s.current_frame.is_some() { + if let Some(channel) = s.current_frame.as_mut().unwrap().next() { + return Poll::Ready(Some(channel)); } } - if let Some(channel) = s.current_frame.as_mut().unwrap().next() { - return Poll::Ready(Some(channel)); + match S::poll_next(Pin::new(&mut s.signal), cx) { + Poll::Ready(val) => { + s.current_frame = Some(val.channels()); + } + Poll::Pending => return Poll::Pending, } } } @@ -681,6 +669,7 @@ impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F> let mut data = s.next_buf.iter().cloned(); let n = F::from_samples(&mut data).unwrap(); + s.next_buf.clear(); let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap(); Poll::Ready(ret) } diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs index 01fd1f3..8f0fe6e 100644 --- a/mumd/src/audio/input.rs +++ b/mumd/src/audio/input.rs @@ -1,9 +1,7 @@ -use bytes::Bytes; use cpal::{InputCallbackInfo, Sample}; -use mumble_protocol::voice::VoicePacketPayload; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use log::*; use futures::Stream; use std::pin::Pin; @@ -11,50 +9,18 @@ use std::task::{Context, Poll}; use futures_util::task::Waker; pub fn callback<T: Sample>( - mut opus_encoder: opus::Encoder, - input_sender: mpsc::Sender<VoicePacketPayload>, - sample_rate: u32, + mut input_sender: futures::channel::mpsc::Sender<f32>, input_volume_receiver: watch::Receiver<f32>, - opus_frame_size_blocks: u32, // blocks of 2.5ms ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { - if !(opus_frame_size_blocks == 1 - || opus_frame_size_blocks == 2 - || opus_frame_size_blocks == 4 - || opus_frame_size_blocks == 8) - { - panic!( - "Unsupported amount of opus frame blocks {}", - opus_frame_size_blocks - ); - } - let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; - - let buf = Arc::new(Mutex::new(VecDeque::new())); move |data: &[T], _info: &InputCallbackInfo| { - debug!("{:?}", _info); - let mut buf = buf.lock().unwrap(); let input_volume = *input_volume_receiver.borrow(); - let out: Vec<f32> = data + for sample in data .iter() .map(|e| e.to_f32()) - .map(|e| e * input_volume) - .collect(); - buf.extend(out); - while buf.len() >= opus_frame_size as usize { - let tail = buf.split_off(opus_frame_size as usize); - let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize]; - let result = opus_encoder - .encode_float(&Vec::from(buf.clone()), &mut opus_buf) - .unwrap(); - opus_buf.truncate(result); - let bytes = Bytes::copy_from_slice(&opus_buf); - match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) { - Ok(_) => {} - Err(_e) => { - //warn!("Error sending audio packet: {:?}", e); - } + .map(|e| e * input_volume) { + if let Err(_e) = input_sender.try_send(sample) { + // warn!("Error sending audio: {}", e) } - *buf = tail; } } } diff --git a/mumd/src/main.rs b/mumd/src/main.rs index db6d2ef..4d6f148 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -58,7 +58,7 @@ async fn main() { let (response_sender, response_receiver) = mpsc::unbounded_channel(); let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel(); - let state = State::new(packet_sender, connection_info_sender); + let state = State::new(packet_sender, connection_info_sender).await; let state = Arc::new(Mutex::new(state)); let (_, _, _, e, _) = join!( diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs index b592a60..d412d55 100644 --- a/mumd/src/network/udp.rs +++ b/mumd/src/network/udp.rs @@ -3,7 +3,7 @@ use crate::state::{State, StatePhase}; use log::*; use bytes::Bytes; -use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt}; +use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream}; use futures_util::stream::{SplitSink, SplitStream}; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::ping::{PingPacket, PongPacket}; @@ -54,7 +54,7 @@ pub async fn handle( Arc::clone(&sink), connection_info.socket_addr, phase_watcher, - &mut receiver + &mut *receiver ), new_crypt_state(&mut crypt_state_receiver, sink, source) ); @@ -198,8 +198,9 @@ async fn send_voice( sink: Arc<Mutex<UdpSender>>, server_addr: SocketAddr, mut phase_watcher: watch::Receiver<StatePhase>, - receiver: &mut mpsc::Receiver<VoicePacketPayload>, + receiver: &mut (dyn Stream<Item = Vec<u8>> + Unpin), ) { + pin_mut!(receiver); let (tx, rx) = oneshot::channel(); let phase_transition_block = async { loop { @@ -216,7 +217,7 @@ async fn send_voice( pin_mut!(rx); let mut count = 0; loop { - let packet_recv = receiver.recv().fuse(); + let packet_recv = receiver.next().fuse(); pin_mut!(packet_recv); let exitor = select! { data = packet_recv => Some(data), @@ -236,7 +237,7 @@ async fn send_voice( target: 0, // normal speech session_id: (), // unused for server-bound packets seq_num: count, - payload, + payload: VoicePacketPayload::Opus(payload.into(), false), position_info: None, }; count += 1; diff --git a/mumd/src/state.rs b/mumd/src/state.rs index 85e5449..1421691 100644 --- a/mumd/src/state.rs +++ b/mumd/src/state.rs @@ -64,7 +64,7 @@ pub struct State { } impl State { - pub fn new( + pub async fn new( packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>, connection_info_sender: watch::Sender<Option<ConnectionInfo>>, ) -> Self { @@ -72,7 +72,7 @@ impl State { let audio = Audio::new( config.audio.input_volume.unwrap_or(1.0), config.audio.output_volume.unwrap_or(1.0), - ); + ).await; let mut state = Self { config, server: None, |
