aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2020-10-13 02:31:09 +0200
committerGustav Sörnäs <gustav@sornas.net>2020-10-13 02:31:35 +0200
commitb583f6dbe521e01e879e16605026997dfa10c3d3 (patch)
treef318fbf00a47a30787b891094a522f313fd1d00b
parent39c1ff5be55ade710bbe0fbe4701a070dadbb8e7 (diff)
downloadmum-b583f6dbe521e01e879e16605026997dfa10c3d3.tar.gz
join different channels
Co-authored-by: Eskil Queseth <eskilq@kth.se>
-rw-r--r--mumd/src/audio.rs7
-rw-r--r--mumd/src/command.rs2
-rw-r--r--mumd/src/main.rs21
-rw-r--r--mumd/src/network/tcp.rs57
-rw-r--r--mumd/src/network/udp.rs48
-rw-r--r--mumd/src/state.rs81
6 files changed, 151 insertions, 65 deletions
diff --git a/mumd/src/audio.rs b/mumd/src/audio.rs
index 9b794a6..3c24f1c 100644
--- a/mumd/src/audio.rs
+++ b/mumd/src/audio.rs
@@ -1,6 +1,5 @@
use bytes::Bytes;
-use cpal::traits::DeviceTrait;
-use cpal::traits::HostTrait;
+use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{
InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, SampleRate, Stream, StreamConfig,
};
@@ -30,7 +29,7 @@ pub struct Audio {
pub input_buffer: Arc<Mutex<VecDeque<f32>>>,
input_channel_receiver: Option<Receiver<VoicePacketPayload>>,
- client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>,
+ client_streams: Arc<Mutex<HashMap<u32, ClientStream>>>, //TODO move to user state
}
//TODO split into input/output
@@ -129,6 +128,8 @@ impl Audio {
}
.unwrap();
+ output_stream.play().unwrap();
+
Self {
output_config,
output_stream,
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 5d6cca4..322bde8 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,4 +1,4 @@
-enum Command {
+pub enum Command {
ChannelJoin {
channel_id: u32,
},
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 2a0fcbd..a08db44 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -2,21 +2,22 @@ mod audio;
mod network;
mod command;
mod state;
-use crate::audio::Audio;
-use crate::state::Server;
+use crate::state::State;
use argparse::ArgumentParser;
use argparse::Store;
use argparse::StoreTrue;
use colored::*;
-use cpal::traits::StreamTrait;
use futures::channel::oneshot;
use futures::join;
use log::*;
+use mumble_protocol::control::ControlPacket;
use mumble_protocol::crypt::ClientCryptState;
+use mumble_protocol::voice::Serverbound;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::sync::Mutex;
+use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
@@ -73,28 +74,24 @@ async fn main() {
// Oneshot channel for setting UDP CryptState from control task
// For simplicity we don't deal with re-syncing, real applications would have to.
let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>();
+ let (packet_sender, packet_receiver) = mpsc::channel::<ControlPacket<Serverbound>>(10);
- let audio = Audio::new();
- audio.output_stream.play().unwrap();
- let audio = Arc::new(Mutex::new(audio));
-
- let server_state = Arc::new(Mutex::new(Server::new()));
+ let state = Arc::new(Mutex::new(State::new(packet_sender, username)));
// Run it
join!(
network::tcp::handle(
- server_state,
+ Arc::clone(&state),
server_addr,
server_host,
- username,
accept_invalid_cert,
crypt_state_sender,
- Arc::clone(&audio),
+ packet_receiver,
),
network::udp::handle(
+ state,
server_addr,
crypt_state_receiver,
- audio,
),
);
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index dde98aa..fa4c4b6 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -1,5 +1,5 @@
-use crate::audio::Audio;
-use crate::state::Server;
+use crate::state::State;
+use crate::command::Command;
use log::*;
use futures::channel::oneshot;
@@ -12,6 +12,7 @@ use std::convert::{Into, TryInto};
use std::net::{SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
+use tokio::sync::mpsc;
use tokio::time::{self, Duration};
use tokio_tls::{TlsConnector, TlsStream};
use tokio_util::codec::{Decoder, Framed};
@@ -24,25 +25,26 @@ type TcpReceiver =
SplitStream<Framed<TlsStream<TcpStream>, ControlCodec<Serverbound, Clientbound>>>;
pub async fn handle(
- server: Arc<Mutex<Server>>,
+ state: Arc<Mutex<State>>,
server_addr: SocketAddr,
server_host: String,
- username: String,
accept_invalid_cert: bool,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
- audio: Arc<Mutex<Audio>>,
+ packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>,
) {
let (sink, stream) = connect(server_addr, server_host, accept_invalid_cert).await;
let sink = Arc::new(Mutex::new(sink));
+
// Handshake (omitting `Version` message for brevity)
- authenticate(Arc::clone(&sink), username).await;
+ authenticate(Arc::clone(&sink), state.lock().unwrap().username().to_string()).await;
info!("Logging in...");
join!(
send_pings(Arc::clone(&sink), 10),
- listen(server, sink, stream, crypt_state_sender, audio),
+ listen(state, stream, crypt_state_sender),
+ send_packets(sink, packet_receiver),
);
}
@@ -72,6 +74,7 @@ async fn connect(
ClientControlCodec::new().framed(tls_stream).split()
}
+//TODO &mut sink?
async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
@@ -79,6 +82,7 @@ async fn authenticate(sink: Arc<Mutex<TcpSender>>, username: String) {
sink.lock().unwrap().send(msg.into()).await.unwrap();
}
+//TODO move somewhere else (main?) and send through packet_sender
async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
let mut interval = time::interval(Duration::from_secs(delay_seconds));
loop {
@@ -89,12 +93,18 @@ async fn send_pings(sink: Arc<Mutex<TcpSender>>, delay_seconds: u64) {
}
}
+async fn send_packets(sink: Arc<Mutex<TcpSender>>,
+ mut packet_receiver: mpsc::Receiver<ControlPacket<Serverbound>>) {
+
+ while let Some(packet) = packet_receiver.recv().await {
+ sink.lock().unwrap().send(packet).await.unwrap();
+ }
+}
+
async fn listen(
- server: Arc<Mutex<Server>>,
- sink: Arc<Mutex<TcpSender>>,
+ state: Arc<Mutex<State>>,
mut stream: TcpReceiver,
crypt_state_sender: oneshot::Sender<ClientCryptState>,
- audio: Arc<Mutex<Audio>>,
) {
let mut crypt_state = None;
let mut crypt_state_sender = Some(crypt_state_sender);
@@ -102,18 +112,12 @@ async fn listen(
while let Some(packet) = stream.next().await {
//TODO handle types separately
match packet.unwrap() {
- ControlPacket::TextMessage(mut msg) => {
+ ControlPacket::TextMessage(msg) => {
info!(
"Got message from user with session ID {}: {}",
msg.get_actor(),
msg.get_message()
);
- // Send reply back to server
- let mut response = msgs::TextMessage::new();
- response.mut_session().push(msg.get_actor());
- response.set_message(msg.take_message());
- let mut lock = sink.lock().unwrap();
- lock.send(response.into()).await.unwrap();
}
ControlPacket::CryptSetup(msg) => {
debug!("Crypt setup");
@@ -139,7 +143,8 @@ async fn listen(
.expect("Server didn't send us any CryptSetup packet!"),
);
}
- let mut server = server.lock().unwrap();
+ let mut state = state.lock().unwrap();
+ let server = state.server_mut();
server.parse_server_sync(msg);
match &server.welcome_text {
Some(s) => info!("Welcome: {}", s),
@@ -148,16 +153,18 @@ async fn listen(
for (_, channel) in server.channels() {
info!("Found channel {}", channel.name());
}
- sink.lock().unwrap().send(msgs::UserList::new().into()).await.unwrap();
+ //TODO start listening for packets to send here
+ state.handle_command(Command::ChannelJoin{channel_id: 1}).await;
}
ControlPacket::Reject(msg) => {
warn!("Login rejected: {:?}", msg);
}
ControlPacket::UserState(msg) => {
- audio.lock().unwrap().add_client(msg.get_session());
- let mut server = server.lock().unwrap();
+ let mut state = state.lock().unwrap();
let session = msg.get_session();
- server.parse_user_state(msg);
+ state.audio_mut().add_client(msg.get_session()); //TODO
+ state.parse_initial_user_state(msg); //TODO only if actually initiating state
+ let server = state.server_mut();
let user = server.users().get(&session).unwrap();
info!("User {} connected to {}",
user.name(),
@@ -165,14 +172,14 @@ async fn listen(
}
ControlPacket::UserRemove(msg) => {
info!("User {} left", msg.get_session());
- audio.lock().unwrap().remove_client(msg.get_session());
+ state.lock().unwrap().audio_mut().remove_client(msg.get_session());
}
ControlPacket::ChannelState(msg) => {
debug!("Channel state received");
- server.lock().unwrap().parse_channel_state(msg);
+ state.lock().unwrap().server_mut().parse_channel_state(msg);
}
ControlPacket::ChannelRemove(msg) => {
- server.lock().unwrap().parse_channel_remove(msg);
+ state.lock().unwrap().server_mut().parse_channel_remove(msg);
}
_ => {}
}
diff --git a/mumd/src/network/udp.rs b/mumd/src/network/udp.rs
index 39f16b6..5f76501 100644
--- a/mumd/src/network/udp.rs
+++ b/mumd/src/network/udp.rs
@@ -1,4 +1,4 @@
-use crate::audio::Audio;
+use crate::state::State;
use log::*;
use bytes::Bytes;
@@ -36,10 +36,28 @@ pub async fn connect(
UdpFramed::new(udp_socket, crypt_state).split()
}
+pub async fn handle(
+ state: Arc<Mutex<State>>,
+ server_addr: SocketAddr,
+ crypt_state: oneshot::Receiver<ClientCryptState>,
+) {
+ let (mut sink, source) = connect(crypt_state).await;
+
+ // Note: A normal application would also send periodic Ping packets, and its own audio
+ // via UDP. We instead trick the server into accepting us by sending it one
+ // dummy voice packet.
+ send_ping(&mut sink, server_addr).await;
+
+ let sink = Arc::new(Mutex::new(sink));
+ join!(
+ listen(Arc::clone(&state), source),
+ send_voice(state, sink, server_addr)
+ );
+}
+
async fn listen(
- _sink: Arc<Mutex<UdpSender>>,
+ state: Arc<Mutex<State>>,
mut source: UdpReceiver,
- audio: Arc<Mutex<Audio>>,
) {
while let Some(packet) = source.next().await {
let (packet, _src_addr) = match packet {
@@ -63,7 +81,7 @@ async fn listen(
// position_info,
..
} => {
- audio.lock().unwrap().decode_packet(session_id, payload);
+ state.lock().unwrap().audio().decode_packet(session_id, payload);
}
}
}
@@ -86,11 +104,11 @@ async fn send_ping(sink: &mut UdpSender, server_addr: SocketAddr) {
}
async fn send_voice(
+ state: Arc<Mutex<State>>,
sink: Arc<Mutex<UdpSender>>,
server_addr: SocketAddr,
- audio: Arc<Mutex<Audio>>,
) {
- let mut receiver = audio.lock().unwrap().take_receiver().unwrap();
+ let mut receiver = state.lock().unwrap().audio_mut().take_receiver().unwrap();
let mut count = 0;
while let Some(payload) = receiver.recv().await {
@@ -111,21 +129,3 @@ async fn send_voice(
}
}
-pub async fn handle(
- server_addr: SocketAddr,
- crypt_state: oneshot::Receiver<ClientCryptState>,
- audio: Arc<Mutex<Audio>>,
-) {
- let (mut sink, source) = connect(crypt_state).await;
-
- // Note: A normal application would also send periodic Ping packets, and its own audio
- // via UDP. We instead trick the server into accepting us by sending it one
- // dummy voice packet.
- send_ping(&mut sink, server_addr).await;
-
- let sink = Arc::new(Mutex::new(sink));
- join!(
- listen(Arc::clone(&sink), source, Arc::clone(&audio)),
- send_voice(sink, server_addr, audio)
- );
-}
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 1ef8467..566adaf 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -1,7 +1,88 @@
use log::*;
+use crate::audio::Audio;
+use crate::command::Command;
use mumble_protocol::control::msgs;
+use mumble_protocol::control::ControlPacket;
+use mumble_protocol::voice::Serverbound;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
+use tokio::sync::mpsc;
+
+pub struct State {
+ server: Server,
+ audio: Audio,
+
+ packet_sender: mpsc::Sender<ControlPacket<Serverbound>>,
+
+ username: String,
+ session_id: Option<u32>, //FIXME set
+}
+
+impl State {
+ pub fn new(packet_sender: mpsc::Sender<ControlPacket<Serverbound>>,
+ username: String) -> Self {
+ Self {
+ server: Server::new(),
+ audio: Audio::new(),
+ packet_sender,
+ username,
+ session_id: None,
+ }
+ }
+
+ //TODO result
+ pub async fn handle_command(&mut self, command: Command) {
+ match command {
+ Command::ChannelJoin{channel_id} => {
+ if self.session_id.is_none() {
+ warn!("Tried to join channel but we don't have a session id");
+ return;
+ }
+ let mut msg = msgs::UserState::new();
+ msg.set_session(self.session_id.unwrap());
+ msg.set_channel_id(channel_id);
+ self.packet_sender.send(msg.into()).await.unwrap();
+ }
+ _ => {}
+ }
+ }
+
+ pub fn parse_initial_user_state(&mut self, msg: Box<msgs::UserState>) {
+ if !msg.has_session() {
+ warn!("Can't parse user state without session");
+ return;
+ }
+ if !msg.has_name() {
+ warn!("Missing name in initial user state");
+ } else {
+ if msg.get_name() == self.username {
+ match self.session_id {
+ None => {
+ debug!("Found our session id: {}", msg.get_session());
+ self.session_id = Some(msg.get_session());
+ }
+ Some(session) => {
+ if session != msg.get_session() {
+ error!("Got two different session IDs ({} and {}) for ourselves",
+ session,
+ msg.get_session());
+ } else {
+ debug!("Got our session ID twice");
+ }
+ }
+ }
+ }
+ }
+ self.server.parse_user_state(msg);
+ }
+
+ pub fn audio(&self) -> &Audio { &self.audio }
+ pub fn audio_mut(&mut self) -> &mut Audio { &mut self.audio }
+
+ pub fn username(&self) -> &str { &self.username }
+
+ pub fn server_mut(&mut self) -> &mut Server { &mut self.server }
+}
pub struct Server {
channels: HashMap<u32, Channel>,