Skip to content

Commit d99f37b

Browse files
authored
perf: optimize send raw batching (paradigmxyz#18280)
1 parent 3029709 commit d99f37b

File tree

2 files changed

+14
-16
lines changed

2 files changed

+14
-16
lines changed

‎crates/rpc/rpc/src/eth/helpers/transaction.rs‎

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ where
2727
async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> {
2828
let recovered = recover_raw_transaction(&tx)?;
2929

30-
// broadcast raw transaction to subscribers if there is any.
31-
self.broadcast_raw_transaction(tx.clone());
32-
3330
let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
3431

3532
// forward the transaction to the specific endpoint if configured.
3633
if let Some(client) = self.raw_tx_forwarder() {
3734
tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to forwarder");
38-
let rlp_hex = hex::encode_prefixed(tx);
35+
let rlp_hex = hex::encode_prefixed(&tx);
36+
37+
// broadcast raw transaction to subscribers if there is any.
38+
self.broadcast_raw_transaction(tx);
3939

4040
let hash =
4141
client.request("eth_sendRawTransaction", (rlp_hex,)).await.inspect_err(|err| {
@@ -48,6 +48,9 @@ where
4848
return Ok(hash);
4949
}
5050

51+
// broadcast raw transaction to subscribers if there is any.
52+
self.broadcast_raw_transaction(tx);
53+
5154
// submit the transaction to the pool with a `Local` origin
5255
let AddedTransactionOutcome { hash, .. } =
5356
self.inner.add_pool_transaction(pool_transaction).await?;

‎crates/transaction-pool/src/batcher.rs‎

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use pin_project::pin_project;
1010
use std::{
1111
future::Future,
1212
pin::Pin,
13-
task::{Context, Poll},
13+
task::{ready, Context, Poll},
1414
};
1515
use tokio::sync::{mpsc, oneshot};
1616

@@ -44,6 +44,7 @@ where
4444
pub struct BatchTxProcessor<Pool: TransactionPool> {
4545
pool: Pool,
4646
max_batch_size: usize,
47+
buf: Vec<BatchTxRequest<Pool::Transaction>>,
4748
#[pin]
4849
request_rx: mpsc::UnboundedReceiver<BatchTxRequest<Pool::Transaction>>,
4950
}
@@ -59,7 +60,7 @@ where
5960
) -> (Self, mpsc::UnboundedSender<BatchTxRequest<Pool::Transaction>>) {
6061
let (request_tx, request_rx) = mpsc::unbounded_channel();
6162

62-
let processor = Self { pool, max_batch_size, request_rx };
63+
let processor = Self { pool, max_batch_size, buf: Vec::with_capacity(1), request_rx };
6364

6465
(processor, request_tx)
6566
}
@@ -88,21 +89,15 @@ where
8889

8990
loop {
9091
// Drain all available requests from the receiver
91-
let mut batch = Vec::with_capacity(1);
92-
while let Poll::Ready(Some(request)) = this.request_rx.poll_recv(cx) {
93-
batch.push(request);
94-
95-
// Check if the max batch size threshold has been reached
96-
if batch.len() >= *this.max_batch_size {
97-
break;
98-
}
99-
}
92+
ready!(this.request_rx.poll_recv_many(cx, this.buf, *this.max_batch_size));
10093

101-
if !batch.is_empty() {
94+
if !this.buf.is_empty() {
95+
let batch = std::mem::take(this.buf);
10296
let pool = this.pool.clone();
10397
tokio::spawn(async move {
10498
Self::process_batch(&pool, batch).await;
10599
});
100+
this.buf.reserve(1);
106101

107102
continue;
108103
}

0 commit comments

Comments
 (0)