aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio/output.rs
blob: 78dba02481f22dd9a28b7352a3e50557ddc0ecb9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use cpal::{OutputCallbackInfo, Sample};
use mumble_protocol::voice::VoicePacketPayload;
use opus::Channels;
use std::collections::{HashMap, VecDeque};
use std::ops::AddAssign;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;

pub struct ClientStream {
    buffer: VecDeque<f32>, //TODO ring buffer?
    opus_decoder: opus::Decoder,
}

impl ClientStream {
    pub fn new(sample_rate: u32, channels: u16) -> Self {
        Self {
            buffer: VecDeque::new(),
            opus_decoder: opus::Decoder::new(
                sample_rate,
                match channels {
                    1 => Channels::Mono,
                    2 => Channels::Stereo,
                    _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
                },
            )
            .unwrap(),
        }
    }

    pub fn decode_packet(&mut self, payload: VoicePacketPayload, channels: usize) {
        match payload {
            VoicePacketPayload::Opus(bytes, _eot) => {
                let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
                let parsed = self
                    .opus_decoder
                    .decode_float(&bytes, &mut out, false)
                    .expect("Error decoding");
                out.truncate(parsed);
                self.buffer.extend(out);
            }
            _ => {
                unimplemented!("Payload type not supported");
            }
        }
    }
}

pub trait SaturatingAdd {
    fn saturating_add(self, rhs: Self) -> Self;
}

impl SaturatingAdd for f32 {
    fn saturating_add(self, rhs: Self) -> Self {
        match self + rhs {
            a if a < -1.0 => -1.0,
            a if a > 1.0 => 1.0,
            a => a,
        }
    }
}

impl SaturatingAdd for i16 {
    fn saturating_add(self, rhs: Self) -> Self {
        i16::saturating_add(self, rhs)
    }
}

impl SaturatingAdd for u16 {
    fn saturating_add(self, rhs: Self) -> Self {
        u16::saturating_add(self, rhs)
    }
}

pub fn curry_callback<T: Sample + AddAssign + SaturatingAdd>(
    buf: Arc<Mutex<HashMap<u32, ClientStream>>>,
    output_volume_receiver: watch::Receiver<f32>,
    user_volumes: Arc<Mutex<HashMap<u32, f32>>>
) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
    move |data: &mut [T], _info: &OutputCallbackInfo| {
        for sample in data.iter_mut() {
            *sample = Sample::from(&0.0);
        }

        let volume = *output_volume_receiver.borrow();

        let mut lock = buf.lock().unwrap();
        for (id, client_stream) in &mut *lock {
            let user_volume = user_volumes.lock().unwrap().get(id).cloned().unwrap_or(1.0);
            for sample in data.iter_mut() {
                *sample = sample.saturating_add(Sample::from(
                    &(client_stream.buffer.pop_front().unwrap_or(0.0) * volume * user_volume),
                ));
            }
        }
    }
}