#![warn(elided_lifetimes_in_paths)] #![warn(meta_variable_misuse)] #![warn(missing_debug_implementations)] #![warn(single_use_lifetimes)] #![warn(unreachable_pub)] #![warn(unused_crate_dependencies)] #![warn(unused_import_braces)] #![warn(unused_lifetimes)] #![warn(unused_qualifications)] #![deny(macro_use_extern_crate)] #![deny(missing_abi)] #![deny(future_incompatible)] #![forbid(unsafe_code)] #![forbid(non_ascii_idents)] pub mod audio; pub mod client; pub mod command; pub mod error; pub mod network; pub mod notifications; pub mod state; use crate::state::State; use bytes::{BufMut, BytesMut}; use futures_util::{select, FutureExt, SinkExt, StreamExt}; use log::*; use mumlib::command::{Command, CommandResponse}; use mumlib::setup_logger; use std::io::ErrorKind; use tokio::{ net::{UnixListener, UnixStream}, sync::mpsc, }; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; #[tokio::main] async fn main() { if std::env::args().any(|s| s.as_str() == "--version" || s.as_str() == "-V") { println!("mumd {}", env!("VERSION")); return; } setup_logger(std::io::stderr(), true); notifications::init(); // check if another instance is live let connection = UnixStream::connect(mumlib::SOCKET_PATH).await; match connection { Ok(stream) => { let (reader, writer) = stream.into_split(); let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new()); let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); let mut command = BytesMut::new(); bincode::serialize_into((&mut command).writer(), &Command::Ping).unwrap(); if let Ok(()) = writer.send(command.freeze()).await { if let Some(Ok(buf)) = reader.next().await { if let Ok(Ok::, mumlib::Error>(Some( CommandResponse::Pong, ))) = bincode::deserialize(&buf) { error!("Another instance of mumd is already running"); return; } } } debug!("a dead socket was found, removing"); tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap(); } Err(e) => { if matches!(e.kind(), std::io::ErrorKind::ConnectionRefused) { debug!("a dead socket was found, removing"); tokio::fs::remove_file(mumlib::SOCKET_PATH).await.unwrap(); } } } let (command_sender, command_receiver) = mpsc::unbounded_channel(); let state = match State::new() { Ok(s) => s, Err(e) => { error!("Error instantiating mumd: {}", e); return; } }; let run = select! { r = client::handle(state, command_receiver).fuse() => r, _ = receive_commands(command_sender).fuse() => Ok(()), }; if let Err(e) = run { error!("mumd: {}", e); std::process::exit(1); } } async fn receive_commands( command_sender: mpsc::UnboundedSender<( Command, mpsc::UnboundedSender>>, )>, ) { let socket = UnixListener::bind(mumlib::SOCKET_PATH).unwrap(); loop { if let Ok((incoming, _)) = socket.accept().await { let sender = command_sender.clone(); tokio::spawn(async move { let (reader, writer) = incoming.into_split(); let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new()); let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); while let Some(next) = reader.next().await { let buf = match next { Ok(buf) => buf, Err(_) => continue, }; let command = match bincode::deserialize::(&buf) { Ok(e) => e, Err(_) => continue, }; let (tx, mut rx) = mpsc::unbounded_channel(); sender.send((command, tx)).unwrap(); while let Some(response) = rx.recv().await { let mut serialized = BytesMut::new(); bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); if let Err(e) = writer.send(serialized.freeze()).await { if e.kind() != ErrorKind::BrokenPipe { //if the client closed the connection, ignore logging the error //we just assume that they just don't want any more packets error!("Error sending response: {:?}", e); } break; } } } }); } } }