-
This serializes all client writes. I would like to know why it is there and whether it will be fixed in the future. Thanks
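(As a standalone illustration of the pattern I mean — toy code using tokio, not the openraft source — each append waits for its own flush acknowledgement before the next write can be appended:)

use std::time::Duration;

use tokio::sync::oneshot;

// Toy stand-in for a log store whose append reports the flush via a oneshot.
async fn append(entry: u64) -> oneshot::Receiver<u64> {
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await; // pretend this is the disk flush
        let _ = tx.send(entry);
    });
    rx
}

#[tokio::main]
async fn main() {
    for i in 0..3u64 {
        let rx = append(i).await;
        // Waiting here before issuing the next append is what serializes writes.
        let flushed = rx.await.unwrap();
        println!("entry {flushed} flushed");
    }
}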
-
Can the following code fix this issue? Thanks
diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs
index 77870bfc..de034882 100644
--- a/openraft/src/core/raft_core.rs
+++ b/openraft/src/core/raft_core.rs
@@ -9,9 +9,10 @@ use std::sync::Arc;
use std::time::Duration;
use anyerror::AnyError;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use futures::TryFutureExt;
+use futures::{
+ stream::{FuturesOrdered, FuturesUnordered},
+ StreamExt, TryFutureExt
+};
use maplit::btreeset;
use tokio::select;
use tokio::sync::mpsc;
@@ -158,6 +159,10 @@ impl LeaderData {
// TODO: remove SM
/// The core type implementing the Raft protocol.
+
+type LogFlushResult<C> = Result<LogIOId<<C as RaftTypeConfig>::NodeId>, std::io::Error>;
+type OneshotReceiver<C, T> = <<C as RaftTypeConfig>::AsyncRuntime as AsyncRuntime>::OneshotReceiver<T>;
+
pub struct RaftCore<C, N, LS, SM>
where
C: RaftTypeConfig,
@@ -208,6 +213,8 @@ where
pub(crate) span: Span,
+ pub(crate) stream_log_flushed: FuturesOrdered<OneshotReceiver<C, LogFlushResult<C>>>,
+
pub(crate) _p: PhantomData,
}
@@ -735,9 +742,8 @@ where
let callback = LogFlushed::new(log_io_id, tx);
self.log_store.append(entries, callback).await?;
- rx.await
- .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
- .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
+
+ self.stream_log_flushed.push_back(rx);
Ok(())
}
@@ -931,6 +937,28 @@ where
// In each loop, the first step is blocking waiting for any message from any channel.
// Then if there is any message, process as many as possible to maximize throughput.
+ let poll_log_flush = async {
+ if self.stream_log_flushed.is_empty() {
+ return None;
+ }
+ while let Some(flush_res) = self.stream_log_flushed.next().await {
+ let log_io_id = flush_res
+ .map_err(|e| AnyError::error(e))
+ .and_then(|r| r.map_err(|e| AnyError::error(e)));
+ match log_io_id {
+ Ok(log_io_id) => {
+ // The leader may have changed.
+ // But reporting to a different leader is not a problem.
+ if let Ok(mut lh) = self.engine.leader_handler() {
+ lh.replication_handler().update_local_progress(log_io_id.log_id);
+ }
+ }
+ Err(err) => { return Some(Err(err)); }
+ }
+ }
+ Some(Ok(()))
+ };
+
select! {
// Check shutdown in each loop first so that a message flood in `tx_api` won't block shutting down.
// `select!` without `biased` provides a random fairness.
@@ -943,6 +971,10 @@ where
return Err(Fatal::Stopped);
}
+ Some(flush_res) = poll_log_flush => {
+ flush_res.map_err(|e| Into::<StorageError<C::NodeId>>::into(StorageIOError::write_logs(e)))?;
+ }
+
notify_res = self.rx_notify.recv() => {
match notify_res {
Some(notify) => self.handle_notify(notify)?,
@@ -1611,15 +1643,9 @@ where
}
Command::AppendInputEntries { vote, entries } => {
let last_log_id = *entries.last().unwrap().get_log_id();
- tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);
-
+ tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries));
self.append_to_log(entries, vote, last_log_id).await?;
- // The leader may have changed.
- // But reporting to a different leader is not a problem.
- if let Ok(mut lh) = self.engine.leader_handler() {
- lh.replication_handler().update_local_progress(Some(last_log_id));
- }
}
Command::SaveVote { vote } => {
self.log_store.save_vote(&vote).await?;
diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs
index a540ef37..3aba1f22 100644
--- a/openraft/src/raft/mod.rs
+++ b/openraft/src/raft/mod.rs
@@ -300,6 +300,8 @@ where C: RaftTypeConfig
command_state: CommandState::default(),
span: core_span,
+ stream_log_flushed: futures::stream::FuturesOrdered::new(),
+
_p: Default::default(),
};
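For clarity, this is the mechanism the diff relies on, shown as a standalone toy example (tokio + futures, not openraft code): flush receivers are pushed into a FuturesOrdered and drained in submission order, instead of awaiting each flush inline.

use std::time::Duration;

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let mut pending: FuturesOrdered<oneshot::Receiver<u64>> = FuturesOrdered::new();

    // Issue several "appends" without waiting for their flushes.
    for i in 0..3u64 {
        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await; // pretend disk flush
            let _ = tx.send(i);
        });
        pending.push_back(rx);
    }

    // Drain completed flushes in the order they were submitted.
    while let Some(res) = pending.next().await {
        println!("flushed up to {}", res.expect("sender dropped"));
    }
}

In RaftCore the draining would happen inside the select! loop, as in the diff above.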
-
Thank you for bringing attention to this issue and suggesting a solution. To ensure that log I/O is asynchronous, all I/O operations in … A more effective storage API would handle all write I/O in a callback manner, allowing the application to serialize all I/O operations in a single internal queue or similar structure. Currently, log I/O operations are blocking, which naturally serializes them based on the execution order. Therefore, to transition to non-blocking and asynchronous operations, the …
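Roughly, the shape I have in mind looks like the sketch below. All names (QueuedLogStore, FlushCallback, flush_next) are made up for illustration; this is not an existing openraft API.

use std::collections::VecDeque;

// Hypothetical callback fired by the storage layer once a write is durable.
type FlushCallback = Box<dyn FnOnce(std::io::Result<u64>) + Send>;

// Hypothetical non-blocking log store: `append` only enqueues the work and
// returns immediately; a background task drains the queue, so all write I/O
// is serialized in one place and each callback fires when its batch is on disk.
struct QueuedLogStore {
    queue: VecDeque<(Vec<u8>, FlushCallback)>,
}

impl QueuedLogStore {
    fn append(&mut self, entry: Vec<u8>, on_flushed: FlushCallback) {
        // Non-blocking: no I/O happens here.
        self.queue.push_back((entry, on_flushed));
    }

    // Would be driven by the store's background task.
    fn flush_next(&mut self) {
        if let Some((entry, on_flushed)) = self.queue.pop_front() {
            let written = entry.len() as u64; // pretend the entry hit the disk
            on_flushed(Ok(written));
        }
    }
}

fn main() {
    let mut store = QueuedLogStore { queue: VecDeque::new() };
    store.append(b"entry".to_vec(), Box::new(|r| println!("flushed: {r:?}")));
    store.flush_next(); // in a real store this would run on a background task
}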
Hi @ukernel,
thanks for looking into it.
Yes, the issue should ultimately be fixed to allow more dataflow-like processing. Your proposal looks at first glance like a workable solution (after some clean-ups), but I don't know all the dependencies inside of RaftCore, i.e., whether this might cause other problems. In any case, feel free to post it as a PR; it's easier to review and comment on than a diff in an issue. Invite @drmingdrmer for the review, he's the main maintainer of the crate and he definitely knows the dependencies best.
The issue I see is that FuturesOrdered seems to be optimized for a use case with a large number of heavyweight futures, with a large number of memory allocat…