aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/main.rs
blob: 479c568f3244d7c1e85661cfd34306070f5da1cd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#![warn(elided_lifetimes_in_paths)]
#![warn(meta_variable_misuse)]
#![warn(missing_debug_implementations)]
#![warn(single_use_lifetimes)]
#![warn(unreachable_pub)]
#![warn(unused_crate_dependencies)]
#![warn(unused_import_braces)]
#![warn(unused_lifetimes)]
#![warn(unused_qualifications)]
#![deny(macro_use_extern_crate)]
#![deny(missing_abi)]
#![deny(future_incompatible)]
#![forbid(unsafe_code)]
#![forbid(non_ascii_idents)]

pub mod audio;
pub mod client;
pub mod command;
pub mod error;
pub mod network;
pub mod notifications;
pub mod state;

use crate::state::State;

use bytes::{BufMut, BytesMut};
use futures_util::{select, FutureExt, SinkExt, StreamExt};
use log::*;
use mumlib::command::{Command, CommandResponse};
use mumlib::setup_logger;
use std::io::ErrorKind;
use tokio::{
    net::{UnixListener, UnixStream},
    sync::mpsc,
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

#[tokio::main]
async fn main() {
    if std::env::args().any(|s| s.as_str() == "--version") {
        println!("mumd {}", env!("VERSION"));
        return;
    }

    setup_logger(std::io::stderr(), true);
    notifications::init();

    // check if another instance is live
    let connection = UnixStream::connect(mumlib::SOCKET_PATH).await;
    match connection {
        Ok(stream) => {
            let (reader, writer) = stream.into_split();
            let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
            let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
            let mut command = BytesMut::new();
            bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap();
            if let Ok(()) = writer.send(command.freeze()).await {
                if let Some(Ok(buf)) = reader.next().await {
                    if let Ok(Ok::<Option<CommandResponse>, mumlib::Error>(Some(
                        CommandResponse::Pong,
                    ))) = bincode::deserialize(&buf)
                    {
                        error!("Another instance of mumd is already running");
                        return;
                    }
                }
            }
            debug!("a dead socket was found, removing");
            tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap();
        }
        Err(e) => {
            if matches!(e.kind(), std::io::ErrorKind::ConnectionRefused) {
                debug!("a dead socket was found, removing");
                tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap();
            }
        }
    }

    let (command_sender, command_receiver) = mpsc::unbounded_channel();

    let state = match State::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Error instantiating mumd: {}", e);
            return;
        }
    };

    let run = select! {
        r = client::handle(state, command_receiver).fuse() => r,
        _ = receive_commands(command_sender).fuse() => Ok(()),
    };

    if let Err(e) = run {
        error!("mumd: {}", e);
        std::process::exit(1);
    }
}

async fn receive_commands(
    command_sender: mpsc::UnboundedSender<(
        Command,
        mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
    )>,
) {
    let socket = UnixListener::bind(mumlib::SOCKET_PATH).unwrap();

    loop {
        if let Ok((incoming, _)) = socket.accept().await {
            let sender = command_sender.clone();
            tokio::spawn(async move {
                let (reader, writer) = incoming.into_split();
                let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
                let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());

                while let Some(next) = reader.next().await {
                    let buf = match next {
                        Ok(buf) => buf,
                        Err(_) => continue,
                    };

                    let command = match bincode::deserialize::<Command>(&buf) {
                        Ok(e) => e,
                        Err(_) => continue,
                    };

                    let (tx, mut rx) = mpsc::unbounded_channel();

                    sender.send((command, tx)).unwrap();

                    while let Some(response) = rx.recv().await {
                        let mut serialized = BytesMut::new();
                        bincode::serialize_into((&mut serialized).writer(), &response).unwrap();

                        if let Err(e) = writer.send(serialized.freeze()).await {
                            if e.kind() != ErrorKind::BrokenPipe {
                                //if the client closed the connection, ignore logging the error
                                //we just assume that they just don't want any more packets
                                error!("Error sending response: {:?}", e);
                            }
                            break;
                        }
                    }
                }
            });
        }
    }
}