1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
mod audio;
mod client;
mod command;
mod error;
mod network;
mod notify;
mod state;
use crate::state::State;
use futures_util::{SinkExt, StreamExt};
use log::*;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
use tokio::{join, net::{UnixListener, UnixStream}, sync::{mpsc, oneshot}};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use bytes::{BufMut, BytesMut};
#[tokio::main]
async fn main() {
if std::env::args().find(|s| s.as_str() == "--version").is_some() {
println!("mumd {}", env!("VERSION"));
return;
}
setup_logger(std::io::stderr(), true);
notify::init();
// check if another instance is live
let connection = UnixStream::connect(mumlib::SOCKET_PATH).await;
match connection {
Ok(stream) => {
let (reader, writer) = stream.into_split();
let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
let mut command = BytesMut::new();
bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap();
if let Ok(()) = writer.send(command.freeze()).await {
if let Some(Ok(buf)) = reader.next().await {
if let Ok(Ok::<Option<CommandResponse>, mumlib::Error>(Some(CommandResponse::Pong))) = bincode::deserialize(&buf) {
error!("Another instance of mumd is already running");
return;
}
}
}
debug!("a dead socket was found, removing");
tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap();
}
Err(e) => {
if matches!(e.kind(), std::io::ErrorKind::ConnectionRefused) {
debug!("a dead socket was found, removing");
tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap();
}
}
}
let (command_sender, command_receiver) = mpsc::unbounded_channel();
let state = match State::new() {
Ok(s) => s,
Err(e) => {
error!("Error instantiating mumd: {}", e);
return;
}
};
join!(
client::handle(state, command_receiver),
receive_commands(command_sender),
);
}
/// Accepts connections on the Unix socket at `mumlib::SOCKET_PATH` and relays
/// every command received on them through `command_sender`.
///
/// Each accepted connection is served by its own spawned task. Messages are
/// length-delimited frames holding bincode-encoded [`Command`]s. For every
/// decoded command a oneshot channel is created, the command is sent together
/// with the channel's sending half, and the value received back is
/// bincode-encoded and written to the connection as one frame.
///
/// Frames that cannot be read or decoded are skipped without a reply, and
/// failures to accept a connection or to write a reply are ignored.
///
/// # Panics
///
/// Panics if the socket cannot be bound. A connection task panics if the
/// command channel is closed, if the oneshot sender is dropped without
/// sending a response, or if the response cannot be serialized.
async fn receive_commands(
    command_sender: mpsc::UnboundedSender<(
        Command,
        oneshot::Sender<mumlib::error::Result<Option<CommandResponse>>>,
    )>,
) {
    let listener = UnixListener::bind(mumlib::SOCKET_PATH).unwrap();

    loop {
        // A failed accept is skipped; the listener simply tries again.
        let stream = match listener.accept().await {
            Ok((stream, _addr)) => stream,
            Err(_) => continue,
        };
        let commands = command_sender.clone();

        tokio::spawn(async move {
            let (read_half, write_half) = stream.into_split();
            let mut frames_in = FramedRead::new(read_half, LengthDelimitedCodec::new());
            let mut frames_out = FramedWrite::new(write_half, LengthDelimitedCodec::new());

            while let Some(frame) = frames_in.next().await {
                // Both a failed read and an undecodable payload skip this frame.
                let command = match frame.map(|bytes| bincode::deserialize::<Command>(&bytes)) {
                    Ok(Ok(command)) => command,
                    _ => continue,
                };

                // Hand the command over along with a channel for its response.
                let (reply_tx, reply_rx) = oneshot::channel();
                commands.send((command, reply_tx)).unwrap();
                let reply = reply_rx.await.unwrap();

                let mut payload = BytesMut::new();
                bincode::serialize_into((&mut payload).writer(), &reply).unwrap();
                // A failed write is deliberately ignored.
                frames_out.send(payload.freeze()).await.ok();
            }
        });
    }
}
|