From 8afd49fcb78bd725ef602e48b73d42a91673d0c5 Mon Sep 17 00:00:00 2001 From: Eskil Queseth Date: Fri, 26 Mar 2021 01:48:09 +0100 Subject: make command receiving scaleable --- mumd/src/main.rs | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) (limited to 'mumd/src') diff --git a/mumd/src/main.rs b/mumd/src/main.rs index 5b71ae0..6303978 100644 --- a/mumd/src/main.rs +++ b/mumd/src/main.rs @@ -69,25 +69,34 @@ async fn receive_commands( loop { if let Ok((incoming, _)) = socket.accept().await { - let (reader, writer) = incoming.into_split(); - let reader = FramedRead::new(reader, LengthDelimitedCodec::new()); - let writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); + let sender = command_sender.clone(); + tokio::spawn(async move { + let (reader, writer) = incoming.into_split(); + let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new()); + let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new()); + + while let Some(next) = reader.next().await { + let buf = match next { + Ok(buf) => buf, + Err(_) => continue, + }; - reader.filter_map(|buf| async { - buf.ok() - }) - .map(|buf| bincode::deserialize::(&buf)) - .filter_map(|e| async { e.ok() }) - .filter_map(|command| async { - let (tx, rx) = oneshot::channel(); + let command = match bincode::deserialize::(&buf) { + Ok(e) => e, + Err(_) => continue, + }; - command_sender.send((command, tx)).unwrap(); + let (tx, rx) = oneshot::channel(); - let response = rx.await.unwrap(); - let mut serialized = BytesMut::new(); - bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); - Some(Ok(serialized.freeze())) - }).forward(writer).await.unwrap(); + sender.send((command, tx)).unwrap(); + + let response = rx.await.unwrap(); + let mut serialized = BytesMut::new(); + bincode::serialize_into((&mut serialized).writer(), &response).unwrap(); + + let _ = writer.send(serialized.freeze()).await; + } + }); } } } \ No newline at end of file -- cgit v1.2.1