Geyser gRPC - 代码示例
常见 Geyser gRPC 使用场景的实用 Rust 示例。请填入您的 Geyser gRPC 端点。Solana Stream SDK 支持 Geyser gRPC。
基本订阅:所有非投票、未失败的交易
rust
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to every non-vote, non-failed transaction at `Confirmed`
/// commitment and prints the slot and signature of each update received.
///
/// # Errors
/// Returns an error if the endpoint is rejected, connecting or subscribing
/// fails, the subscribe request cannot be sent, or the stream yields an error.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;

    let (mut request_sink, mut stream) = client.subscribe().await?;
    request_sink
        .send(SubscribeRequest {
            transactions: HashMap::from([(
                "all_transactions".to_string(),
                SubscribeRequestFilterTransactions {
                    vote: Some(false),   // Drop vote txs (~70% of traffic)
                    failed: Some(false), // Drop failed txs
                    ..Default::default()
                },
            )]),
            commitment: Some(CommitmentLevel::Confirmed as i32),
            ..Default::default()
        })
        .await?;

    while let Some(message) = stream.next().await {
        // Propagate stream errors to the caller instead of silently leaving
        // the loop and reporting success.
        let message = message?;
        if let Some(UpdateOneof::Transaction(tx_update)) = message.update_oneof {
            // A missing or malformed signature falls back to the default signature.
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx_update.slot, sig);
        }
    }
    Ok(())
}
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to every non-vote, non-failed transaction at `Confirmed`
/// commitment and prints the slot and signature of each update received.
///
/// # Errors
/// Returns an error if the endpoint is rejected, connecting or subscribing
/// fails, the subscribe request cannot be sent, or the stream yields an error.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;

    let (mut request_sink, mut stream) = client.subscribe().await?;
    request_sink
        .send(SubscribeRequest {
            transactions: HashMap::from([(
                "all_transactions".to_string(),
                SubscribeRequestFilterTransactions {
                    vote: Some(false),   // Drop vote txs (~70% of traffic)
                    failed: Some(false), // Drop failed txs
                    ..Default::default()
                },
            )]),
            commitment: Some(CommitmentLevel::Confirmed as i32),
            ..Default::default()
        })
        .await?;

    while let Some(message) = stream.next().await {
        // Propagate stream errors to the caller instead of silently leaving
        // the loop and reporting success.
        let message = message?;
        if let Some(UpdateOneof::Transaction(tx_update)) = message.update_oneof {
            // A missing or malformed signature falls back to the default signature.
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx_update.slot, sig);
        }
    }
    Ok(())
}
带重连、间隙恢复、ping/pong 的持久客户端
rust
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
/// Persistent slot subscriber: connects, streams slot updates, and reconnects
/// whenever the connection fails or the stream ends.
///
/// The last slot seen is remembered and sent as `from_slot` on the next
/// subscription so that the stream resumes from that slot after a reconnect.
///
/// NOTE(review): this example does not reply to server `Ping` updates; the
/// full production client further down shows how to do that.
///
/// # Errors
/// Returns an error only if the endpoint string is rejected by the client
/// builder. Connection, subscription and stream errors are logged and retried.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint";
    // Pause between attempts so a persistent failure does not become a hot loop.
    let reconnect_delay = std::time::Duration::from_secs(1);
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };
    let mut tracked_slot: u64 = 0;

    loop {
        // A failed (re)connect must not terminate a client that is meant to
        // stay up, so it is logged and retried rather than propagated with `?`.
        let mut client = match GeyserGrpcClient::build_from_shared(endpoint)?
            .connect()
            .await
        {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Error connecting: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Gap recovery: resume from the last slot seen, if there is one.
        subscribe_request.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        // The sink half is kept bound for the lifetime of the stream.
        let (_sink, mut stream) = match client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await
        {
            Ok(halves) => halves,
            Err(e) => {
                eprintln!("Error subscribing: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        // An unrecognised status value is reported rather than
                        // panicking the whole client.
                        match SlotStatus::try_from(slot.status) {
                            Ok(status) => {
                                println!("Slot: {} | Status: {:?}", slot.slot, status)
                            }
                            Err(_) => println!(
                                "Slot: {} | Status: unknown ({})",
                                slot.slot, slot.status
                            ),
                        }
                    }
                }
                Some(Err(e)) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
                None => break,
            }
        }

        tokio::time::sleep(reconnect_delay).await;
    }
}
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
/// Persistent slot subscriber: connects, streams slot updates, and reconnects
/// whenever the connection fails or the stream ends.
///
/// The last slot seen is remembered and sent as `from_slot` on the next
/// subscription so that the stream resumes from that slot after a reconnect.
///
/// NOTE(review): this example does not reply to server `Ping` updates; the
/// full production client further down shows how to do that.
///
/// # Errors
/// Returns an error only if the endpoint string is rejected by the client
/// builder. Connection, subscription and stream errors are logged and retried.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint";
    // Pause between attempts so a persistent failure does not become a hot loop.
    let reconnect_delay = std::time::Duration::from_secs(1);
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };
    let mut tracked_slot: u64 = 0;

    loop {
        // A failed (re)connect must not terminate a client that is meant to
        // stay up, so it is logged and retried rather than propagated with `?`.
        let mut client = match GeyserGrpcClient::build_from_shared(endpoint)?
            .connect()
            .await
        {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Error connecting: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Gap recovery: resume from the last slot seen, if there is one.
        subscribe_request.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        // The sink half is kept bound for the lifetime of the stream.
        let (_sink, mut stream) = match client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await
        {
            Ok(halves) => halves,
            Err(e) => {
                eprintln!("Error subscribing: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        // An unrecognised status value is reported rather than
                        // panicking the whole client.
                        match SlotStatus::try_from(slot.status) {
                            Ok(status) => {
                                println!("Slot: {} | Status: {:?}", slot.slot, status)
                            }
                            Err(_) => println!(
                                "Slot: {} | Status: unknown ({})",
                                slot.slot, slot.status
                            ),
                        }
                    }
                }
                Some(Err(e)) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
                None => break,
            }
        }

        tokio::time::sleep(reconnect_delay).await;
    }
}
流式获取某个 mint 的账户(代币账户)
rust
// Account subscription named "token_accounts_by_mint": two account filters
// (a token-account-state filter and a memcmp on the mint) at `Confirmed`
// commitment.
SubscribeRequest {
    commitment: Some(CommitmentLevel::Confirmed as i32),
    accounts: std::iter::once((
        String::from("token_accounts_by_mint"),
        SubscribeRequestFilterAccounts {
            filters: Vec::from([
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::TokenAccountState(true)),
                },
                // Compare the bytes at offset 0 of the account data with the mint address.
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
                        offset: 0,
                        data: Some(Data::Base58(String::from(
                            "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn", // replace with mint
                        ))),
                    })),
                },
            ]),
            ..Default::default()
        },
    ))
    .collect(),
    ..Default::default()
}
// Account subscription named "token_accounts_by_mint": two account filters
// (a token-account-state filter and a memcmp on the mint) at `Confirmed`
// commitment.
SubscribeRequest {
    commitment: Some(CommitmentLevel::Confirmed as i32),
    accounts: std::iter::once((
        String::from("token_accounts_by_mint"),
        SubscribeRequestFilterAccounts {
            filters: Vec::from([
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::TokenAccountState(true)),
                },
                // Compare the bytes at offset 0 of the account data with the mint address.
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
                        offset: 0,
                        data: Some(Data::Base58(String::from(
                            "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn", // replace with mint
                        ))),
                    })),
                },
            ]),
            ..Default::default()
        },
    ))
    .collect(),
    ..Default::default()
}
流式获取交易(成功的、非投票的)
rust
// Transaction subscription named "all_transactions": vote and failed
// transactions are filtered out, at `Confirmed` commitment.
SubscribeRequest {
    commitment: Some(CommitmentLevel::Confirmed as i32),
    transactions: std::iter::once((
        String::from("all_transactions"),
        SubscribeRequestFilterTransactions {
            failed: Some(false),
            vote: Some(false),
            ..Default::default()
        },
    ))
    .collect(),
    ..Default::default()
}
// Transaction subscription named "all_transactions": vote and failed
// transactions are filtered out, at `Confirmed` commitment.
SubscribeRequest {
    commitment: Some(CommitmentLevel::Confirmed as i32),
    transactions: std::iter::once((
        String::from("all_transactions"),
        SubscribeRequestFilterTransactions {
            failed: Some(false),
            vote: Some(false),
            ..Default::default()
        },
    ))
    .collect(),
    ..Default::default()
}
生产级客户端要点
- 带重连和间隙恢复(`from_slot`)的持久客户端
- Ping/pong 处理以保持流活跃
- 接收和处理之间使用有界通道/背压
- 在同一双向流上动态更新订阅
- 错误处理、指标和慢消费者/丢弃更新的日志记录
完整生产级客户端(持久、间隙恢复、ping/pong、动态订阅)
rust
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
/// Capacity of the bounded channel that carries updates from the ingress task
/// to the dispatcher task.
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;

/// Builds the initial subscription (slots plus non-vote, non-failed
/// transactions at `Confirmed` commitment), spawns the dispatcher and ingress
/// tasks, and waits for both of them, logging either task's join error.
#[tokio::main]
async fn main() {
    let endpoint = "http://your-erpc-grpc-endpoint".to_string();
    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    }));

    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
    // Bound to a named variable (not `_`) so the sender lives until `main`
    // returns and the request channel stays open. Send a new `SubscribeRequest`
    // through it to change the subscription on the live stream.
    let (_subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    let dispatcher_handle = tokio::spawn(dispatcher_loop(updates_rx));
    let ingress_handle = tokio::spawn(ingress_loop(
        updates_tx,
        subscribe_rx,
        endpoint,
        subscribe_request,
    ));

    // Inspect both join results: a panic in the ingress task must not go unnoticed.
    let (dispatcher_result, ingress_result) = tokio::join!(dispatcher_handle, ingress_handle);
    if let Err(e) = dispatcher_result {
        println!("Error: {:?}", e);
    }
    if let Err(e) = ingress_result {
        println!("Error: {:?}", e);
    }
}
/// Maintains the gRPC subscription and forwards updates to the dispatcher.
///
/// Runs an outer reconnect loop. On every (re)connect the shared
/// `subscribe_request` is sent with `from_slot` set to the last slot seen, so
/// the stream resumes from that slot. While connected it:
/// - forwards requests received on `subscribe_rx` to the server and stores
///   them in `subscribe_request`, so a later reconnect uses the latest one;
/// - answers `Ping` updates with a ping request and discards `Pong` updates;
/// - records the slot of `Slot` updates (they are not forwarded);
/// - sends every other update to `updates_tx`, waiting when the channel is
///   full (backpressure).
///
/// Returns only when the endpoint is rejected by the client builder or when
/// the receiving side of `updates_tx` has been closed.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Pause between attempts so a persistent failure does not become a hot loop.
    let reconnect_delay = std::time::Duration::from_secs(1);
    let mut tracked_slot: u64 = 0;
    // Becomes false once every request sender is gone; the `select!` branch is
    // then switched off instead of being polled on a closed channel.
    let mut requests_open = true;

    loop {
        let builder = match GeyserGrpcClient::build_from_shared(endpoint.clone()) {
            Ok(builder) => builder,
            Err(e) => {
                // Retrying cannot fix a rejected endpoint string.
                println!("Invalid endpoint: {:?}", e);
                return;
            }
        };
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Gap recovery: resume from the last slot seen, if there is one.
        subscribe_request.write().await.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };
        // Clone in its own statement so the read guard is released here and is
        // not held for the whole lifetime of the connection.
        let initial_request = subscribe_request.read().await.clone();

        match client.subscribe_with_request(Some(initial_request)).await {
            Ok((mut subscribe_tx, mut stream)) => loop {
                tokio::select! {
                    maybe_request = subscribe_rx.recv(), if requests_open => {
                        match maybe_request {
                            Some(request) => {
                                // Remember the latest subscription for future reconnects.
                                *subscribe_request.write().await = request.clone();
                                if let Err(e) = subscribe_tx.send(request).await {
                                    println!("Error sending subscribe request: {:?}", e);
                                    break;
                                }
                            }
                            None => requests_open = false,
                        }
                    }
                    maybe_result = stream.next() => {
                        let update = match maybe_result {
                            Some(Ok(update)) => update,
                            Some(Err(e)) => {
                                println!("Error receiving update: {:?}", e);
                                break;
                            }
                            None => {
                                // The stream ended: leave the inner loop so the
                                // outer loop reconnects instead of waiting forever.
                                println!("Stream closed");
                                break;
                            }
                        };

                        match &update.update_oneof {
                            Some(UpdateOneof::Ping(_)) => {
                                if let Err(e) = subscribe_tx
                                    .send(SubscribeRequest {
                                        ping: Some(SubscribeRequestPing { id: 1 }),
                                        ..Default::default()
                                    })
                                    .await
                                {
                                    println!("Error sending ping: {:?}", e);
                                    break;
                                }
                                continue;
                            }
                            Some(UpdateOneof::Pong(_)) => continue,
                            Some(UpdateOneof::Slot(slot_update)) => {
                                tracked_slot = slot_update.slot;
                                continue;
                            }
                            _ => {}
                        }

                        // `send` waits for capacity and fails only when the
                        // receiver is gone, in which case nobody is consuming.
                        if let Err(e) = updates_tx.send(update).await {
                            println!("Update channel closed, stopping ingress: {:?}", e);
                            return;
                        }
                    }
                }
            },
            Err(e) => {
                println!("Error subscribing: {:?}", e);
            }
        };

        println!("Disconnected, reconnecting...");
        tokio::time::sleep(reconnect_delay).await;
    }
}
/// Consumes updates from `updates_rx` and prints them: the slot and signature
/// for transaction updates, the debug form for anything else.
///
/// Returns once the channel is closed (`recv` yields `None`).
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    // `while let` stops on `None`; a bare `loop` would spin on a closed channel.
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                // A missing or malformed signature falls back to the default signature.
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", tx.slot, sig);
            }
            _ => {
                println!("Received update: {:?}", update);
            }
        }
    }
}
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
/// Capacity of the bounded channel that carries updates from the ingress task
/// to the dispatcher task.
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;

/// Builds the initial subscription (slots plus non-vote, non-failed
/// transactions at `Confirmed` commitment), spawns the dispatcher and ingress
/// tasks, and waits for both of them, logging either task's join error.
#[tokio::main]
async fn main() {
    let endpoint = "http://your-erpc-grpc-endpoint".to_string();
    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    }));

    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
    // Bound to a named variable (not `_`) so the sender lives until `main`
    // returns and the request channel stays open. Send a new `SubscribeRequest`
    // through it to change the subscription on the live stream.
    let (_subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    let dispatcher_handle = tokio::spawn(dispatcher_loop(updates_rx));
    let ingress_handle = tokio::spawn(ingress_loop(
        updates_tx,
        subscribe_rx,
        endpoint,
        subscribe_request,
    ));

    // Inspect both join results: a panic in the ingress task must not go unnoticed.
    let (dispatcher_result, ingress_result) = tokio::join!(dispatcher_handle, ingress_handle);
    if let Err(e) = dispatcher_result {
        println!("Error: {:?}", e);
    }
    if let Err(e) = ingress_result {
        println!("Error: {:?}", e);
    }
}
/// Maintains the gRPC subscription and forwards updates to the dispatcher.
///
/// Runs an outer reconnect loop. On every (re)connect the shared
/// `subscribe_request` is sent with `from_slot` set to the last slot seen, so
/// the stream resumes from that slot. While connected it:
/// - forwards requests received on `subscribe_rx` to the server and stores
///   them in `subscribe_request`, so a later reconnect uses the latest one;
/// - answers `Ping` updates with a ping request and discards `Pong` updates;
/// - records the slot of `Slot` updates (they are not forwarded);
/// - sends every other update to `updates_tx`, waiting when the channel is
///   full (backpressure).
///
/// Returns only when the endpoint is rejected by the client builder or when
/// the receiving side of `updates_tx` has been closed.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Pause between attempts so a persistent failure does not become a hot loop.
    let reconnect_delay = std::time::Duration::from_secs(1);
    let mut tracked_slot: u64 = 0;
    // Becomes false once every request sender is gone; the `select!` branch is
    // then switched off instead of being polled on a closed channel.
    let mut requests_open = true;

    loop {
        let builder = match GeyserGrpcClient::build_from_shared(endpoint.clone()) {
            Ok(builder) => builder,
            Err(e) => {
                // Retrying cannot fix a rejected endpoint string.
                println!("Invalid endpoint: {:?}", e);
                return;
            }
        };
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Gap recovery: resume from the last slot seen, if there is one.
        subscribe_request.write().await.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };
        // Clone in its own statement so the read guard is released here and is
        // not held for the whole lifetime of the connection.
        let initial_request = subscribe_request.read().await.clone();

        match client.subscribe_with_request(Some(initial_request)).await {
            Ok((mut subscribe_tx, mut stream)) => loop {
                tokio::select! {
                    maybe_request = subscribe_rx.recv(), if requests_open => {
                        match maybe_request {
                            Some(request) => {
                                // Remember the latest subscription for future reconnects.
                                *subscribe_request.write().await = request.clone();
                                if let Err(e) = subscribe_tx.send(request).await {
                                    println!("Error sending subscribe request: {:?}", e);
                                    break;
                                }
                            }
                            None => requests_open = false,
                        }
                    }
                    maybe_result = stream.next() => {
                        let update = match maybe_result {
                            Some(Ok(update)) => update,
                            Some(Err(e)) => {
                                println!("Error receiving update: {:?}", e);
                                break;
                            }
                            None => {
                                // The stream ended: leave the inner loop so the
                                // outer loop reconnects instead of waiting forever.
                                println!("Stream closed");
                                break;
                            }
                        };

                        match &update.update_oneof {
                            Some(UpdateOneof::Ping(_)) => {
                                if let Err(e) = subscribe_tx
                                    .send(SubscribeRequest {
                                        ping: Some(SubscribeRequestPing { id: 1 }),
                                        ..Default::default()
                                    })
                                    .await
                                {
                                    println!("Error sending ping: {:?}", e);
                                    break;
                                }
                                continue;
                            }
                            Some(UpdateOneof::Pong(_)) => continue,
                            Some(UpdateOneof::Slot(slot_update)) => {
                                tracked_slot = slot_update.slot;
                                continue;
                            }
                            _ => {}
                        }

                        // `send` waits for capacity and fails only when the
                        // receiver is gone, in which case nobody is consuming.
                        if let Err(e) = updates_tx.send(update).await {
                            println!("Update channel closed, stopping ingress: {:?}", e);
                            return;
                        }
                    }
                }
            },
            Err(e) => {
                println!("Error subscribing: {:?}", e);
            }
        };

        println!("Disconnected, reconnecting...");
        tokio::time::sleep(reconnect_delay).await;
    }
}
/// Consumes updates from `updates_rx` and prints them: the slot and signature
/// for transaction updates, the debug form for anything else.
///
/// Returns once the channel is closed (`recv` yields `None`).
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    // `while let` stops on `None`; a bare `loop` would spin on a closed channel.
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                // A missing or malformed signature falls back to the default signature.
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", tx.slot, sig);
            }
            _ => {
                println!("Received update: {:?}", update);
            }
        }
    }
}
贡献示例
发现了有用的模式?欢迎贡献到这些文档中。
Solana Stream SDK 仓库:https://github.com/ValidatorsDAO/solana-stream