aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mumd/src/audio.rs71
-rw-r--r--mumd/src/audio/input.rs46
-rw-r--r--mumd/src/main.rs2
-rw-r--r--mumd/src/network/udp.rs11
-rw-r--r--mumd/src/state.rs4
5 files changed, 45 insertions, 89 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 0a2465e..a8af82d 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -13,9 +13,9 @@ use opus::Channels;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::watch;
use dasp_frame::Frame;
-use dasp_sample::{Sample, SignedSample, ToSample};
+use dasp_sample::{SignedSample, ToSample, Sample};
use dasp_ring_buffer::Fixed;
use futures::Stream;
use futures::task::{Context, Poll};
@@ -74,7 +74,7 @@ pub struct Audio {
_output_stream: cpal::Stream,
_input_stream: cpal::Stream,
- input_channel_receiver: Option<mpsc::Receiver<VoicePacketPayload>>,
+ input_channel_receiver: Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>>,
input_volume_sender: watch::Sender<f32>,
output_volume_sender: watch::Sender<f32>,
@@ -88,7 +88,7 @@ pub struct Audio {
}
impl Audio {
- pub fn new(input_volume: f32, output_volume: f32) -> Self {
+ pub async fn new(input_volume: f32, output_volume: f32) -> Self {
let sample_rate = SampleRate(SAMPLE_RATE);
let host = cpal::default_host();
@@ -169,20 +169,7 @@ impl Audio {
}
.unwrap();
- let input_encoder = opus::Encoder::new(
- input_config.sample_rate.0,
- match input_config.channels {
- 1 => Channels::Mono,
- 2 => Channels::Stereo,
- _ => unimplemented!(
- "Only 1 or 2 channels supported, got {})",
- input_config.channels
- ),
- },
- opus::Application::Voip,
- )
- .unwrap();
- let (input_sender, input_receiver) = mpsc::channel(100);
+ let (sample_sender, sample_receiver) = futures::channel::mpsc::channel(1_000_000);
let (input_volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
@@ -190,39 +177,37 @@ impl Audio {
SampleFormat::F32 => input_device.build_input_stream(
&input_config,
input::callback::<f32>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::I16 => input_device.build_input_stream(
&input_config,
input::callback::<i16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
SampleFormat::U16 => input_device.build_input_stream(
&input_config,
input::callback::<u16>(
- input_encoder,
- input_sender,
- input_config.sample_rate.0,
+ sample_sender,
input_volume_receiver,
- 4, // 10 ms
),
err_fn,
),
}
.unwrap();
+ let opus_stream = OpusEncoder::new(
+ 4,
+ input_config.sample_rate.0,
+ input_config.channels as usize,
+ StreamingSignalExt::into_interleaved_samples(from_interleaved_samples_stream::<_, f32>(sample_receiver).await)); //TODO attach a noise gate
+ //TODO group frames correctly
+
output_stream.play().unwrap();
let sounds = EVENT_SOUNDS
@@ -246,7 +231,7 @@ impl Audio {
_ => unimplemented!() // TODO handle gracefully (this might not even happen)
};
let mut signal = signal::from_interleaved_samples_iter::<_, [f32; 2]>(iter);
- let interp = Linear::new(signal.next(), signal.next());
+ let interp = Linear::new(Signal::next(&mut signal), Signal::next(&mut signal));
let samples = signal
.from_hz_to_hz(interp, spec.sample_rate as f64, SAMPLE_RATE as f64)
.until_exhausted()
@@ -262,7 +247,7 @@ impl Audio {
_output_stream: output_stream,
_input_stream: input_stream,
input_volume_sender,
- input_channel_receiver: Some(input_receiver),
+ input_channel_receiver: Some(Box::new(opus_stream)),
client_streams,
sounds,
output_volume_sender,
@@ -312,7 +297,7 @@ impl Audio {
}
}
- pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<VoicePacketPayload>> {
+ pub fn take_receiver(&mut self) -> Option<Box<dyn Stream<Item = Vec<u8>> + Unpin>> {
self.input_channel_receiver.take()
}
@@ -530,6 +515,9 @@ trait StreamingSignalExt: StreamingSignal {
}
}
+impl<S> StreamingSignalExt for S
+ where S: StreamingSignal {}
+
struct Next<'a, S: ?Sized> {
stream: &'a mut S
}
@@ -561,16 +549,16 @@ impl<S> Stream for IntoInterleavedSamples<S>
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let s = self.get_mut();
loop {
- if s.current_frame.is_none() {
- match S::poll_next(Pin::new(&mut s.signal), cx) {
- Poll::Ready(val) => {
- s.current_frame = Some(val.channels());
- }
- Poll::Pending => return Poll::Pending,
+ if s.current_frame.is_some() {
+ if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
+ return Poll::Ready(Some(channel));
}
}
- if let Some(channel) = s.current_frame.as_mut().unwrap().next() {
- return Poll::Ready(Some(channel));
+ match S::poll_next(Pin::new(&mut s.signal), cx) {
+ Poll::Ready(val) => {
+ s.current_frame = Some(val.channels());
+ }
+ Poll::Pending => return Poll::Pending,
}
}
}
@@ -681,6 +669,7 @@ impl<S, F> StreamingSignal for FromInterleavedSamplesStream<S, F>
let mut data = s.next_buf.iter().cloned();
let n = F::from_samples(&mut data).unwrap();
+ s.next_buf.clear();
let ret = mem::replace(&mut s.next_frame, Some(n)).unwrap();
Poll::Ready(ret)
}
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index 01fd1f3..8f0fe6e 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -1,9 +1,7 @@
-use bytes::Bytes;
use cpal::{InputCallbackInfo, Sample};
-use mumble_protocol::voice::VoicePacketPayload;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::watch;
use log::*;
use futures::Stream;
use std::pin::Pin;
@@ -11,50 +9,18 @@ use std::task::{Context, Poll};
use futures_util::task::Waker;
pub fn callback<T: Sample>(
- mut opus_encoder: opus::Encoder,
- input_sender: mpsc::Sender<VoicePacketPayload>,
- sample_rate: u32,
+ mut input_sender: futures::channel::mpsc::Sender<f32>,
input_volume_receiver: watch::Receiver<f32>,
- opus_frame_size_blocks: u32, // blocks of 2.5ms
) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
- if !(opus_frame_size_blocks == 1
- || opus_frame_size_blocks == 2
- || opus_frame_size_blocks == 4
- || opus_frame_size_blocks == 8)
- {
- panic!(
- "Unsupported amount of opus frame blocks {}",
- opus_frame_size_blocks
- );
- }
- let opus_frame_size = opus_frame_size_blocks * sample_rate / 400;
-
- let buf = Arc::new(Mutex::new(VecDeque::new()));
move |data: &[T], _info: &InputCallbackInfo| {
- debug!("{:?}", _info);
- let mut buf = buf.lock().unwrap();
let input_volume = *input_volume_receiver.borrow();
- let out: Vec<f32> = data
+ for sample in data
.iter()
.map(|e| e.to_f32())
- .map(|e| e * input_volume)
- .collect();
- buf.extend(out);
- while buf.len() >= opus_frame_size as usize {
- let tail = buf.split_off(opus_frame_size as usize);
- let mut opus_buf: Vec<u8> = vec![0; opus_frame_size as usize];
- let result = opus_encoder
- .encode_float(&Vec::from(buf.clone()), &mut opus_buf)
- .unwrap();
- opus_buf.truncate(result);
- let bytes = Bytes::copy_from_slice(&opus_buf);
- match input_sender.try_send(VoicePacketPayload::Opus(bytes, false)) {
- Ok(_) => {}
- Err(_e) => {
- //warn!("Error sending audio packet: {:?}", e);
- }
+ .map(|e| e * input_volume) {
+ if let Err(_e) = input_sender.try_send(sample) {
+ // warn!("Error sending audio: {}", e)
}
- *buf = tail;
}
}
}
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index db6d2ef..4d6f148 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -58,7 +58,7 @@ async fn main() {
let (response_sender, response_receiver) = mpsc::unbounded_channel();
let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel();
- let state = State::new(packet_sender, connection_info_sender);
+ let state = State::new(packet_sender, connection_info_sender).await;
let state = Arc::new(Mutex::new(state));
let (_, _, _, e, _) = join!(
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index b592a60..d412d55 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -3,7 +3,7 @@ use crate::state::{State, StatePhase};
use log::*;
use bytes::Bytes;
-use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt};
+use futures::{join, pin_mut, select, FutureExt, SinkExt, StreamExt, Stream};
use futures_util::stream::{SplitSink, SplitStream};
use mumble_protocol::crypt::ClientCryptState;
use mumble_protocol::ping::{PingPacket, PongPacket};
@@ -54,7 +54,7 @@ pub async fn handle(
Arc::clone(&sink),
connection_info.socket_addr,
phase_watcher,
- &mut receiver
+ &mut *receiver
),
new_crypt_state(&mut crypt_state_receiver, sink, source)
);
@@ -198,8 +198,9 @@ async fn send_voice(
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
mut phase_watcher: watch::Receiver<StatePhase>,
- receiver: &mut mpsc::Receiver<VoicePacketPayload>,
+ receiver: &mut (dyn Stream<Item = Vec<u8>> + Unpin),
) {
+ pin_mut!(receiver);
let (tx, rx) = oneshot::channel();
let phase_transition_block = async {
loop {
@@ -216,7 +217,7 @@ async fn send_voice(
pin_mut!(rx);
let mut count = 0;
loop {
- let packet_recv = receiver.recv().fuse();
+ let packet_recv = receiver.next().fuse();
pin_mut!(packet_recv);
let exitor = select! {
data = packet_recv => Some(data),
@@ -236,7 +237,7 @@ async fn send_voice(
target: 0, // normal speech
session_id: (), // unused for server-bound packets
seq_num: count,
- payload,
+ payload: VoicePacketPayload::Opus(payload.into(), false),
position_info: None,
};
count += 1;
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 85e5449..1421691 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -64,7 +64,7 @@ pub struct State {
}
impl State {
- pub fn new(
+ pub async fn new(
packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
) -> Self {
@@ -72,7 +72,7 @@ impl State {
let audio = Audio::new(
config.audio.input_volume.unwrap_or(1.0),
config.audio.output_volume.unwrap_or(1.0),
- );
+ ).await;
let mut state = Self {
config,
server: None,