Geyser gRPC - Codevoorbeelden
Praktische Rust-voorbeelden voor veelvoorkomende Geyser gRPC use cases. Vul uw Geyser gRPC-endpoint in. Solana Stream SDK ondersteunt Geyser gRPC.
Basisabonnement: alle niet-vote, niet-gefaalde transacties
rust
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to confirmed, non-vote, non-failed transactions and prints the
/// slot and signature of each one until the stream ends.
///
/// # Errors
///
/// Returns an error if the client cannot be built or connected, the
/// subscription cannot be opened, the subscribe request cannot be sent, or the
/// stream yields an error.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;
    let (mut subscribe_tx, mut stream) = client.subscribe().await?;

    subscribe_tx
        .send(SubscribeRequest {
            transactions: HashMap::from([(
                "all_transactions".to_string(),
                SubscribeRequestFilterTransactions {
                    vote: Some(false),   // Drop vote txs (~70% of traffic)
                    failed: Some(false), // Drop failed txs
                    ..Default::default()
                },
            )]),
            commitment: Some(CommitmentLevel::Confirmed as i32),
            ..Default::default()
        })
        .await?;

    while let Some(message) = stream.next().await {
        // Surface stream errors to the caller instead of silently leaving the
        // loop and reporting success.
        let message = message?;
        if let Some(UpdateOneof::Transaction(tx_update)) = message.update_oneof {
            let slot = tx_update.slot;
            // Falls back to the default signature when the payload is missing
            // or the bytes do not form a valid signature.
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", slot, sig);
        }
    }
    Ok(())
}
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to confirmed, non-vote, non-failed transactions and prints the
/// slot and signature of each one until the stream ends.
///
/// # Errors
///
/// Returns an error if the client cannot be built or connected, the
/// subscription cannot be opened, the subscribe request cannot be sent, or the
/// stream yields an error.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;
    let (mut subscribe_tx, mut stream) = client.subscribe().await?;

    subscribe_tx
        .send(SubscribeRequest {
            transactions: HashMap::from([(
                "all_transactions".to_string(),
                SubscribeRequestFilterTransactions {
                    vote: Some(false),   // Drop vote txs (~70% of traffic)
                    failed: Some(false), // Drop failed txs
                    ..Default::default()
                },
            )]),
            commitment: Some(CommitmentLevel::Confirmed as i32),
            ..Default::default()
        })
        .await?;

    while let Some(message) = stream.next().await {
        // Surface stream errors to the caller instead of silently leaving the
        // loop and reporting success.
        let message = message?;
        if let Some(UpdateOneof::Transaction(tx_update)) = message.update_oneof {
            let slot = tx_update.slot;
            // Falls back to the default signature when the payload is missing
            // or the bytes do not form a valid signature.
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", slot, sig);
        }
    }
    Ok(())
}
Duurzame client met herverbinding, gap recovery, ping/pong
rust
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
/// Streams confirmed slot updates forever, reconnecting whenever the
/// connection, subscription or stream fails and resuming from the last slot
/// seen via `from_slot`.
///
/// NOTE: this minimal loop never writes to the request sink, so it does not
/// answer server pings; the full production client further down in this file
/// shows ping handling.
///
/// # Errors
///
/// Returns an error only when the client builder rejects the endpoint string;
/// connection and stream failures are logged and retried.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint";
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };
    let mut tracked_slot: u64 = 0;
    // Pause between attempts so an unreachable endpoint is not hammered in a
    // tight loop.
    let reconnect_delay = std::time::Duration::from_secs(1);

    loop {
        // A failed connection attempt is transient: log, wait and retry
        // instead of terminating the program.
        let mut client = match GeyserGrpcClient::build_from_shared(endpoint)?
            .connect()
            .await
        {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Error connecting: {}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Resume from the last slot seen once at least one slot was received.
        subscribe_request.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        // `_sink` is bound (not discarded) so the request half stays alive for
        // as long as the stream is read.
        let (_sink, mut stream) = match client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await
        {
            Ok(halves) => halves,
            Err(e) => {
                eprintln!("Error subscribing: {}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        // An unrecognised status value must not panic the client.
                        match SlotStatus::try_from(slot.status) {
                            Ok(status) => {
                                println!("Slot: {} | Status: {:?}", slot.slot, status)
                            }
                            Err(_) => println!(
                                "Slot: {} | Status: unknown ({})",
                                slot.slot, slot.status
                            ),
                        }
                    }
                }
                Some(Err(e)) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
                None => {
                    eprintln!("Stream closed, reconnecting...");
                    break;
                }
            }
        }

        tokio::time::sleep(reconnect_delay).await;
    }
}
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
/// Streams confirmed slot updates forever, reconnecting whenever the
/// connection, subscription or stream fails and resuming from the last slot
/// seen via `from_slot`.
///
/// NOTE: this minimal loop never writes to the request sink, so it does not
/// answer server pings; the full production client further down in this file
/// shows ping handling.
///
/// # Errors
///
/// Returns an error only when the client builder rejects the endpoint string;
/// connection and stream failures are logged and retried.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint";
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };
    let mut tracked_slot: u64 = 0;
    // Pause between attempts so an unreachable endpoint is not hammered in a
    // tight loop.
    let reconnect_delay = std::time::Duration::from_secs(1);

    loop {
        // A failed connection attempt is transient: log, wait and retry
        // instead of terminating the program.
        let mut client = match GeyserGrpcClient::build_from_shared(endpoint)?
            .connect()
            .await
        {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Error connecting: {}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Resume from the last slot seen once at least one slot was received.
        subscribe_request.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        // `_sink` is bound (not discarded) so the request half stays alive for
        // as long as the stream is read.
        let (_sink, mut stream) = match client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await
        {
            Ok(halves) => halves,
            Err(e) => {
                eprintln!("Error subscribing: {}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        // An unrecognised status value must not panic the client.
                        match SlotStatus::try_from(slot.status) {
                            Ok(status) => {
                                println!("Slot: {} | Status: {:?}", slot.slot, status)
                            }
                            Err(_) => println!(
                                "Slot: {} | Status: unknown ({})",
                                slot.slot, slot.status
                            ),
                        }
                    }
                }
                Some(Err(e)) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
                None => {
                    eprintln!("Stream closed, reconnecting...");
                    break;
                }
            }
        }

        tokio::time::sleep(reconnect_delay).await;
    }
}
Stream accounts voor een mint (tokenaccounts)
rust
// Subscribe request with a single accounts filter, registered under the key
// "token_accounts_by_mint", at Confirmed commitment.
SubscribeRequest {
accounts: HashMap::from([(
"token_accounts_by_mint".to_string(),
SubscribeRequestFilterAccounts {
// NOTE(review): presumably an account must satisfy every entry in `filters`
// to be streamed - confirm against the Geyser gRPC filter documentation.
filters: vec![
// Presumably limits matches to accounts holding valid token account state -
// TODO confirm.
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(true)),
},
// Byte comparison at offset 0 of the account data against the Base58 value
// below. Assumes the mint address is stored at the start of a token account -
// verify against the SPL Token account layout.
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: 0,
data: Some(Data::Base58(
"pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint
)),
})),
},
],
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}SubscribeRequest {
// Second, identical copy of the request above (same filter key, same two
// filters, same commitment).
accounts: HashMap::from([(
"token_accounts_by_mint".to_string(),
SubscribeRequestFilterAccounts {
filters: vec![
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(true)),
},
// Byte comparison at offset 0 of the account data against the Base58 value
// below.
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: 0,
data: Some(Data::Base58(
"pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint
)),
})),
},
],
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}Stream transacties (succesvol, niet-vote)
rust
// Subscribe request with a single transactions filter, registered under the
// key "all_transactions", at Confirmed commitment.
SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
// Drop vote transactions.
vote: Some(false),
// Drop failed transactions.
failed: Some(false),
// Every other filter field keeps its default value.
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}SubscribeRequest {
// Second, identical copy of the request above.
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
// Drop vote transactions.
vote: Some(false),
// Drop failed transactions.
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}Hoogtepunten productieklare client
- Duurzame client met herverbinding en gap recovery (`from_slot`)
- Ping/pong-afhandeling om de stream actief te houden
- Bounded channels/backpressure tussen inkomend verkeer en verwerking
- Dynamische abonnementsupdates op dezelfde bidi-stream
- Foutafhandeling, metrics en logging voor trage consumers/gedropte updates
Volledige productieklare client (duurzaam, gap recovery, ping/pong, dynamische abonnementen)
rust
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;
#[tokio::main]
async fn main() {
let endpoint = "http://your-erpc-grpc-endpoint".to_string();
let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots {
..Default::default()
},
)]),
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}));
let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
let dispatcher_handle = tokio::spawn(async move {
dispatcher_loop(updates_rx).await;
});
let ingress_handle = tokio::spawn(async move {
ingress_loop(
updates_tx,
subscribe_rx,
endpoint,
subscribe_request,
)
.await;
});
if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 {
println!("Error: {:?}", e);
}
}
/// Owns the gRPC connection: connects, subscribes, forwards updates to
/// `updates_tx`, and reconnects after any failure.
///
/// * `updates_tx` - bounded channel towards the dispatcher; `send` waits for
///   capacity, so a slow consumer applies backpressure here.
/// * `subscribe_rx` - requests received here are written to the live stream.
/// * `endpoint` - gRPC endpoint URL.
/// * `subscribe_request` - request used for every (re)subscription; its
///   `from_slot` is set to the last slot seen before each attempt.
///
/// Ping updates are answered with a ping request, ping/pong and slot updates
/// are consumed here, and everything else is forwarded. The function returns
/// when the endpoint is invalid or the update receiver has been dropped.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Pause between attempts so an unreachable endpoint is not hammered in a
    // tight loop.
    let reconnect_delay = std::time::Duration::from_secs(1);
    let mut tracked_slot: u64 = 0;

    loop {
        let builder = match GeyserGrpcClient::build_from_shared(endpoint.clone()) {
            Ok(builder) => builder,
            Err(e) => {
                // The endpoint never changes, so retrying cannot help.
                println!("Invalid endpoint: {:?}", e);
                return;
            }
        };
        // Connection failures are transient: log, wait and retry instead of
        // panicking the task.
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Update `from_slot` and snapshot the request under one write lock.
        let request = {
            let mut guard = subscribe_request.write().await;
            guard.from_slot = if tracked_slot > 0 {
                Some(tracked_slot)
            } else {
                None
            };
            guard.clone()
        };

        match client.subscribe_with_request(Some(request)).await {
            Ok((mut subscribe_tx, mut stream)) => loop {
                tokio::select! {
                    // Disabled automatically once every sender is dropped.
                    Some(new_request) = subscribe_rx.recv() => {
                        if let Err(e) = subscribe_tx.send(new_request).await {
                            println!("Error sending subscribe request: {:?}", e);
                            break;
                        }
                    }
                    // Bound without a `Some(..)` pattern so that the end of the
                    // stream is observed here instead of disabling the branch
                    // and leaving the loop waiting forever.
                    next = stream.next() => {
                        let update = match next {
                            Some(Ok(update)) => update,
                            Some(Err(e)) => {
                                println!("Error receiving update: {:?}", e);
                                break;
                            }
                            None => {
                                println!("Stream closed by server");
                                break;
                            }
                        };

                        if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
                            // Answer the server ping to keep the stream alive.
                            let reply = SubscribeRequest {
                                ping: Some(SubscribeRequestPing { id: 1 }),
                                ..Default::default()
                            };
                            if let Err(e) = subscribe_tx.send(reply).await {
                                // The request half is broken; reconnect.
                                println!("Error sending ping: {:?}", e);
                                break;
                            }
                            continue;
                        }
                        if matches!(update.update_oneof, Some(UpdateOneof::Pong(_))) {
                            continue;
                        }
                        if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
                            // Remember the position for `from_slot` on reconnect.
                            tracked_slot = slot_update.slot;
                            continue;
                        }

                        // `send` only fails when the receiver is gone, so there
                        // is nobody left to consume updates: stop the task. The
                        // error hands the undelivered update back, so no clone
                        // is needed for logging.
                        if let Err(e) = updates_tx.send(update).await {
                            println!(
                                "Update receiver closed, stopping ingress; undelivered update: {:?}",
                                e.0
                            );
                            return;
                        }
                    }
                }
            },
            Err(e) => {
                println!("Error subscribing: {:?}", e);
            }
        };

        println!("Disconnected, reconnecting...");
        tokio::time::sleep(reconnect_delay).await;
    }
}
/// Consumes updates from `updates_rx` and prints them: transactions as
/// "slot | signature", anything else via its `Debug` output.
///
/// Returns once the channel is closed (all senders dropped) instead of
/// spinning on a receiver that keeps yielding `None`.
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                let slot = tx.slot;
                // Falls back to the default signature when the payload is
                // missing or the bytes do not form a valid signature.
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", slot, sig);
            }
            _ => {
                println!("Received update: {:?}", update);
            }
        }
    }
}
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;
#[tokio::main]
async fn main() {
let endpoint = "http://your-erpc-grpc-endpoint".to_string();
let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots {
..Default::default()
},
)]),
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}));
let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
let dispatcher_handle = tokio::spawn(async move {
dispatcher_loop(updates_rx).await;
});
let ingress_handle = tokio::spawn(async move {
ingress_loop(
updates_tx,
subscribe_rx,
endpoint,
subscribe_request,
)
.await;
});
if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 {
println!("Error: {:?}", e);
}
}
/// Owns the gRPC connection: connects, subscribes, forwards updates to
/// `updates_tx`, and reconnects after any failure.
///
/// * `updates_tx` - bounded channel towards the dispatcher; `send` waits for
///   capacity, so a slow consumer applies backpressure here.
/// * `subscribe_rx` - requests received here are written to the live stream.
/// * `endpoint` - gRPC endpoint URL.
/// * `subscribe_request` - request used for every (re)subscription; its
///   `from_slot` is set to the last slot seen before each attempt.
///
/// Ping updates are answered with a ping request, ping/pong and slot updates
/// are consumed here, and everything else is forwarded. The function returns
/// when the endpoint is invalid or the update receiver has been dropped.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Pause between attempts so an unreachable endpoint is not hammered in a
    // tight loop.
    let reconnect_delay = std::time::Duration::from_secs(1);
    let mut tracked_slot: u64 = 0;

    loop {
        let builder = match GeyserGrpcClient::build_from_shared(endpoint.clone()) {
            Ok(builder) => builder,
            Err(e) => {
                // The endpoint never changes, so retrying cannot help.
                println!("Invalid endpoint: {:?}", e);
                return;
            }
        };
        // Connection failures are transient: log, wait and retry instead of
        // panicking the task.
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting: {:?}", e);
                tokio::time::sleep(reconnect_delay).await;
                continue;
            }
        };

        // Update `from_slot` and snapshot the request under one write lock.
        let request = {
            let mut guard = subscribe_request.write().await;
            guard.from_slot = if tracked_slot > 0 {
                Some(tracked_slot)
            } else {
                None
            };
            guard.clone()
        };

        match client.subscribe_with_request(Some(request)).await {
            Ok((mut subscribe_tx, mut stream)) => loop {
                tokio::select! {
                    // Disabled automatically once every sender is dropped.
                    Some(new_request) = subscribe_rx.recv() => {
                        if let Err(e) = subscribe_tx.send(new_request).await {
                            println!("Error sending subscribe request: {:?}", e);
                            break;
                        }
                    }
                    // Bound without a `Some(..)` pattern so that the end of the
                    // stream is observed here instead of disabling the branch
                    // and leaving the loop waiting forever.
                    next = stream.next() => {
                        let update = match next {
                            Some(Ok(update)) => update,
                            Some(Err(e)) => {
                                println!("Error receiving update: {:?}", e);
                                break;
                            }
                            None => {
                                println!("Stream closed by server");
                                break;
                            }
                        };

                        if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
                            // Answer the server ping to keep the stream alive.
                            let reply = SubscribeRequest {
                                ping: Some(SubscribeRequestPing { id: 1 }),
                                ..Default::default()
                            };
                            if let Err(e) = subscribe_tx.send(reply).await {
                                // The request half is broken; reconnect.
                                println!("Error sending ping: {:?}", e);
                                break;
                            }
                            continue;
                        }
                        if matches!(update.update_oneof, Some(UpdateOneof::Pong(_))) {
                            continue;
                        }
                        if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
                            // Remember the position for `from_slot` on reconnect.
                            tracked_slot = slot_update.slot;
                            continue;
                        }

                        // `send` only fails when the receiver is gone, so there
                        // is nobody left to consume updates: stop the task. The
                        // error hands the undelivered update back, so no clone
                        // is needed for logging.
                        if let Err(e) = updates_tx.send(update).await {
                            println!(
                                "Update receiver closed, stopping ingress; undelivered update: {:?}",
                                e.0
                            );
                            return;
                        }
                    }
                }
            },
            Err(e) => {
                println!("Error subscribing: {:?}", e);
            }
        };

        println!("Disconnected, reconnecting...");
        tokio::time::sleep(reconnect_delay).await;
    }
}
/// Consumes updates from `updates_rx` and prints them: transactions as
/// "slot | signature", anything else via its `Debug` output.
///
/// Returns once the channel is closed (all senders dropped) instead of
/// spinning on a receiver that keeps yielding `None`.
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                let slot = tx.slot;
                // Falls back to the default signature when the payload is
                // missing or the bytes do not form a valid signature.
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", slot, sig);
            }
            _ => {
                println!("Received update: {:?}", update);
            }
        }
    }
}
Voorbeelden bijdragen
Een nuttig patroon gevonden? Overweeg het bij te dragen aan deze documentatie.
Solana Stream SDK-repository: https://github.com/ValidatorsDAO/solana-stream