1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
mod audio;
mod client;
mod command;
mod error;
mod network;
mod notifications;
mod state;
use crate::state::State;
use bytes::{BufMut, BytesMut};
use futures_util::{select, FutureExt, SinkExt, StreamExt};
use log::*;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
use std::io::ErrorKind;
use tokio::{
net::{UnixListener, UnixStream},
sync::mpsc,
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
#[tokio::main]
async fn main() {
if std::env::args().any(|s| s.as_str() == "--version" || s.as_str() == "-V") {
println!("mumd {}", env!("VERSION"));
return;
}
setup_logger(std::io::stderr(), true);
notifications::init();
// check if another instance is live
let connection = UnixStream::connect(mumlib::SOCKET_PATH).await;
match connection {
Ok(stream) => {
let (reader, writer) = stream.into_split();
let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
let mut command = BytesMut::new();
bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap();
if let Ok(()) = writer.send(command.freeze()).await {
if let Some(Ok(buf)) = reader.next().await {
if let Ok(Ok::<Option<CommandResponse>, mumlib::Error>(Some(
CommandResponse::Pong,
))) = bincode::deserialize(&buf)
{
error!("Another instance of mumd is already running");
return;
}
}
}
debug!("a dead socket was found, removing");
tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap();
}
Err(e) => {
if matches!(e.kind(), std::io::ErrorKind::ConnectionRefused) {
debug!("a dead socket was found, removing");
tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap();
}
}
}
let (command_sender, command_receiver) = mpsc::unbounded_channel();
let state = match State::new() {
Ok(s) => s,
Err(e) => {
error!("Error instantiating mumd: {}", e);
return;
}
};
let run = select! {
r = client::handle(state, command_receiver).fuse() => r,
_ = receive_commands(command_sender).fuse() => Ok(()),
};
match run {
Err(e) => {
error!("mumd: {}", e);
std::process::exit(1);
}
_ => {}
}
}
async fn receive_commands(
command_sender: mpsc::UnboundedSender<(
Command,
mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
)>,
) {
let socket = UnixListener::bind(mumlib::SOCKET_PATH).unwrap();
loop {
if let Ok((incoming, _)) = socket.accept().await {
let sender = command_sender.clone();
tokio::spawn(async move {
let (reader, writer) = incoming.into_split();
let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
while let Some(next) = reader.next().await {
let buf = match next {
Ok(buf) => buf,
Err(_) => continue,
};
let command = match bincode::deserialize::<Command>(&buf) {
Ok(e) => e,
Err(_) => continue,
};
let (tx, mut rx) = mpsc::unbounded_channel();
sender.send((command, tx)).unwrap();
while let Some(response) = rx.recv().await {
let mut serialized = BytesMut::new();
bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
if let Err(e) = writer.send(serialized.freeze()).await {
if e.kind() != ErrorKind::BrokenPipe {
//if the client closed the connection, ignore logging the error
//we just assume that they just don't want any more packets
error!("Error sending response: {:?}", e);
}
break;
}
}
}
});
}
}
}
|