use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig}; use log::*; use tokio::sync::watch; use crate::audio::SAMPLE_RATE; use crate::audio::transformers::{NoiseGate, Transformer}; use crate::error::{AudioError, AudioStream}; use crate::state::StatePhase; pub fn callback( mut input_sender: futures_channel::mpsc::Sender>, mut transformers: Vec>, mut opus_encoder: opus::Encoder, buffer_size: usize, input_volume_receiver: watch::Receiver, phase_watcher: watch::Receiver, ) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static { let mut buffer = Vec::with_capacity(buffer_size); move |data: &[T], _info: &InputCallbackInfo| { if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) { return; } let input_volume = *input_volume_receiver.borrow(); let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume); while buffer.len() + data.len() > buffer_size { buffer.extend(data.by_ref().take(buffer_size - buffer.len())); let encoded = transformers .iter_mut() .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| e.transform(acc)) .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap()); if let Some(encoded) = encoded { if let Err(e) = input_sender.try_send(encoded) { warn!("Error sending audio: {}", e); } } buffer.clear(); } buffer.extend(data); } } pub trait AudioInputDevice { fn play(&self) -> Result<(), AudioError>; fn pause(&self) -> Result<(), AudioError>; fn set_volume(&self, volume: f32); fn sample_receiver(&mut self) -> Option>>; fn num_channels(&self) -> usize; } pub struct DefaultAudioInputDevice { stream: cpal::Stream, sample_receiver: Option>>, volume_sender: watch::Sender, channels: u16, } impl DefaultAudioInputDevice { pub fn new( input_volume: f32, phase_watcher: watch::Receiver, frame_size: u32, // blocks of 2.5 ms ) -> Result { let sample_rate = SampleRate(SAMPLE_RATE); let host = cpal::default_host(); let input_device = host .default_input_device() .ok_or(AudioError::NoDevice(AudioStream::Input))?; let input_supported_config = input_device .supported_input_configs() .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))? .find_map(|c| { if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate { Some(c) } else { None } }) .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))? .with_sample_rate(sample_rate); let input_supported_sample_format = input_supported_config.sample_format(); let input_config: StreamConfig = input_supported_config.into(); let err_fn = |err| error!("An error occurred on the output audio stream: {}", err); let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000); let (volume_sender, input_volume_receiver) = watch::channel::(input_volume); let opus_encoder = opus::Encoder::new( sample_rate.0, match input_config.channels { 1 => opus::Channels::Mono, 2 => opus::Channels::Stereo, _ => unimplemented!("Only 1 or 2 channels supported, got {}", input_config.channels), }, opus::Application::Voip, ) .unwrap(); let buffer_size = (sample_rate.0 * frame_size / 400) as usize; let transformers = vec![Box::new(NoiseGate::new(50)) as Box]; let input_stream = match input_supported_sample_format { SampleFormat::F32 => input_device.build_input_stream( &input_config, callback::( sample_sender, transformers, opus_encoder, buffer_size, input_volume_receiver, phase_watcher ), err_fn, ), SampleFormat::I16 => input_device.build_input_stream( &input_config, callback::( sample_sender, transformers, opus_encoder, buffer_size, input_volume_receiver, phase_watcher ), err_fn, ), SampleFormat::U16 => input_device.build_input_stream( &input_config, callback::( sample_sender, transformers, opus_encoder, buffer_size, input_volume_receiver, phase_watcher ), err_fn, ), } .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?; let res = Self { stream: input_stream, sample_receiver: Some(sample_receiver), volume_sender, channels: input_config.channels, }; Ok(res) } } impl AudioInputDevice for DefaultAudioInputDevice { fn play(&self) -> Result<(), AudioError> { self.stream .play() .map_err(|e| AudioError::InputPlayError(e)) } fn pause(&self) -> Result<(), AudioError> { self.stream .pause() .map_err(|e| AudioError::InputPauseError(e)) } fn set_volume(&self, volume: f32) { self.volume_sender.send(volume).unwrap(); } fn sample_receiver(&mut self) -> Option>> { self.sample_receiver.take() } fn num_channels(&self) -> usize { self.channels as usize } }