aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio/input.rs
blob: 01fd1f370f8611f09936afbb643a35223e7f1b3d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use bytes::Bytes;
use cpal::{InputCallbackInfo, Sample};
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, watch};
use log::*;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::task::Waker;

pub fn callback<T: Sample>(
    mut opus_encoder: opus::Encoder,
    input_sender: mpsc::Sender<VoicePacketPayload>,
    sample_rate: u32,
    input_volume_receiver: watch::Receiver<f32>,
    opus_frame_size_blocks: u32, // blocks of 2.5ms
) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
    if !(opus_frame_size_blocks == 1
        || opus_frame_size_blocks == 2
        || opus_frame_size_blocks == 4
        || opus_frame_size_blocks == 8)
    {
        panic!(
            "Unsupported amount of opus frame blocks {}",
            opus_frame_size_blocks
        );
    }
    let opus_frame_size = opus_frame_size_blocks * sample_rate / 400;

    let buf = Arc::new(Mutex::new(VecDeque::new()));
    move |data: &[T], _info: &InputCallbackInfo| {
        debug!("{:?}", _info);
        let mut buf = buf.lock().unwrap();
        let input_volume = *input_volume_receiver.borrow();
        let out: Vec<f32> = data
            .iter()
            .map(|e| e.to_f32())
            .map(|e| e * input_volume)
            .collect();
        buf.extend(out);
        while buf.len() >= opus_frame_size as usize {
            let tail = buf.split_off(opus_frame_size as usize);
            let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize];
            let result = opus_encoder
                .encode_float(&Vec::from(buf.clone()), &mut opus_buf)
                .unwrap();
            opus_buf.truncate(result);
            let bytes = Bytes::copy_from_slice(&opus_buf);
            match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) {
                Ok(_) => {}
                Err(_e) => {
                    //warn!("Error sending audio packet: {:?}", e);
                }
            }
            *buf = tail;
        }
    }
}

struct AudioStream<T> {
    data: Arc<Mutex<(VecDeque<T>, Option<Waker>)>>,
}

impl<T> AudioStream<T> {
    fn new() -> Self {
        Self {
            data: Arc::new(Mutex::new((VecDeque::new(), None)))
        }
    }
    
    fn insert_sample(&self, sample: T) {
        let mut data = self.data.lock().unwrap();
        data.0.push_back(sample);
        if let Some(waker) = data.1.take() {
            waker.wake();
        }
    }
}

impl<T> Stream for AudioStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let s = self.get_mut();
        let mut data = s.data.lock().unwrap();
        if data.0.len() > 0 {
            Poll::Ready(data.0.pop_front())
        } else {
            data.1 = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}