From d484c05e56194346944b295968c66ccc0e543534 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Wed, 24 Mar 2021 21:38:51 +0100 Subject: remove ipc-channel dependency --- mumd/src/client.rs | 5 ++-- mumd/src/command.rs | 3 +-- mumd/src/main.rs | 73 +++++++++++++++++++++++++---------------------------- 3 files changed, 37 insertions(+), 44 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/client.rs b/mumd/src/client.rs index 84c1ea1..cdae7eb 100644 --- a/mumd/src/client.rs +++ b/mumd/src/client.rs @@ -2,16 +2,15 @@ use crate::command; use crate::network::{tcp, udp, ConnectionInfo}; use crate::state::State; -use ipc_channel::ipc::IpcSender; use mumble_protocol::{Serverbound, control::ControlPacket, crypt::ClientCryptState}; use mumlib::command::{Command, CommandResponse}; use std::sync::Arc; -use tokio::{join, sync::{mpsc, watch, Mutex}}; +use tokio::{join, sync::{Mutex, mpsc, oneshot, watch}}; pub async fn handle( command_receiver: mpsc::UnboundedReceiver<( Command, - IpcSender>>, + oneshot::Sender>>, )>, ) { let (connection_info_sender, connection_info_receiver) = diff --git a/mumd/src/command.rs b/mumd/src/command.rs index 653d1fa..3e462b1 100644 --- a/mumd/src/command.rs +++ b/mumd/src/command.rs @@ -5,7 +5,6 @@ use crate::network::{ }; use crate::state::{ExecutionContext, State}; -use ipc_channel::ipc::IpcSender; use log::*; use mumble_protocol::{Serverbound, control::ControlPacket}; use mumlib::command::{Command, CommandResponse}; @@ -16,7 +15,7 @@ pub async fn handle( state: Arc>, mut command_receiver: mpsc::UnboundedReceiver<( Command, - IpcSender>>, + oneshot::Sender>>, )>, tcp_event_register_sender: mpsc::UnboundedSender<(TcpEvent, TcpEventCallback)>, ping_request_sender: mpsc::UnboundedSender, diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 7c3745c..d4151e3 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -5,13 +5,14 @@ mod network; mod notify; mod state; -use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender}; +use futures_util::StreamExt; +//use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; -use std::fs; -use tokio::{join, sync::mpsc}; -use tokio::task::spawn_blocking; +use tokio::{join, net::UnixListener, sync::{mpsc, oneshot}}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; +use bytes::{BufMut, BytesMut}; #[tokio::main] async fn main() { @@ -24,7 +25,7 @@ async fn main() { notify::init(); // check if another instance is live - let (tx_client, rx_client) = + /*let (tx_client, rx_client) = ipc::channel::>>().unwrap(); if let Ok(server_name) = fs::read_to_string(mumlib::SOCKET_PATH) { if let Ok(tx0) = IpcSender::connect(server_name) { @@ -41,50 +42,44 @@ async fn main() { } } } - } + }*/ - let (command_sender, command_receiver) = mpsc::unbounded_channel::<( - Command, - IpcSender>>, - )>(); + let (command_sender, command_receiver) = mpsc::unbounded_channel(); - let (_, e) = join!( + join!( client::handle(command_receiver), - spawn_blocking(move || { - // IpcSender is blocking - receive_oneshot_commands(command_sender); - }), + receive_commands(command_sender), ); - e.unwrap(); } -fn receive_oneshot_commands( +async fn receive_commands( command_sender: mpsc::UnboundedSender<( Command, - IpcSender>>, + oneshot::Sender>>, )>, ) { + let socket = UnixListener::bind("/tmp/mumd").unwrap(); + loop { - // create listener - let (server, server_name): ( - IpcOneShotServer<( - Command, - IpcSender>>, - )>, - String, - ) = IpcOneShotServer::new().unwrap(); - fs::write(mumlib::SOCKET_PATH, &server_name).unwrap(); - debug!("Listening to {}", server_name); + if let Ok((incoming, _)) = socket.accept().await { + let (reader, writer) = incoming.into_split(); + let reader = FramedRead::new(reader, LengthDelimitedCodec::new()); + let writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); + + reader.filter_map(|buf| async { + buf.ok() + }) + .map(|buf| bincode::deserialize::(&buf).unwrap()) + .filter_map(|command| async { + let (tx, rx) = oneshot::channel(); - // receive command and response channel - let (_, conn): ( - _, - ( - Command, - IpcSender>>, - ), - ) = server.accept().unwrap(); - debug!("Sending command {:?} to command handler", conn.0); - command_sender.send(conn).unwrap(); + command_sender.send((command, tx)).unwrap(); + + let response = rx.await.unwrap(); + let mut serialized = BytesMut::new(); + bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); + Some(Ok(serialized.freeze())) + }).forward(writer).await.unwrap(); + } } -} +} \ No newline at end of file -- cgit v1.2.1 From 169eb4f49f3fd587e9c28f2209b2dd130425d866 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 26 Mar 2021 01:10:09 +0100 Subject: re-add check for live server --- mumd/src/main.rs | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index d4151e3..5b22089 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -5,12 +5,12 @@ mod network; mod notify; mod state; -use futures_util::StreamExt; +use futures_util::{SinkExt, StreamExt}; //use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; -use tokio::{join, net::UnixListener, sync::{mpsc, oneshot}}; +use tokio::{join, net::{UnixListener, UnixStream}, sync::{mpsc, oneshot}}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use bytes::{BufMut, BytesMut}; @@ -25,24 +25,32 @@ async fn main() { notify::init(); // check if another instance is live - /*let (tx_client, rx_client) = - ipc::channel::>>().unwrap(); - if let Ok(server_name) = fs::read_to_string(mumlib::SOCKET_PATH) { - if let Ok(tx0) = IpcSender::connect(server_name) { - if tx0.send((Command::Ping, tx_client)).is_ok() { - match rx_client.recv() { - Ok(Ok(Some(CommandResponse::Pong))) => { + let connection = UnixStream::connect("/tmp/mumd").await; + match connection { + Ok(stream) => { + let (reader, writer) = stream.into_split(); + let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new()); + let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); + let mut command = BytesMut::new(); + bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap(); + if let Ok(()) = writer.send(command.freeze()).await { + if let Some(Ok(buf)) = reader.next().await { + if let Ok(Ok::, mumlib::error::Error>(Some(CommandResponse::Pong))) = bincode::deserialize(&buf) { error!("Another instance of mumd is already running"); return; - }, - resp => { - warn!("Ping with weird response. Continuing..."); - debug!("Response was {:?}", resp); } } } + debug!("a dead socket was found, removing"); + tokio::fs::remove_file("/tmp/mumd").await.unwrap(); } - }*/ + Err(e) => { + if matches!(e.kind(), std::io::ErrorKind::ConnectionRefused) { + debug!("a dead socket was found, removing"); + tokio::fs::remove_file("/tmp/mumd").await.unwrap(); + } + } + } let (command_sender, command_receiver) = mpsc::unbounded_channel(); -- cgit v1.2.1 From 5cc61e8d2140280421b5962b2911ebbcc927e9d1 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 26 Mar 2021 01:15:55 +0100 Subject: re-add error checking --- mumd/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 5b22089..18b84b8 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -77,7 +77,8 @@ async fn receive_commands( reader.filter_map(|buf| async { buf.ok() }) - .map(|buf| bincode::deserialize::(&buf).unwrap()) + .map(|buf| bincode::deserialize::(&buf)) + .filter_map(|e| async { e.ok() }) .filter_map(|command| async { let (tx, rx) = oneshot::channel(); -- cgit v1.2.1 From 0f677fff3f48d4d17ece37060323ce19a41d87ad Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 26 Mar 2021 01:20:16 +0100 Subject: remove code comment --- mumd/src/main.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 18b84b8..c596b8f 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -6,7 +6,6 @@ mod notify; mod state; use futures_util::{SinkExt, StreamExt}; -//use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; -- cgit v1.2.1 From b0034ec3e344030274c98cf81f75789d80ce6211 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 26 Mar 2021 01:23:57 +0100 Subject: use constant string for socket path --- mumd/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index c596b8f..5b71ae0 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -24,7 +24,7 @@ async fn main() { notify::init(); // check if another instance is live - let connection = UnixStream::connect("/tmp/mumd").await; + let connection = UnixStream::connect(mumlib::SOCKET_PATH).await; match connection { Ok(stream) => { let (reader, writer) = stream.into_split(); @@ -41,12 +41,12 @@ async fn main() { } } debug!("a dead socket was found, removing"); - tokio::fs::remove_file("/tmp/mumd").await.unwrap(); + tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap(); } Err(e) => { if matches!(e.kind(), std::io::ErrorKind::ConnectionRefused) { debug!("a dead socket was found, removing"); - tokio::fs::remove_file("/tmp/mumd").await.unwrap(); + tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap(); } } } @@ -65,7 +65,7 @@ async fn receive_commands( oneshot::Sender>>, )>, ) { - let socket = UnixListener::bind("/tmp/mumd").unwrap(); + let socket = UnixListener::bind(mumlib::SOCKET_PATH).unwrap(); loop { if let Ok((incoming, _)) = socket.accept().await { -- cgit v1.2.1 From 8afd49fcb78bd725ef602e48b73d42a91673d0c5 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 26 Mar 2021 01:48:09 +0100 Subject: make command receiving scaleable --- mumd/src/main.rs | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 5b71ae0..6303978 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -69,25 +69,34 @@ async fn receive_commands( loop { if let Ok((incoming, _)) = socket.accept().await { - let (reader, writer) = incoming.into_split(); - let reader = FramedRead::new(reader, LengthDelimitedCodec::new()); - let writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); + let sender = command_sender.clone(); + tokio::spawn(async move { + let (reader, writer) = incoming.into_split(); + let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new()); + let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); + + while let Some(next) = reader.next().await { + let buf = match next { + Ok(buf) => buf, + Err(_) => continue, + }; - reader.filter_map(|buf| async { - buf.ok() - }) - .map(|buf| bincode::deserialize::(&buf)) - .filter_map(|e| async { e.ok() }) - .filter_map(|command| async { - let (tx, rx) = oneshot::channel(); + let command = match bincode::deserialize::(&buf) { + Ok(e) => e, + Err(_) => continue, + }; - command_sender.send((command, tx)).unwrap(); + let (tx, rx) = oneshot::channel(); - let response = rx.await.unwrap(); - let mut serialized = BytesMut::new(); - bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); - Some(Ok(serialized.freeze())) - }).forward(writer).await.unwrap(); + sender.send((command, tx)).unwrap(); + + let response = rx.await.unwrap(); + let mut serialized = BytesMut::new(); + bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); + + let _ = writer.send(serialized.freeze()).await; + } + }); } } } \ No newline at end of file -- cgit v1.2.1 From d0eb98decf0d78957acc870624c338b75fc24edf Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Mon, 29 Mar 2021 00:47:26 +0200 Subject: re-add trailing newline 2 electric boogaloo --- mumd/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 6303978..f7168c4 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -99,4 +99,5 @@ async fn receive_commands( }); } } -} \ No newline at end of file +} + -- cgit v1.2.1 From 80cedde5d6868f5e4db85db2c80825e49d981a48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustav=20S=C3=B6rn=C3=A4s?= Date: Mon, 29 Mar 2021 11:16:35 +0200 Subject: remove extra newline --- mumd/src/main.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index f7168c4..26e8d49 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -100,4 +100,3 @@ async fn receive_commands( } } } - -- cgit v1.2.1