aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGustav Sörnäs <gustav@sornas.net>2021-01-02 18:41:59 +0100
committerGitHub <noreply@github.com>2021-01-02 18:41:59 +0100
commit28f0ccd4639865e10690022c8164ba4c5b337102 (patch)
tree24c79ff42ce823b29cd94f72d7e567c67a7fc8f0
parent67364577263943e815be9ba700c10845698e116d (diff)
parentaef5b85b22b916a3a7f84b1b9bbea151544580f3 (diff)
downloadmum-28f0ccd4639865e10690022c8164ba4c5b337102.tar.gz
Merge pull request #49 from sornas/move-more-to-backend
Move mumble client stuff away from main
-rw-r--r--mumd/src/client.rs55
-rw-r--r--mumd/src/command.rs10
-rw-r--r--mumd/src/main.rs43
-rw-r--r--mumd/src/network/tcp.rs4
-rw-r--r--mumd/src/state.rs28
5 files changed, 78 insertions, 62 deletions
diff --git a/mumd/src/client.rs b/mumd/src/client.rs
new file mode 100644
index 0000000..3613061
--- /dev/null
+++ b/mumd/src/client.rs
@@ -0,0 +1,55 @@
+use crate::command;
+use crate::network::{tcp, udp, ConnectionInfo};
+use crate::state::State;
+
+use futures_util::join;
+use ipc_channel::ipc::IpcSender;
+use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState};
+use mumlib::command::{Command, CommandResponse};
+use std::sync::{Arc, Mutex};
+use tokio::sync::{mpsc, watch};
+
+pub async fn handle(
+ command_receiver: mpsc::UnboundedReceiver<(
+ Command,
+ IpcSender<mumlib::error::Result<Option<CommandResponse>>>,
+ )>,
+) {
+ let (connection_info_sender, connection_info_receiver) =
+ watch::channel::<Option<ConnectionInfo>>(None);
+ let (crypt_state_sender, crypt_state_receiver) =
+ mpsc::channel::<ClientCryptState>(1);
+ let (packet_sender, packet_receiver) =
+ mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
+ let (ping_request_sender, ping_request_receiver) =
+ mpsc::unbounded_channel();
+ let (response_sender, response_receiver) =
+ mpsc::unbounded_channel();
+
+ let state = State::new();
+ let state = Arc::new(Mutex::new(state));
+ join!(
+ tcp::handle(
+ Arc::clone(&state),
+ connection_info_receiver.clone(),
+ crypt_state_sender,
+ packet_sender.clone(),
+ packet_receiver,
+ response_receiver,
+ ),
+ udp::handle(
+ Arc::clone(&state),
+ connection_info_receiver.clone(),
+ crypt_state_receiver,
+ ),
+ command::handle(
+ state,
+ command_receiver,
+ response_sender,
+ ping_request_sender,
+ packet_sender,
+ connection_info_sender,
+ ),
+ udp::handle_pings(ping_request_receiver),
+ );
+}
diff --git a/mumd/src/command.rs b/mumd/src/command.rs
index 330e3fc..e8c92c3 100644
--- a/mumd/src/command.rs
+++ b/mumd/src/command.rs
@@ -1,13 +1,13 @@
-use crate::state::{ExecutionContext, State};
+use crate::{network::ConnectionInfo, state::{ExecutionContext, State}};
use crate::network::tcp::{TcpEvent, TcpEventCallback};
use ipc_channel::ipc::IpcSender;
use log::*;
-use mumble_protocol::ping::PongPacket;
+use mumble_protocol::{Serverbound, control::ControlPacket, ping::PongPacket};
use mumlib::command::{Command, CommandResponse};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, watch};
pub async fn handle(
state: Arc<Mutex<State>>,
@@ -17,12 +17,14 @@ pub async fn handle(
)>,
tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>,
ping_request_sender: mpsc::UnboundedSender<(u64, SocketAddr, Box<dyn FnOnce(PongPacket)>)>,
+ mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
+ mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
) {
debug!("Begin listening for commands");
while let Some((command, response_sender)) = command_receiver.recv().await {
debug!("Received command {:?}", command);
let mut state = state.lock().unwrap();
- let event = state.handle_command(command);
+ let event = state.handle_command(command, &mut packet_sender, &mut connection_info_sender);
drop(state);
match event {
ExecutionContext::TcpEvent(event, generator) => {
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index db6d2ef..67481f9 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -1,23 +1,17 @@
mod audio;
+mod client;
mod command;
mod network;
mod notify;
mod state;
-use crate::network::ConnectionInfo;
-use crate::state::State;
-
use futures::join;
use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender};
use log::*;
-use mumble_protocol::control::ControlPacket;
-use mumble_protocol::crypt::ClientCryptState;
-use mumble_protocol::voice::Serverbound;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
use std::fs;
-use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
#[tokio::main]
@@ -45,46 +39,17 @@ async fn main() {
}
}
- // Oneshot channel for setting UDP CryptState from control task
- // For simplicity we don't deal with re-syncing, real applications would have to.
- let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::<ClientCryptState>(1); // crypt state should always be consumed before sending a new one
- let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
let (command_sender, command_receiver) = mpsc::unbounded_channel::<(
Command,
IpcSender<mumlib::error::Result<Option<CommandResponse>>>,
)>();
- let (connection_info_sender, connection_info_receiver) =
- watch::channel::<Option<ConnectionInfo>>(None);
- let (response_sender, response_receiver) = mpsc::unbounded_channel();
- let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel();
-
- let state = State::new(packet_sender, connection_info_sender);
- let state = Arc::new(Mutex::new(state));
- let (_, _, _, e, _) = join!(
- network::tcp::handle(
- Arc::clone(&state),
- connection_info_receiver.clone(),
- crypt_state_sender,
- packet_receiver,
- response_receiver,
- ),
- network::udp::handle(
- Arc::clone(&state),
- connection_info_receiver.clone(),
- crypt_state_receiver,
- ),
- command::handle(
- state,
- command_receiver,
- response_sender,
- ping_request_sender,
- ),
+ let (_, e) = join!(
+ client::handle(command_receiver),
spawn_blocking(move || {
// IpcSender is blocking
receive_oneshot_commands(command_sender);
}),
- network::udp::handle_pings(ping_request_receiver),
);
e.unwrap();
}
diff --git a/mumd/src/network/tcp.rs b/mumd/src/network/tcp.rs
index 3c96ee1..47ea311 100644
--- a/mumd/src/network/tcp.rs
+++ b/mumd/src/network/tcp.rs
@@ -44,6 +44,7 @@ pub async fn handle(
state: Arc<Mutex<State>>,
mut connection_info_receiver: watch::Receiver<Option<ConnectionInfo>>,
crypt_state_sender: mpsc::Sender<ClientCryptState>,
+ packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
mut packet_receiver: mpsc::UnboundedReceiver<ControlPacket<Serverbound>>,
mut tcp_event_register_receiver: mpsc::UnboundedReceiver<(TcpEvent, TcpEventCallback)>,
) {
@@ -67,14 +68,13 @@ pub async fn handle(
let state_lock = state.lock().unwrap();
authenticate(&mut sink, state_lock.username().unwrap().to_string()).await;
let phase_watcher = state_lock.phase_receiver();
- let packet_sender = state_lock.packet_sender();
drop(state_lock);
let event_queue = Arc::new(Mutex::new(HashMap::new()));
info!("Logging in...");
join!(
- send_pings(packet_sender, 10, phase_watcher.clone()),
+ send_pings(packet_sender.clone(), 10, phase_watcher.clone()),
listen(
Arc::clone(&state),
stream,
diff --git a/mumd/src/state.rs b/mumd/src/state.rs
index 85e5449..8fa05ae 100644
--- a/mumd/src/state.rs
+++ b/mumd/src/state.rs
@@ -57,17 +57,11 @@ pub struct State {
server: Option<Server>,
audio: Audio,
- packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
- connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
-
phase_watcher: (watch::Sender<StatePhase>, watch::Receiver<StatePhase>),
}
impl State {
- pub fn new(
- packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
- connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
- ) -> Self {
+ pub fn new() -> Self {
let config = mumlib::config::read_default_cfg();
let audio = Audio::new(
config.audio.input_volume.unwrap_or(1.0),
@@ -77,15 +71,18 @@ impl State {
config,
server: None,
audio,
- packet_sender,
- connection_info_sender,
phase_watcher: watch::channel(StatePhase::Disconnected),
};
state.reload_config();
state
}
- pub fn handle_command(&mut self, command: Command) -> ExecutionContext {
+ pub fn handle_command(
+ &mut self,
+ command: Command,
+ packet_sender: &mut mpsc::UnboundedSender<ControlPacket<Serverbound>>,
+ connection_info_sender: &mut watch::Sender<Option<ConnectionInfo>>,
+ ) -> ExecutionContext {
match command {
Command::ChannelJoin { channel_identifier } => {
if !matches!(*self.phase_receiver().borrow(), StatePhase::Connected) {
@@ -134,7 +131,7 @@ impl State {
let mut msg = msgs::UserState::new();
msg.set_session(self.server.as_ref().unwrap().session_id().unwrap());
msg.set_channel_id(id);
- self.packet_sender.send(msg.into()).unwrap();
+ packet_sender.send(msg.into()).unwrap();
now!(Ok(None))
}
Command::ChannelList => {
@@ -200,7 +197,7 @@ impl State {
let server = self.server_mut().unwrap();
server.set_muted(mute);
server.set_deafened(deafen);
- self.packet_sender.send(msg.into()).unwrap();
+ packet_sender.send(msg.into()).unwrap();
}
now!(Ok(new_deaf.map(|b| CommandResponse::DeafenStatus { is_deafened: b })))
@@ -294,7 +291,7 @@ impl State {
let server = self.server_mut().unwrap();
server.set_muted(mute);
server.set_deafened(deafen);
- self.packet_sender.send(msg.into()).unwrap();
+ packet_sender.send(msg.into()).unwrap();
}
now!(Ok(new_mute.map(|b| CommandResponse::MuteStatus { is_muted: b })))
@@ -334,7 +331,7 @@ impl State {
return now!(Err(Error::InvalidServerAddrError(host, port)));
}
};
- self.connection_info_sender
+ connection_info_sender
.send(Some(ConnectionInfo::new(
socket_addr,
host,
@@ -592,9 +589,6 @@ impl State {
pub fn audio_mut(&mut self) -> &mut Audio {
&mut self.audio
}
- pub fn packet_sender(&self) -> mpsc::UnboundedSender<ControlPacket<Serverbound>> {
- self.packet_sender.clone()
- }
pub fn phase_receiver(&self) -> watch::Receiver<StatePhase> {
self.phase_watcher.1.clone()
}