From 5cdac93a475b4402680ac8d274677f4ba29b1e25 Mon Sep 17 00:00:00 2001 From: Eskil Q Date: Sat, 2 Jan 2021 08:42:52 +0100 Subject: add OpusEncoder stream --- Cargo.lock | 1 + mumd/Cargo.toml | 1 + mumd/src/audio.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index cdd53a7..25591ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1196,6 +1196,7 @@ name = "mumd" version = "0.3.0" dependencies = [ "argparse", + "async-stream", "bytes", "cpal", "dasp_frame", diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml index 101e614..d12cec2 100644 --- a/mumd/Cargo.toml +++ b/mumd/Cargo.toml @@ -19,6 +19,7 @@ notifications = ["libnotify"] mumlib = { version = "0.3", path = "../mumlib" } argparse = "0.2" +async-stream = "0.3.0" cpal = "0.13" bytes = "1.0" dasp_interpolate = { version = "0.11", features = ["linear"] } diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index dc0b77d..324b615 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, watch}; use dasp_frame::Frame; -use dasp_sample::{Sample, SignedSample}; +use dasp_sample::{Sample, SignedSample, ToSample}; use dasp_ring_buffer::Fixed; use futures::Stream; use futures::task::{Context, Poll}; @@ -545,4 +545,74 @@ impl StreamingSignal for FromInterleavedSamplesStream Poll::Ready(F::EQUILIBRIUM) } } +} + +struct OpusEncoder { + encoder: opus::Encoder, + frame_size: u32, + sample_rate: u32, + channels: usize, + stream: S, + input_buffer: Vec, + exhausted: bool, +} + +impl OpusEncoder + where + S: Stream, + I: ToSample { + fn new(frame_size: u32, sample_rate: u32, channels: usize, stream: S) -> Self { + let encoder = opus::Encoder::new( + sample_rate, + match channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!( + "Only 1 or 2 channels supported, got {})", + channels + ), + }, + opus::Application::Voip, + ).unwrap(); + Self { + encoder, + frame_size, + sample_rate, + channels, + stream, + input_buffer: Vec::new(), + exhausted: false, + } + } +} + +impl Stream for OpusEncoder + where + S: Stream + Unpin, + I: Sample + ToSample { + type Item = Vec; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + if s.exhausted { + return Poll::Ready(None); + } + let opus_frame_size = (s.frame_size * s.sample_rate / 400) as usize; + while s.input_buffer.len() < opus_frame_size { + match S::poll_next(Pin::new(&mut s.stream), cx) { + Poll::Ready(Some(v)) => { + s.input_buffer.push(v.to_sample::()); + } + Poll::Ready(None) => { + s.exhausted = true; + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, + } + } + + let encoded = s.encoder.encode_vec_float(&s.input_buffer, opus_frame_size).unwrap(); + s.input_buffer.clear(); + Poll::Ready(Some(encoded)) + } } \ No newline at end of file -- cgit v1.2.1