aboutsummaryrefslogtreecommitdiffstats
path: root/mumd/src/main.rs
diff options
context:
space:
mode:
authorEskil Queseth <eskilq@kth.se>2021-03-26 01:48:09 +0100
committerEskil Queseth <eskilq@kth.se>2021-03-26 01:48:09 +0100
commit8afd49fcb78bd725ef602e48b73d42a91673d0c5 (patch)
tree63efc10d7cc4f613d0ea18422ef3239915b8bb40 /mumd/src/main.rs
parentb0034ec3e344030274c98cf81f75789d80ce6211 (diff)
downloadmum-8afd49fcb78bd725ef602e48b73d42a91673d0c5.tar.gz
make command receiving scaleable
Diffstat (limited to 'mumd/src/main.rs')
-rw-r--r--mumd/src/main.rs41
1 files changed, 25 insertions, 16 deletions
diff --git a/mumd/src/main.rs b/mumd/src/main.rs
index 5b71ae0..6303978 100644
--- a/mumd/src/main.rs
+++ b/mumd/src/main.rs
@@ -69,25 +69,34 @@ async fn receive_commands(
loop {
if let Ok((incoming, _)) = socket.accept().await {
- let (reader, writer) = incoming.into_split();
- let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
- let writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
+ let sender = command_sender.clone();
+ tokio::spawn(async move {
+ let (reader, writer) = incoming.into_split();
+ let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
+ let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
+
+ while let Some(next) = reader.next().await {
+ let buf = match next {
+ Ok(buf) => buf,
+ Err(_) => continue,
+ };
- reader.filter_map(|buf| async {
- buf.ok()
- })
- .map(|buf| bincode::deserialize::<Command>(&buf))
- .filter_map(|e| async { e.ok() })
- .filter_map(|command| async {
- let (tx, rx) = oneshot::channel();
+ let command = match bincode::deserialize::<Command>(&buf) {
+ Ok(e) => e,
+ Err(_) => continue,
+ };
- command_sender.send((command, tx)).unwrap();
+ let (tx, rx) = oneshot::channel();
- let response = rx.await.unwrap();
- let mut serialized = BytesMut::new();
- bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
- Some(Ok(serialized.freeze()))
- }).forward(writer).await.unwrap();
+ sender.send((command, tx)).unwrap();
+
+ let response = rx.await.unwrap();
+ let mut serialized = BytesMut::new();
+ bincode::serialize_into((&mut serialized).writer(), &response).unwrap();
+
+ let _ = writer.send(serialized.freeze()).await;
+ }
+ });
}
}
} \ No newline at end of file