aboutsummaryrefslogtreecommitdiffstats
path: root/mumd
diff options
context:
space:
mode:
Diffstat (limited to 'mumd')
-rw-r--r--mumd/Cargo.toml15
-rw-r--r--mumd/src/main.rs330
2 files changed, 343 insertions, 2 deletions
diff --git a/mumd/Cargo.toml b/mumd/Cargo.toml
index b89f817..bb68525 100644
--- a/mumd/Cargo.toml
+++ b/mumd/Cargo.toml
@@ -11,3 +11,18 @@ daemonize = "0.4"
ipc-channel = "0.14"
serde = { version = "1.0", features = ["derive"] }
clap = "2.33"
+mumble-protocol = "0.3"
+cpal = { git = "https://github.com/RustAudio/cpal" }
+opus = "0.2"
+
+bytes = "0.5"
+byteorder = "1"
+protobuf = "2"
+openssl = { version = "0.10", optional = true }
+
+argparse = "0.2"
+futures = "0.3"
+native-tls = "0.2"
+tokio = { version = "0.2", features = ["full"] }
+tokio-util = { version = "0.3", features = ["codec", "udp"] }
+tokio-tls = "0.3"
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index e7a11a9..01454c3 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -1,3 +1,329 @@
-fn main() {
- println!("Hello, world!");
+use argparse::ArgumentParser;
+use argparse::Store;
+use argparse::StoreTrue;
+use bytes::Bytes;
+
+use futures::channel::oneshot;
+use futures::join;
+use futures::StreamExt;
+use futures::SinkExt;
+
+use mumble_protocol::control::msgs;
+use mumble_protocol::control::ClientControlCodec;
+use mumble_protocol::control::ControlPacket;
+use mumble_protocol::crypt::ClientCryptState;
+use mumble_protocol::voice::VoicePacket;
+use mumble_protocol::voice::VoicePacketPayload;
+
+use std::convert::Into;
+use std::convert::TryInto;
+
+use std::net::Ipv6Addr;
+use std::net::SocketAddr;
+use std::net::ToSocketAddrs;
+
+use std::collections::VecDeque;
+
+use tokio::net::TcpStream;
+use tokio::net::UdpSocket;
+use tokio_tls::TlsConnector;
+use tokio_util::codec::Decoder;
+use tokio_util::udp::UdpFramed;
+
+use tokio::time::{self, Duration};
+
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use cpal::{Sample, SampleFormat, OutputCallbackInfo, StreamConfig, SampleRate};
+use cpal::traits::DeviceTrait;
+use cpal::traits::HostTrait;
+use cpal::traits::StreamTrait;
+
+use opus::Channels;
+
+async fn connect(
+ server_addr: SocketAddr,
+ server_host: String,
+ user_name: String,
+ accept_invalid_cert: bool,
+ crypt_state_sender: oneshot::Sender<ClientCryptState>,
+) {
+ // Wrap crypt_state_sender in Option, so we can call it only once
+ let mut crypt_state_sender = Some(crypt_state_sender);
+
+ // Connect to server via TCP
+ let stream = TcpStream::connect(&server_addr).await.expect("Failed to connect to server:");
+ println!("TCP connected..");
+
+ // Wrap the connection in TLS
+ let mut builder = native_tls::TlsConnector::builder();
+ builder.danger_accept_invalid_certs(accept_invalid_cert);
+ let connector: TlsConnector = builder
+ .build()
+ .expect("Failed to create TLS connector")
+ .into();
+ let tls_stream = connector
+ .connect(&server_host, stream)
+ .await
+ .expect("Failed to connect TLS: {}");
+ println!("TLS connected..");
+
+ // Wrap the TLS stream with Mumble's client-side control-channel codec
+ let (sink, mut stream) = ClientControlCodec::new().framed(tls_stream).split();
+ let sink = Arc::new(Mutex::new(sink));
+
+ // Handshake (omitting `Version` message for brevity)
+ let mut msg = msgs::Authenticate::new();
+ msg.set_username(user_name);
+ msg.set_opus(true);
+ let mut lock = sink.lock().unwrap();
+ lock.send(msg.into()).await.unwrap();
+ drop(lock);
+
+ println!("Logging in..");
+ let mut crypt_state = None;
+
+ let ping_sink = Arc::clone(&sink);
+ let handle = async {
+ let mut interval = time::interval(Duration::from_secs(10));
+ let sink = ping_sink;
+
+ loop {
+ interval.tick().await;
+ let msg = msgs::Ping::new();
+ let mut lock = sink.lock().unwrap();
+ lock.send(msg.into()).await.unwrap();
+ }
+ };
+
+ // Handle incoming packets
+ let receive = async {
+ while let Some(packet) = stream.next().await {
+ match packet.unwrap() {
+ ControlPacket::TextMessage(mut msg) => {
+ println!(
+ "Got message from user with session ID {}: {}",
+ msg.get_actor(),
+ msg.get_message()
+ );
+ // Send reply back to server
+ let mut response = msgs::TextMessage::new();
+ response.mut_session().push(msg.get_actor());
+ response.set_message(msg.take_message());
+ let mut lock = sink.lock().unwrap();
+ lock.send(response.into()).await.unwrap();
+ }
+ ControlPacket::CryptSetup(msg) => {
+ println!("crypt setup");
+ // Wait until we're fully connected before initiating UDP voice
+ crypt_state = Some(ClientCryptState::new_from(
+ msg.get_key()
+ .try_into()
+ .expect("Server sent private key with incorrect size"),
+ msg.get_client_nonce()
+ .try_into()
+ .expect("Server sent client_nonce with incorrect size"),
+ msg.get_server_nonce()
+ .try_into()
+ .expect("Server sent server_nonce with incorrect size"),
+ ));
+ }
+ ControlPacket::ServerSync(_) => {
+ println!("Logged in!");
+ if let Some(sender) = crypt_state_sender.take() {
+ let _ = sender.send(
+ crypt_state
+ .take()
+ .expect("Server didn't send us any CryptSetup packet!"),
+ );
+ }
+ }
+ ControlPacket::Reject(msg) => {
+ println!("Login rejected: {:?}", msg);
+ }
+ _ => {},
+ }
+ }
+ };
+ join!(handle, receive);
+}
+
+async fn handle_udp(
+ server_addr: SocketAddr,
+ crypt_state: oneshot::Receiver<ClientCryptState>,
+) {
+ // Bind UDP socket
+ let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
+ .await
+ .expect("Failed to bind UDP socket");
+
+ // Wait for initial CryptState
+ let crypt_state = match crypt_state.await {
+ Ok(crypt_state) => crypt_state,
+ // disconnected before we received the CryptSetup packet, oh well
+ Err(_) => return,
+ };
+ println!("UDP ready!");
+
+ // initialize audio buffer
+ let audio_buf: VecDeque<f32> = VecDeque::new();
+ let audio_buf = Arc::new(Mutex::new(audio_buf));
+
+ let host = cpal::default_host();
+ let device = host.default_output_device().expect("default output device not found");
+ let mut supported_configs_range = device.supported_output_configs()
+ .expect("error querying configs");
+ let supported_config = supported_configs_range.next()
+ .expect("no supported config?!")
+ .with_sample_rate(SampleRate(48000));
+ let supported_sample_format = supported_config.sample_format();
+ let config: StreamConfig = supported_config.into();
+
+ let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
+ // cpal callback:
+ // pop from audio buf
+ // if empty, fill rest with silence
+ fn curry_callback<T: Sample>(buf: Arc<Mutex<VecDeque<f32>>>)
+ -> impl FnMut(&mut [T],
+ &OutputCallbackInfo) + Send + 'static {
+ move |data: &mut [T], _info: &OutputCallbackInfo| {
+ let mut lock = buf.lock().unwrap();
+ for sample in data.iter_mut() {
+ *sample = Sample::from(&lock.pop_front().unwrap_or(0.0));
+ }
+ }
+ }
+ let stream_audio_buf = Arc::clone(&audio_buf);
+ let stream = match supported_sample_format {
+ SampleFormat::F32 => device.build_output_stream(&config, curry_callback::<f32>(stream_audio_buf), err_fn),
+ SampleFormat::I16 => device.build_output_stream(&config, curry_callback::<i16>(stream_audio_buf), err_fn),
+ SampleFormat::U16 => device.build_output_stream(&config, curry_callback::<u16>(stream_audio_buf), err_fn),
+ }.unwrap();
+ stream.play().unwrap();
+
+ println!("{} {}", config.sample_rate.0, config.channels);
+
+ // create opus decoder (might be expensive)
+ let mut opus_decoder = opus::Decoder::new(
+ config.sample_rate.0 as u32,
+ match config.channels {
+ 1 => Channels::Mono,
+ 2 => Channels::Stereo,
+ _ => unimplemented!("ljudnörd (got {} channels, need 1 or 2)", config.channels)
+ }
+ ).unwrap();
+
+ // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both)
+ let (mut sink, mut source) = UdpFramed::new(udp_socket, crypt_state).split();
+
+ // Note: A normal application would also send periodic Ping packets, and its own audio
+ // via UDP. We instead trick the server into accepting us by sending it one
+ // dummy voice packet.
+ sink.send((
+ VoicePacket::Audio {
+ _dst: std::marker::PhantomData,
+ target: 0,
+ session_id: (),
+ seq_num: 0,
+ payload: VoicePacketPayload::Opus(Bytes::from([0u8; 128].as_ref()), true),
+ position_info: None,
+ },
+ server_addr,
+ )).await.unwrap();
+
+ // Handle incoming UDP packets
+ while let Some(packet) = source.next().await {
+ let (packet, _src_addr) = match packet {
+ Ok(packet) => packet,
+ Err(err) => {
+ eprintln!("Got an invalid UDP packet: {}", err);
+ // To be expected, considering this is the internet, just ignore it
+ continue
+ }
+ };
+ match packet {
+ VoicePacket::Ping { .. } => {
+ // Note: A normal application would handle these and only use UDP for voice
+ // once it has received one.
+ continue
+ }
+ VoicePacket::Audio {
+ // seq_num,
+ payload,
+ // position_info,
+ ..
+ } => {
+ match payload {
+ VoicePacketPayload::Opus(bytes, eot) => {
+ let mut out: Vec<f32> = vec![0.0; bytes.len() * config.channels as usize * 4];
+ opus_decoder.decode_float(&bytes[..], &mut out, false);
+ let mut lock = audio_buf.lock().unwrap();
+ lock.extend(out);
+ },
+ _ => { unimplemented!("något fint"); }
+ }
+
+ // decode paylout and put it in buffer
+
+ // Got audio, naively echo it back
+ //let reply = VoicePacket::Audio {
+ // _dst: std::marker::PhantomData,
+ // target: 0, // normal speech
+ // session_id: (), // unused for server-bound packets
+ // seq_num,
+ // payload,
+ // position_info,
+ //};
+ //sink.send((reply, src_addr)).await.unwrap();
+ }
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() {
+ // Handle command line arguments
+ let mut server_host = "".to_string();
+ let mut server_port = 64738u16;
+ let mut user_name = "EchoBot".to_string();
+ let mut accept_invalid_cert = false;
+ {
+ let mut ap = ArgumentParser::new();
+ ap.set_description("Run the echo client example");
+ ap.refer(&mut server_host)
+ .add_option(&["--host"], Store, "Hostname of mumble server")
+ .required();
+ ap.refer(&mut server_port)
+ .add_option(&["--port"], Store, "Port of mumble server");
+ ap.refer(&mut user_name)
+ .add_option(&["--username"], Store, "User name used to connect");
+ ap.refer(&mut accept_invalid_cert).add_option(
+ &["--accept-invalid-cert"],
+ StoreTrue,
+ "Accept invalid TLS certificates",
+ );
+ ap.parse_args_or_exit();
+ }
+ let server_addr = (server_host.as_ref(), server_port)
+ .to_socket_addrs()
+ .expect("Failed to parse server address")
+ .next()
+ .expect("Failed to resolve server address");
+
+ // Oneshot channel for setting UDP CryptState from control task
+ // For simplicity we don't deal with re-syncing, real applications would have to.
+ let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>();
+
+ // Run it
+ join!(
+ connect(
+ server_addr,
+ server_host,
+ user_name,
+ accept_invalid_cert,
+ crypt_state_sender,
+ ),
+ handle_udp(server_addr, crypt_state_receiver)
+ );
}