aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
Diffstat (limited to 'mumd')
-rw-r--r--mumd/Cargo.toml7
-rw-r--r--mumd/src/audio.rs398
-rw-r--r--mumd/src/audio/input.rs48
-rw-r--r--mumd/src/network/udp.rs24
4 files changed, 381 insertions, 96 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml
index 9af1f73..4f0bc19 100644
--- a/mumd/Cargo.toml
+++ b/mumd/Cargo.toml
@@ -18,17 +18,19 @@ notifications = ["libnotify"]
[dependencies]
mumlib = { version = "0.3", path = "../mumlib" }
-argparse = "0.2"
cpal = "0.13"
bytes = "1.0"
dasp_interpolate = { version = "0.11", features = ["linear"] }
dasp_signal = "0.11"
+dasp_frame = "0.11"
+dasp_sample = "0.11"
+dasp_ring_buffer = "0.11"
futures = "0.3"
futures-util = "0.3"
hound = "3.4"
ipc-channel = "0.14"
log = "0.4"
-mumble-protocol = "0.4.0"
+mumble-protocol = "0.4.1"
native-tls = "0.2"
openssl = { version = "0.10" }
opus = "0.2"
@@ -36,6 +38,7 @@ serde = { version = "1.0", features = ["derive"] }
strum = "0.20"
strum_macros = "0.20"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "net", "time"] }
+tokio-stream = "0.1.0"
tokio-native-tls = "0.3"
tokio-util = { version = "0.6", features = ["codec", "net"] }
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 0df2852..680433c 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -4,24 +4,33 @@ pub mod output;
use crate::audio::output::SaturatingAdd;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
-use cpal::{SampleFormat, SampleRate, Stream, StreamConfig};
+use cpal::{SampleFormat, SampleRate, StreamConfig};
+use dasp_frame::Frame;
use dasp_interpolate::linear::Linear;
+use dasp_sample::{SignedSample, ToSample, Sample};
use dasp_signal::{self as signal, Signal};
+use futures::Stream;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
use log::*;
-use mumble_protocol::voice::VoicePacketPayload;
+use mumble_protocol::Serverbound;
+use mumble_protocol::voice::{VoicePacketPayload, VoicePacket};
use mumlib::config::SoundEffect;
use opus::Channels;
use std::{
borrow::Cow,
collections::{hash_map::Entry, HashMap, VecDeque},
convert::TryFrom,
+ fmt::Debug,
fs::File,
+ future::Future,
io::Read,
- sync::{Arc, Mutex}
+ pin::Pin,
+ sync::{Arc, Mutex},
};
use strum::IntoEnumIterator;
use strum_macros::EnumIter;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::watch;
const SAMPLE_RATE: u32 = 48000;
@@ -63,10 +72,10 @@ impl TryFrom<&str> for NotificationEvents {
pub struct Audio {
output_config: StreamConfig,
- _output_stream: Stream,
- _input_stream: Stream,
+ _output_stream: cpal::Stream,
+ _input_stream: cpal::Stream,
- input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>,
+ input_channel_receiver: Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>>,
input_volume_sender: watch::Sender<f32>,
output_volume_sender: watch::Sender<f32>,
@@ -161,20 +170,7 @@ impl Audio {
}
.unwrap();
- let input_encoder = opus::Encoder::new(
- input_config.sample_rate.0,
- match input_config.channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!(
- "Only 1 or 2 channels supported, got {})",
- input_config.channels
- ),
- },
- opus::Application::Voip,
- )
- .unwrap();
- let (input_sender, input_receiver) = mpsc::channel(100);
+ let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000);
let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
@@ -182,39 +178,47 @@ impl Audio {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
input::callback::<f32>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
input::callback::<i16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
input::callback::<u16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
}
.unwrap();
+ let opus_stream = OpusEncoder::new(
+ 4,
+ input_config.sample_rate.0,
+ input_config.channels as usize,
+ StreamingSignalExt::into_interleaved_samples(
+ StreamingNoiseGate::new(
+ from_interleaved_samples_stream::<_, f32>(sample_receiver), //TODO group frames correctly
+ 0.09,
+ 10_000))).enumerate().map(|(i, e)| VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0, // normal speech
+ session_id: (), // unused for server-bound packets
+ seq_num: i as u64,
+ payload: VoicePacketPayload::Opus(e.into(), false),
+ position_info: None,
+ });
+
output_stream.play().unwrap();
let mut res = Self {
@@ -222,7 +226,7 @@ impl Audio {
_output_stream: output_stream,
_input_stream: input_stream,
input_volume_sender,
- input_channel_receiver: Some(input_receiver),
+ input_channel_receiver: Arc::new(Mutex::new(Box::new(opus_stream))),
client_streams,
sounds: HashMap::new(),
output_volume_sender,
@@ -269,7 +273,7 @@ impl Audio {
_ => unimplemented!() // TODO handle gracefully (this might not even happen)
};
let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter);
- let interp = Linear::new(signal.next(), signal.next());
+ let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal));
let samples = signal
.from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64)
.until_exhausted()
@@ -328,8 +332,8 @@ impl Audio {
}
}
- pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> {
- self.input_channel_receiver.take()
+ pub fn take_receiver(&mut self) -> Arc<Mutex<Box<dyn Stream<Item = VoicePacket<Serverbound>> + Unpin>>> {
+ Arc::clone(&self.input_channel_receiver)
}
pub fn clear_clients(&mut self) {
@@ -395,3 +399,323 @@ fn get_sfx(file: &str) -> Cow<'static, [u8]> {
fn get_default_sfx() -> Cow<'static, [u8]> {
Cow::from(include_bytes!("fallback_sfx.wav").as_ref())
}
+
+struct StreamingNoiseGate<S: StreamingSignal> {
+ open: usize,
+ signal: S,
+ activate_threshold: <<S::Frame as Frame>::Sample as Sample>::Float,
+ deactivation_delay: usize,
+}
+
+impl<S: StreamingSignal> StreamingNoiseGate<S> {
+ pub fn new(
+ signal: S,
+ activate_threshold: <<S::Frame as Frame>::Sample as Sample>::Float,
+ deactivation_delay: usize,
+ ) -> StreamingNoiseGate<S> {
+ Self {
+ open: 0,
+ signal,
+ activate_threshold,
+ deactivation_delay
+ }
+ }
+}
+
+impl<S> StreamingSignal for StreamingNoiseGate<S>
+ where
+ S: StreamingSignal + Unpin,
+ <<<S as StreamingSignal>::Frame as Frame>::Sample as Sample>::Float: Unpin,
+ <S as StreamingSignal>::Frame: Unpin {
+ type Frame = S::Frame;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
+ let s = self.get_mut();
+
+ let frame = match S::poll_next(Pin::new(&mut s.signal), cx) {
+ Poll::Ready(v) => v,
+ Poll::Pending => return Poll::Pending,
+ };
+
+ match s.open {
+ 0 => {
+ if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) {
+ s.open = s.deactivation_delay;
+ }
+ }
+ _ => {
+ if frame.to_float_frame().channels().any(|e| abs(e) >= s.activate_threshold) {
+ s.open = s.deactivation_delay;
+ } else {
+ s.open -= 1;
+ }
+ }
+ }
+
+ if s.open != 0 {
+ Poll::Ready(frame)
+ } else {
+ Poll::Ready(<S::Frame as Frame>::EQUILIBRIUM)
+ }
+ }
+
+ fn is_exhausted(&self) -> bool {
+ self.signal.is_exhausted()
+ }
+}
+
+fn abs<S: SignedSample>(sample: S) -> S {
+ let zero = S::EQUILIBRIUM;
+ if sample >= zero {
+ sample
+ } else {
+ -sample
+ }
+}
+
+trait StreamingSignal {
+ type Frame: Frame;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame>;
+
+ fn is_exhausted(&self) -> bool {
+ false
+ }
+}
+
+impl<S> StreamingSignal for S
+ where
+ S: Signal + Unpin {
+ type Frame = S::Frame;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Frame> {
+ Poll::Ready(self.get_mut().next())
+ }
+}
+
+trait StreamingSignalExt: StreamingSignal {
+ fn next(&mut self) -> Next<'_, Self> {
+ Next {
+ stream: self
+ }
+ }
+
+ fn into_interleaved_samples(self) -> IntoInterleavedSamples<Self>
+ where
+ Self: Sized {
+ IntoInterleavedSamples { signal: self, current_frame: None }
+ }
+}
+
+impl<S> StreamingSignalExt for S
+ where S: StreamingSignal {}
+
+struct Next<'a, S: ?Sized> {
+ stream: &'a mut S
+}
+
+impl<'a, S: StreamingSignal + Unpin> Future for Next<'a, S> {
+ type Output = S::Frame;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match S::poll_next(Pin::new(self.stream), cx) {
+ Poll::Ready(val) => {
+ Poll::Ready(val)
+ }
+ Poll::Pending => Poll::Pending
+ }
+ }
+}
+
+struct IntoInterleavedSamples<S: StreamingSignal> {
+ signal: S,
+ current_frame: Option<<S::Frame as Frame>::Channels>,
+}
+
+impl<S> Stream for IntoInterleavedSamples<S>
+ where
+ S: StreamingSignal + Unpin,
+ <<S as StreamingSignal>::Frame as Frame>::Channels: Unpin {
+ type Item = <S::Frame as Frame>::Sample;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let s = self.get_mut();
+ loop {
+ if s.current_frame.is_some() {
+ if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
+ return Poll::Ready(Some(channel));
+ }
+ }
+ match S::poll_next(Pin::new(&mut s.signal), cx) {
+ Poll::Ready(val) => {
+ s.current_frame = Some(val.channels());
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+}
+
+struct FromStream<S> {
+ stream: S,
+ underlying_exhausted: bool,
+}
+
+impl<S> StreamingSignal for FromStream<S>
+ where
+ S: Stream + Unpin,
+ S::Item: Frame + Unpin {
+ type Frame = S::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
+ let s = self.get_mut();
+ if s.underlying_exhausted {
+ return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
+ }
+ match S::poll_next(Pin::new(&mut s.stream), cx) {
+ Poll::Ready(Some(val)) => {
+ Poll::Ready(val)
+ }
+ Poll::Ready(None) => {
+ s.underlying_exhausted = true;
+ return Poll::Ready(<Self::Frame as Frame>::EQUILIBRIUM);
+ }
+ Poll::Pending => Poll::Pending,
+ }
+ }
+
+ fn is_exhausted(&self) -> bool {
+ self.underlying_exhausted
+ }
+}
+
+
+struct FromInterleavedSamplesStream<S, F>
+ where
+ F: Frame {
+ stream: S,
+ next_buf: Vec<F::Sample>,
+ underlying_exhausted: bool,
+}
+
+fn from_interleaved_samples_stream<S, F>(stream: S) -> FromInterleavedSamplesStream<S, F>
+ where
+ S: Stream + Unpin,
+ S::Item: Sample,
+ F: Frame<Sample = S::Item> {
+ FromInterleavedSamplesStream {
+ stream,
+ next_buf: Vec::new(),
+ underlying_exhausted: false,
+ }
+}
+
+impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F>
+ where
+ S: Stream + Unpin,
+ S::Item: Sample + Unpin,
+ F: Frame<Sample = S::Item> + Unpin {
+ type Frame = F;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Frame> {
+ let s = self.get_mut();
+ if s.underlying_exhausted {
+ return Poll::Ready(F::EQUILIBRIUM);
+ }
+ while s.next_buf.len() < F::CHANNELS {
+ match S::poll_next(Pin::new(&mut s.stream), cx) {
+ Poll::Ready(Some(v)) => {
+ s.next_buf.push(v);
+ }
+ Poll::Ready(None) => {
+ s.underlying_exhausted = true;
+ return Poll::Ready(F::EQUILIBRIUM);
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+
+ let mut data = s.next_buf.iter().cloned();
+ let n = F::from_samples(&mut data).unwrap();
+ s.next_buf.clear();
+ Poll::Ready(n)
+ }
+
+ fn is_exhausted(&self) -> bool {
+ self.underlying_exhausted
+ }
+}
+
+struct OpusEncoder<S> {
+ encoder: opus::Encoder,
+ frame_size: u32,
+ sample_rate: u32,
+ stream: S,
+ input_buffer: Vec<f32>,
+ exhausted: bool,
+}
+
+impl<S, I> OpusEncoder<S>
+ where
+ S: Stream<Item = I>,
+ I: ToSample<f32> {
+ fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self {
+ let encoder = opus::Encoder::new(
+ sample_rate,
+ match channels {
+ 1 => Channels::Mono,
+ 2 => Channels::Stereo,
+ _ => unimplemented!(
+ "Only 1 or 2 channels supported, got {})",
+ channels
+ ),
+ },
+ opus::Application::Voip,
+ ).unwrap();
+ Self {
+ encoder,
+ frame_size,
+ sample_rate,
+ stream,
+ input_buffer: Vec::new(),
+ exhausted: false,
+ }
+ }
+}
+
+impl<S, I> Stream for OpusEncoder<S>
+ where
+ S: Stream<Item = I> + Unpin,
+ I: Sample + ToSample<f32> {
+ type Item = Vec<u8>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let s = self.get_mut();
+ if s.exhausted {
+ return Poll::Ready(None);
+ }
+ let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize;
+ loop {
+ while s.input_buffer.len() < opus_frame_size {
+ match S::poll_next(Pin::new(&mut s.stream), cx) {
+ Poll::Ready(Some(v)) => {
+ s.input_buffer.push(v.to_sample::<f32>());
+ }
+ Poll::Ready(None) => {
+ s.exhausted = true;
+ return Poll::Ready(None);
+ }
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ if s.input_buffer.iter().any(|&e| e != 0.0) {
+ break;
+ }
+ s.input_buffer.clear();
+ }
+
+ let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap();
+ s.input_buffer.clear();
+ Poll::Ready(Some(encoded))
+ }
+}
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index fe0d21f..deb0fb8 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -1,54 +1,20 @@
-use bytes::Bytes;
use cpal::{InputCallbackInfo, Sample};
-use mumble_protocol::voice::VoicePacketPayload;
-use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::watch;
+use log::*;
pub fn callback<T: Sample>(
- mut opus_encoder: opus::Encoder,
- input_sender: mpsc::Sender<VoicePacketPayload>,
- sample_rate: u32,
+ mut input_sender: futures::channel::mpsc::Sender<f32>,
input_volume_receiver: watch::Receiver<f32>,
- opus_frame_size_blocks: u32, // blocks of 2.5ms
) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
- if !(opus_frame_size_blocks == 1
- || opus_frame_size_blocks == 2
- || opus_frame_size_blocks == 4
- || opus_frame_size_blocks == 8)
- {
- panic!(
- "Unsupported amount of opus frame blocks {}",
- opus_frame_size_blocks
- );
- }
- let opus_frame_size = opus_frame_size_blocks * sample_rate / 400;
-
- let buf = Arc::new(Mutex::new(VecDeque::new()));
move |data: &[T], _info: &InputCallbackInfo| {
- let mut buf = buf.lock().unwrap();
let input_volume = *input_volume_receiver.borrow();
- let out: Vec<f32> = data
+ for sample in data
.iter()
.map(|e| e.to_f32())
- .map(|e| e * input_volume)
- .collect();
- buf.extend(out);
- while buf.len() >= opus_frame_size as usize {
- let tail = buf.split_off(opus_frame_size as usize);
- let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize];
- let result = opus_encoder
- .encode_float(&Vec::from(buf.clone()), &mut opus_buf)
- .unwrap();
- opus_buf.truncate(result);
- let bytes = Bytes::copy_from_slice(&opus_buf);
- match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) {
- Ok(_) => {}
- Err(_e) => {
- //warn!("Error sending audio packet: {:?}", e);
- }
+ .map(|e| e * input_volume) {
+ if let Err(_e) = input_sender.try_send(sample) {
+ warn!("Error sending audio: {}", _e);
}
- *buf = tail;
}
}
}
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index f7eeb62..0c00029 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -2,7 +2,7 @@ use crate::network::ConnectionInfo;
use crate::state::{State, StatePhase};
use bytes::Bytes;
-use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt};
+use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream};
use futures_util::stream::{SplitSink, SplitStream};
use log::*;
use mumble_protocol::crypt::ClientCryptState;
@@ -28,7 +28,7 @@ pub async fn handle(
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
mut crypt_state_receiver: mpsc::Receiver<ClientCryptState>,
) {
- let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap();
+ let receiver = state.lock().unwrap().audio_mut().take_receiver();
loop {
let connection_info = 'data: loop {
@@ -50,13 +50,14 @@ pub async fn handle(
let source = Arc::new(Mutex::new(source));
let phase_watcher = state.lock().unwrap().phase_receiver();
+ let mut audio_receiver_lock = receiver.lock().unwrap();
join!(
listen(Arc::clone(&state), Arc::clone(&source), phase_watcher.clone()),
send_voice(
Arc::clone(&sink),
connection_info.socket_addr,
phase_watcher,
- &mut receiver
+ &mut *audio_receiver_lock
),
new_crypt_state(&mut crypt_state_receiver, sink, source)
);
@@ -197,8 +198,9 @@ async fn send_voice(
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
mut phase_watcher: watch::Receiver<StatePhase>,
- receiver: &mut mpsc::Receiver<VoicePacketPayload>,
+ receiver: &mut (dyn Stream<Item = VoicePacket<Serverbound>> + Unpin),
) {
+ pin_mut!(receiver);
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
loop {
@@ -213,9 +215,8 @@ async fn send_voice(
let main_block = async {
let rx = rx.fuse();
pin_mut!(rx);
- let mut count = 0;
loop {
- let packet_recv = receiver.recv().fuse();
+ let packet_recv = receiver.next().fuse();
pin_mut!(packet_recv);
let exitor = select! {
data = packet_recv => Some(data),
@@ -229,16 +230,7 @@ async fn send_voice(
warn!("Channel closed before disconnect command");
break;
}
- Some(Some(payload)) => {
- let reply = VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num: count,
- payload,
- position_info: None,
- };
- count += 1;
+ Some(Some(reply)) => {
sink.lock()
.unwrap()
.send((reply, server_addr))