aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/audio/input.rs
diff options
context:
space:
mode:
authorEskil Q <eskilq@kth.se>2021-01-01 15:08:05 +0100
committerEskil Q <eskilq@kth.se>2021-01-01 15:08:05 +0100
commit65d7b5e907ffbb594319e13684f7f566c0ad2264 (patch)
tree54962689ecd24bdc38daa93db294d7f9ee7604eb /mumd/src/audio/input.rs
parentee13c36868f08203d548c3221300651c5108b8a9 (diff)
downloadmum-65d7b5e907ffbb594319e13684f7f566c0ad2264.tar.gz
add AudioStream struct
Diffstat (limited to 'mumd/src/audio/input.rs')
-rw-r--r--mumd/src/audio/input.rs39
1 files changed, 39 insertions, 0 deletions
diff --git a/mumd/src/audio/input.rs b/mumd/src/audio/input.rs
index 914891b..01fd1f3 100644
--- a/mumd/src/audio/input.rs
+++ b/mumd/src/audio/input.rs
@@ -5,6 +5,10 @@ use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, watch};
use log::*;
+use futures::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use futures_util::task::Waker;
pub fn callback<T: Sample>(
mut opus_encoder: opus::Encoder,
@@ -54,3 +58,38 @@ pub fn callback<T: Sample>(
}
}
}
+
+struct AudioStream<T> {
+ data: Arc<Mutex<(VecDeque<T>, Option<Waker>)>>,
+}
+
+impl<T> AudioStream<T> {
+ fn new() -> Self {
+ Self {
+ data: Arc::new(Mutex::new((VecDeque::new(), None)))
+ }
+ }
+
+ fn insert_sample(&self, sample: T) {
+ let mut data = self.data.lock().unwrap();
+ data.0.push_back(sample);
+ if let Some(waker) = data.1.take() {
+ waker.wake();
+ }
+ }
+}
+
+impl<T> Stream for AudioStream<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let s = self.get_mut();
+ let mut data = s.data.lock().unwrap();
+ if data.0.len() > 0 {
+ Poll::Ready(data.0.pop_front())
+ } else {
+ data.1 = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+} \ No newline at end of file