diff options
| author | Gustav Sörnäs <gustav@sornas.net> | 2020-10-10 20:33:55 +0200 |
|---|---|---|
| committer | Gustav Sörnäs <gustav@sornas.net> | 2020-10-10 20:33:55 +0200 |
| commit | ab8116223328412484d1e76a9ff7b2055f05abf5 (patch) | |
| tree | a38ddfa10111aa6f7b5015015a1b33fae2fdb42c /mumd/src | |
| parent | 2aee60f7bbc6186cf6ca63aef182e1fe52fa03ad (diff) | |
| download | mum-ab8116223328412484d1e76a9ff7b2055f05abf5.tar.gz | |
big ol refactor
Co-authored-by: Eskil Queseth <eskilq@kth.se>
Diffstat (limited to 'mumd/src')
| -rw-r--r-- | mumd/src/audio.rs | 41 | ||||
| -rw-r--r-- | mumd/src/main.rs | 209 | ||||
| -rw-r--r-- | mumd/src/network.rs | 241 |
3 files changed, 263 insertions, 228 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs index d1a309c..c6f30fb 100644 --- a/mumd/src/audio.rs +++ b/mumd/src/audio.rs @@ -1,6 +1,6 @@ -use cpal::{Sample, SampleFormat, OutputCallbackInfo, Stream, StreamConfig, SampleRate}; use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; +use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig}; use std::collections::VecDeque; use std::sync::Arc; use std::sync::Mutex; @@ -11,17 +11,21 @@ pub struct Audio { pub output_stream: Stream, } -impl Audio { +impl Audio { pub fn new() -> Self { let output_buffer = Arc::new(Mutex::new(VecDeque::new())); let host = cpal::default_host(); - let device = host.default_output_device().expect("default output device not found"); - let mut supported_configs_range = device.supported_output_configs() - .expect("error querying configs"); - let supported_config = supported_configs_range.next() - .expect("no supported config??") - .with_sample_rate(SampleRate(48000)); + let device = host + .default_output_device() + .expect("default output device not found"); + let mut supported_configs_range = device + .supported_output_configs() + .expect("error querying configs"); + let supported_config = supported_configs_range + .next() + .expect("no supported config??") + .with_sample_rate(SampleRate(48000)); let supported_sample_format = supported_config.sample_format(); let config: StreamConfig = supported_config.into(); @@ -29,10 +33,17 @@ impl Audio { let stream_audio_buf = Arc::clone(&output_buffer); let stream = match supported_sample_format { - SampleFormat::F32 => device.build_output_stream(&config, curry_callback::<f32>(stream_audio_buf), err_fn), - SampleFormat::I16 => device.build_output_stream(&config, curry_callback::<i16>(stream_audio_buf), err_fn), - SampleFormat::U16 => device.build_output_stream(&config, curry_callback::<u16>(stream_audio_buf), err_fn), - }.unwrap(); + SampleFormat::F32 => { + device.build_output_stream(&config, curry_callback::<f32>(stream_audio_buf), err_fn) + } + SampleFormat::I16 => { + device.build_output_stream(&config, curry_callback::<i16>(stream_audio_buf), err_fn) + } + SampleFormat::U16 => { + device.build_output_stream(&config, curry_callback::<u16>(stream_audio_buf), err_fn) + } + } + .unwrap(); Self { output_buffer, @@ -42,9 +53,9 @@ impl Audio { } } -fn curry_callback<T: Sample>(buf: Arc<Mutex<VecDeque<f32>>>) - -> impl FnMut(&mut [T], - &OutputCallbackInfo) + Send + 'static { +fn curry_callback<T: Sample>( + buf: Arc<Mutex<VecDeque<f32>>>, +) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static { move |data: &mut [T], _info: &OutputCallbackInfo| { let mut lock = buf.lock().unwrap(); for sample in data.iter_mut() { diff --git a/mumd/src/main.rs b/mumd/src/main.rs index ffd5547..3960b48 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -5,212 +5,18 @@ use crate::audio::Audio; use argparse::ArgumentParser; use argparse::Store; use argparse::StoreTrue; -use bytes::Bytes; - +use cpal::traits::StreamTrait; use futures::channel::oneshot; use futures::join; -use futures::StreamExt; -use futures::SinkExt; - -use mumble_protocol::control::msgs; -use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; -use mumble_protocol::voice::VoicePacket; -use mumble_protocol::voice::VoicePacketPayload; - -use std::convert::Into; -use std::convert::TryInto; - -use std::net::SocketAddr; use std::net::ToSocketAddrs; -use tokio::time::{self, Duration}; - -use std::sync::Arc; -use std::sync::Mutex; - -use cpal::traits::StreamTrait; - -use opus::Channels; - -async fn connect( - server_addr: SocketAddr, - server_host: String, - user_name: String, - accept_invalid_cert: bool, - crypt_state_sender: oneshot::Sender<ClientCryptState>, -) { - // Wrap crypt_state_sender in Option, so we can call it only once - let mut crypt_state_sender = Some(crypt_state_sender); - - let (sink, mut stream) = network::connect_tcp(server_addr, server_host, accept_invalid_cert).await; - let sink = Arc::new(Mutex::new(sink)); - - // Handshake (omitting `Version` message for brevity) - let mut msg = msgs::Authenticate::new(); - msg.set_username(user_name); - msg.set_opus(true); - let mut lock = sink.lock().unwrap(); - lock.send(msg.into()).await.unwrap(); - drop(lock); - - println!("Logging in.."); - let mut crypt_state = None; - - let ping_sink = Arc::clone(&sink); - let handle = async { - let mut interval = time::interval(Duration::from_secs(10)); - let sink = ping_sink; - - loop { - interval.tick().await; - let msg = msgs::Ping::new(); - let mut lock = sink.lock().unwrap(); - lock.send(msg.into()).await.unwrap(); - } - }; - - // Handle incoming packets - let receive = async { - while let Some(packet) = stream.next().await { - match packet.unwrap() { - ControlPacket::TextMessage(mut msg) => { - println!( - "Got message from user with session ID {}: {}", - msg.get_actor(), - msg.get_message() - ); - // Send reply back to server - let mut response = msgs::TextMessage::new(); - response.mut_session().push(msg.get_actor()); - response.set_message(msg.take_message()); - let mut lock = sink.lock().unwrap(); - lock.send(response.into()).await.unwrap(); - } - ControlPacket::CryptSetup(msg) => { - println!("crypt setup"); - // Wait until we're fully connected before initiating UDP voice - crypt_state = Some(ClientCryptState::new_from( - msg.get_key() - .try_into() - .expect("Server sent private key with incorrect size"), - msg.get_client_nonce() - .try_into() - .expect("Server sent client_nonce with incorrect size"), - msg.get_server_nonce() - .try_into() - .expect("Server sent server_nonce with incorrect size"), - )); - } - ControlPacket::ServerSync(_) => { - println!("Logged in!"); - if let Some(sender) = crypt_state_sender.take() { - let _ = sender.send( - crypt_state - .take() - .expect("Server didn't send us any CryptSetup packet!"), - ); - } - } - ControlPacket::Reject(msg) => { - println!("Login rejected: {:?}", msg); - } - _ => {}, - } - } - }; - join!(handle, receive); -} - -async fn handle_udp( - server_addr: SocketAddr, - crypt_state: oneshot::Receiver<ClientCryptState>, -) { - let audio = Audio::new(); - audio.output_stream.play().unwrap(); - - // create opus decoder (might be expensive) - let mut opus_decoder = opus::Decoder::new( - audio.output_config.sample_rate.0 as u32, - match audio.output_config.channels { - 1 => Channels::Mono, - 2 => Channels::Stereo, - _ => unimplemented!("ljudnörd (got {} channels, need 1 or 2)", audio.output_config.channels) - } - ).unwrap(); - - let (mut sink, mut source) = network::connect_udp(server_addr, crypt_state).await; - - // Note: A normal application would also send periodic Ping packets, and its own audio - // via UDP. We instead trick the server into accepting us by sending it one - // dummy voice packet. - sink.send(( - VoicePacket::Audio { - _dst: std::marker::PhantomData, - target: 0, - session_id: (), - seq_num: 0, - payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true), - position_info: None, - }, - server_addr, - )).await.unwrap(); - - // Handle incoming UDP packets - while let Some(packet) = source.next().await { - let (packet, _src_addr) = match packet { - Ok(packet) => packet, - Err(err) => { - eprintln!("Got an invalid UDP packet: {}", err); - // To be expected, considering this is the internet, just ignore it - continue - } - }; - match packet { - VoicePacket::Ping { .. } => { - // Note: A normal application would handle these and only use UDP for voice - // once it has received one. - continue - } - VoicePacket::Audio { - // seq_num, - payload, - // position_info, - .. - } => { - match payload { - VoicePacketPayload::Opus(bytes, _eot) => { - let mut out: Vec<f32> = vec![0.0; bytes.len() * audio.output_config.channels as usize * 4]; - opus_decoder.decode_float(&bytes[..], &mut out, false).expect("error decoding"); - let mut lock = audio.output_buffer.lock().unwrap(); - lock.extend(out); - }, - _ => { unimplemented!("något fint"); } - } - - // decode paylout and put it in buffer - - // Got audio, naively echo it back - //let reply = VoicePacket::Audio { - // _dst: std::marker::PhantomData, - // target: 0, // normal speech - // session_id: (), // unused for server-bound packets - // seq_num, - // payload, - // position_info, - //}; - //sink.send((reply, src_addr)).await.unwrap(); - } - } - } -} - #[tokio::main] async fn main() { // Handle command line arguments let mut server_host = "".to_string(); let mut server_port = 64738u16; - let mut user_name = "EchoBot".to_string(); + let mut username = "EchoBot".to_string(); let mut accept_invalid_cert = false; { let mut ap = ArgumentParser::new(); @@ -220,7 +26,7 @@ async fn main() { .required(); ap.refer(&mut server_port) .add_option(&["--port"], Store, "Port of mumble server"); - ap.refer(&mut user_name) + ap.refer(&mut username) .add_option(&["--username"], Store, "User name used to connect"); ap.refer(&mut accept_invalid_cert).add_option( &["--accept-invalid-cert"], @@ -239,15 +45,18 @@ async fn main() { // For simplicity we don't deal with re-syncing, real applications would have to. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>(); + let audio = Audio::new(); + audio.output_stream.play().unwrap(); + // Run it join!( - connect( + network::handle_tcp( server_addr, server_host, - user_name, + username, accept_invalid_cert, crypt_state_sender, ), - handle_udp(server_addr, crypt_state_receiver) + network::handle_udp(server_addr, crypt_state_receiver, audio,) ); } diff --git a/mumd/src/network.rs b/mumd/src/network.rs index 34b5d01..82b45da 100644 --- a/mumd/src/network.rs +++ b/mumd/src/network.rs @@ -1,32 +1,53 @@ +use crate::audio::Audio; + +use bytes::Bytes; +use mumble_protocol::voice::VoicePacketPayload; +use opus::Channels; + use futures::channel::oneshot; +use futures::join; +use futures::SinkExt; use futures::StreamExt; use futures_util::stream::{SplitSink, SplitStream}; -use mumble_protocol::{Serverbound, Clientbound}; +use mumble_protocol::control::msgs; use mumble_protocol::control::ClientControlCodec; use mumble_protocol::control::ControlCodec; use mumble_protocol::control::ControlPacket; use mumble_protocol::crypt::ClientCryptState; use mumble_protocol::voice::VoicePacket; +use mumble_protocol::{Clientbound, Serverbound}; +use std::convert::Into; +use std::convert::TryInto; use std::net::Ipv6Addr; use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::Mutex; use tokio::net::TcpStream; use tokio::net::UdpSocket; +use tokio::time::{self, Duration}; use tokio_tls::TlsConnector; use tokio_tls::TlsStream; use tokio_util::codec::Decoder; use tokio_util::codec::Framed; use tokio_util::udp::UdpFramed; -pub async fn connect_tcp( +type TcpSender = SplitSink< + Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>, + ControlPacket<Serverbound>, +>; +type TcpReceiver = + SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>; +type UdpSender = SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>; +type UdpReceiver = SplitStream<UdpFramed<ClientCryptState>>; + +async fn connect_tcp( server_addr: SocketAddr, server_host: String, accept_invalid_cert: bool, -) -> ( - SplitSink<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>, ControlPacket<Serverbound>>, - SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>> -) { - - let stream = TcpStream::connect(&server_addr).await.expect("failed to connect to server:"); +) -> (TcpSender, TcpReceiver) { + let stream = TcpStream::connect(&server_addr) + .await + .expect("failed to connect to server:"); println!("TCP connected"); let mut builder = native_tls::TlsConnector::builder(); @@ -46,12 +67,8 @@ pub async fn connect_tcp( } pub async fn connect_udp( - server_addr: SocketAddr, crypt_state: oneshot::Receiver<ClientCryptState>, -) -> ( - SplitSink<UdpFramed<ClientCryptState>, (VoicePacket<Serverbound>, SocketAddr)>, - SplitStream<UdpFramed<ClientCryptState>> -) { +) -> (UdpSender, UdpReceiver) { // Bind UDP socket let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16)) .await @@ -68,3 +85,201 @@ pub async fn connect_udp( // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both) UdpFramed::new(udp_socket, crypt_state).split() } + +async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) { + let mut interval = time::interval(Duration::from_secs(delay_seconds)); + loop { + interval.tick().await; + println!("Sending ping"); + let msg = msgs::Ping::new(); + sink.lock().unwrap().send(msg.into()).await.unwrap(); + } +} + +async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) { + let mut msg = msgs::Authenticate::new(); + msg.set_username(username); + msg.set_opus(true); + sink.lock().unwrap().send(msg.into()).await.unwrap(); +} + +async fn listen_tcp( + sink: Arc<Mutex<TcpSender>>, + mut stream: TcpReceiver, + crypt_state_sender: oneshot::Sender<ClientCryptState>, +) { + let mut crypt_state = None; + let mut crypt_state_sender = Some(crypt_state_sender); + + while let Some(packet) = stream.next().await { + //TODO handle types separately + match packet.unwrap() { + ControlPacket::TextMessage(mut msg) => { + println!( + "Got message from user with session ID {}: {}", + msg.get_actor(), + msg.get_message() + ); + // Send reply back to server + let mut response = msgs::TextMessage::new(); + response.mut_session().push(msg.get_actor()); + response.set_message(msg.take_message()); + let mut lock = sink.lock().unwrap(); + lock.send(response.into()).await.unwrap(); + } + ControlPacket::CryptSetup(msg) => { + println!("crypt setup"); + // Wait until we're fully connected before initiating UDP voice + crypt_state = Some(ClientCryptState::new_from( + msg.get_key() + .try_into() + .expect("Server sent private key with incorrect size"), + msg.get_client_nonce() + .try_into() + .expect("Server sent client_nonce with incorrect size"), + msg.get_server_nonce() + .try_into() + .expect("Server sent server_nonce with incorrect size"), + )); + } + ControlPacket::ServerSync(_) => { + println!("Logged in!"); + if let Some(sender) = crypt_state_sender.take() { + let _ = sender.send( + crypt_state + .take() + .expect("Server didn't send us any CryptSetup packet!"), + ); + } + } + ControlPacket::Reject(msg) => { + println!("Login rejected: {:?}", msg); + } + _ => {} + } + } +} + +pub async fn handle_tcp( + server_addr: SocketAddr, + server_host: String, + username: String, + accept_invalid_cert: bool, + crypt_state_sender: oneshot::Sender<ClientCryptState>, +) { + let (sink, stream) = connect_tcp(server_addr, server_host, accept_invalid_cert).await; + let sink = Arc::new(Mutex::new(sink)); + + // Handshake (omitting `Version` message for brevity) + authenticate(Arc::clone(&sink), username).await; + + println!("Logging in.."); + + join!( + send_pings(Arc::clone(&sink), 10), + listen_tcp(sink, stream, crypt_state_sender), + ); +} + +async fn listen_udp( + _sink: Arc<Mutex<UdpSender>>, + mut source: UdpReceiver, + mut opus_decoder: opus::Decoder, + audio: Audio, +) { + while let Some(packet) = source.next().await { + let (packet, _src_addr) = match packet { + Ok(packet) => packet, + Err(err) => { + eprintln!("Got an invalid UDP packet: {}", err); + // To be expected, considering this is the internet, just ignore it + continue; + } + }; + match packet { + VoicePacket::Ping { .. } => { + // Note: A normal application would handle these and only use UDP for voice + // once it has received one. + continue; + } + VoicePacket::Audio { + // seq_num, + payload, + // position_info, + .. + } => { + match payload { + VoicePacketPayload::Opus(bytes, _eot) => { + let mut out: Vec<f32> = + vec![0.0; bytes.len() * audio.output_config.channels as usize * 4]; + opus_decoder + .decode_float(&bytes[..], &mut out, false) + .expect("error decoding"); + let mut lock = audio.output_buffer.lock().unwrap(); + lock.extend(out); + } + _ => { + unimplemented!("något fint"); + } + } + + // decode paylout and put it in buffer + + // Got audio, naively echo it back + //let reply = VoicePacket::Audio { + // _dst: std::marker::PhantomData, + // target: 0, // normal speech + // session_id: (), // unused for server-bound packets + // seq_num, + // payload, + // position_info, + //}; + //sink.send((reply, src_addr)).await.unwrap(); + } + } + } +} + +async fn send_ping_udp(sink: &mut UdpSender, server_addr: SocketAddr) { + sink.send(( + VoicePacket::Audio { + _dst: std::marker::PhantomData, + target: 0, + session_id: (), + seq_num: 0, + payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true), + position_info: None, + }, + server_addr, + )) + .await + .unwrap(); +} + +pub async fn handle_udp( + server_addr: SocketAddr, + crypt_state: oneshot::Receiver<ClientCryptState>, + audio: Audio, +) { + let opus_decoder = opus::Decoder::new( + audio.output_config.sample_rate.0 as u32, + match audio.output_config.channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => unimplemented!( + "ljudnörd (got {} channels, need 1 or 2)", + audio.output_config.channels + ), + }, + ) + .unwrap(); + + let (mut sink, source) = connect_udp(crypt_state).await; + + // Note: A normal application would also send periodic Ping packets, and its own audio + // via UDP. We instead trick the server into accepting us by sending it one + // dummy voice packet. + send_ping_udp(&mut sink, server_addr).await; + + listen_udp(Arc::new(Mutex::new(sink)), source, opus_decoder, audio).await; +} |
