use bytes::Bytes; use cpal::{InputCallbackInfo, Sample}; use mumble_protocol::voice::VoicePacketPayload; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; use log::*; use futures::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use futures_util::task::Waker; pub fn callback( mut opus_encoder: opus::Encoder, input_sender: mpsc::Sender, sample_rate: u32, input_volume_receiver: watch::Receiver, opus_frame_size_blocks: u32, // blocks of 2.5ms ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { if !(opus_frame_size_blocks == 1 || opus_frame_size_blocks == 2 || opus_frame_size_blocks == 4 || opus_frame_size_blocks == 8) { panic!( "Unsupported amount of opus frame blocks {}", opus_frame_size_blocks ); } let opus_frame_size = opus_frame_size_blocks * sample_rate / 400; let buf = Arc::new(Mutex::new(VecDeque::new())); move |data: &[T], _info: &InputCallbackInfo| { debug!("{:?}", _info); let mut buf = buf.lock().unwrap(); let input_volume = *input_volume_receiver.borrow(); let out: Vec = data .iter() .map(|e| e.to_f32()) .map(|e| e * input_volume) .collect(); buf.extend(out); while buf.len() >= opus_frame_size as usize { let tail = buf.split_off(opus_frame_size as usize); let mut opus_buf: Vec = vec![0; opus_frame_size as usize]; let result = opus_encoder .encode_float(&Vec::from(buf.clone()), &mut opus_buf) .unwrap(); opus_buf.truncate(result); let bytes = Bytes::copy_from_slice(&opus_buf); match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) { Ok(_) => {} Err(_e) => { //warn!("Error sending audio packet: {:?}", e); } } *buf = tail; } } } struct AudioStream { data: Arc, Option)>>, } impl AudioStream { fn new() -> Self { Self { data: Arc::new(Mutex::new((VecDeque::new(), None))) } } fn insert_sample(&self, sample: T) { let mut data = self.data.lock().unwrap(); data.0.push_back(sample); if let Some(waker) = data.1.take() { waker.wake(); } } } impl Stream for AudioStream { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); let mut data = s.data.lock().unwrap(); if data.0.len() > 0 { Poll::Ready(data.0.pop_front()) } else { data.1 = Some(cx.waker().clone()); Poll::Pending } } }