Geyser gRPC - Code Examples
Practical Rust examples for common Geyser gRPC use cases. Fill in your Geyser gRPC endpoint and credentials. Solana Stream SDK supports Geyser gRPC.
Basic subscription: all non-vote, non-failed transactions
rustuse anyhow::Result; use futures::{sink::SinkExt, stream::StreamExt}; use solana_signature::Signature; use std::collections::HashMap; use solana_stream_sdk::{ GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSubscribeRequest as SubscribeRequest, GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions, GeyserUpdateOneof as UpdateOneof, }; #[tokio::main] async fn main() -> Result<()> { let endpoint = "https://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL let x_token = "YOUR_API_KEY"; // Auth header if required let mut client = GeyserGrpcClient::build_from_shared(endpoint)? .x_token(Some(x_token))? .connect() .await?; let (mut tx, mut stream) = client.subscribe().await?; tx.send(SubscribeRequest { transactions: HashMap::from([( "all_transactions".to_string(), SubscribeRequestFilterTransactions { vote: Some(false), // Drop vote txs (~70% of traffic) failed: Some(false), // Drop failed txs ..Default::default() }, )]), commitment: Some(CommitmentLevel::Confirmed as i32), ..Default::default() }) .await?; while let Some(Ok(msg)) = stream.next().await { if let Some(UpdateOneof::Transaction(tx)) = msg.update_oneof { let sig = tx .transaction .and_then(|t| Signature::try_from(t.signature.as_slice()).ok()) .unwrap_or_default(); println!("Slot: {} | Signature: {}", tx.slot, sig); } } Ok(()) }
Durable client with reconnection, gap recovery, ping/pong
rustuse anyhow::Result; use futures::stream::StreamExt; use std::collections::HashMap; use solana_stream_sdk::{ GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest, GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots, GeyserUpdateOneof as UpdateOneof, }; #[tokio::main] async fn main() -> Result<()> { let endpoint = "https://your-erpc-grpc-endpoint"; let x_token = "YOUR_API_KEY"; let mut subscribe_request = SubscribeRequest { slots: HashMap::from([( "slots".to_string(), SubscribeRequestFilterSlots::default(), )]), commitment: Some(CommitmentLevel::Confirmed as i32), ..Default::default() }; let mut tracked_slot: u64 = 0; loop { let mut client = GeyserGrpcClient::build_from_shared(endpoint)? .x_token(Some(x_token))? .connect() .await?; subscribe_request.from_slot = if tracked_slot > 0 { Some(tracked_slot) } else { None }; let (mut _sink, mut stream) = client .subscribe_with_request(Some(subscribe_request.clone())) .await?; loop { match stream.next().await { Some(Ok(msg)) => { if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof { tracked_slot = slot.slot; println!( "Slot: {} | Status: {:?}", slot.slot, SlotStatus::try_from(slot.status).unwrap() ); } } Some(Err(e)) => { eprintln!("Error receiving updates: {}", e); break; } None => break, } } } }
Stream accounts for a mint (token accounts)
rustSubscribeRequest { accounts: HashMap::from([( "token_accounts_by_mint".to_string(), SubscribeRequestFilterAccounts { filters: vec![ SubscribeRequestFilterAccountsFilter { filter: Some(Filter::TokenAccountState(true)), }, SubscribeRequestFilterAccountsFilter { filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp { offset: 0, data: Some(Data::Base58( "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint )), })), }, ], ..Default::default() }, )]), commitment: Some(CommitmentLevel::Confirmed as i32), ..Default::default() }
Stream transactions (successful, non-vote)
// Transaction subscription named "all_transactions" at `Confirmed`
// commitment, with the `vote` and `failed` filters both set to `false`.
SubscribeRequest {
    transactions: HashMap::from([(
        String::from("all_transactions"),
        SubscribeRequestFilterTransactions {
            vote: Some(false),
            failed: Some(false),
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
}
Production-grade client highlights
- Durable client with reconnection and gap recovery (`from_slot`)
- Ping/pong handling to keep the stream alive
- Bounded channels/backpressure between ingress and processing
- Dynamic subscription updates on the same bidi stream
- Error handling, metrics, and logging for slow consumers/dropped updates
Full production-grade client (durable, gap recovery, ping/pong, dynamic subscriptions)
rustuse futures::{SinkExt, stream::StreamExt}; use solana_signature::Signature; use std::{collections::HashMap, sync::Arc}; use tokio::sync::{RwLock, mpsc}; use solana_stream_sdk::{ GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus, GeyserSubscribeRequest as SubscribeRequest, GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots, GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions, GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof, }; use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing; const UPDATE_CHANNEL_CAPACITY: usize = 10_000; #[tokio::main] async fn main() { let endpoint = "https://your-erpc-grpc-endpoint".to_string(); let x_token = "YOUR_API_KEY".to_string(); let subscribe_request = Arc::new(RwLock::new(SubscribeRequest { slots: HashMap::from([( "slots".to_string(), SubscribeRequestFilterSlots { ..Default::default() }, )]), transactions: HashMap::from([( "all_transactions".to_string(), SubscribeRequestFilterTransactions { vote: Some(false), failed: Some(false), ..Default::default() }, )]), commitment: Some(CommitmentLevel::Confirmed as i32), ..Default::default() })); let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY); let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>(); let dispatcher_handle = tokio::spawn(async move { dispatcher_loop(updates_rx).await; }); let ingress_handle = tokio::spawn(async move { ingress_loop( updates_tx, subscribe_rx, endpoint, x_token, subscribe_request, ) .await; }); if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 { println!("Error: {:?}", e); } } async fn ingress_loop( updates_tx: mpsc::Sender<SubscribeUpdate>, mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>, endpoint: String, x_token: String, subscribe_request: Arc<RwLock<SubscribeRequest>>, ) { let mut tracked_slot: u64 = 0; loop { let builder = 
GeyserGrpcClient::build_from_shared(endpoint.clone()) .unwrap() .x_token(Some(x_token.clone())) .unwrap(); let mut client = builder.connect().await.unwrap(); subscribe_request.write().await.from_slot = if tracked_slot > 0 { Some(tracked_slot) } else { None }; match client .subscribe_with_request(Some(subscribe_request.read().await.clone())) .await { Ok((mut subscribe_tx, mut stream)) => { loop { tokio::select! { Some(subscribe_request) = subscribe_rx.recv() => { if let Err(e) = subscribe_tx.send(subscribe_request).await { println!("Error sending subscribe request: {:?}", e); break; } } Some(result) = stream.next() => { match result { Ok(update) => { if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) { if let Err(e) = subscribe_tx .send(SubscribeRequest { ping: Some(SubscribeRequestPing { id: 1 }), ..Default::default() }) .await { println!("Error sending ping: {:?}", e); } } if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) { continue; } if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof { tracked_slot = slot_update.slot; continue; } if let Err(e) = updates_tx.send(update.clone()).await { println!("Slow consumer, dropping update {:?} err: {:?}", update, e); } } Err(e) => { println!("Error receiving update: {:?}", e); break; } } } } } } Err(e) => { println!("Error subscribing: {:?}", e); } }; println!("Disconnected, reconnecting..."); } } async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) { loop { if let Some(update) = updates_rx.recv().await { match update.update_oneof { Some(UpdateOneof::Transaction(tx)) => { let sig = tx .transaction .and_then(|t| Signature::try_from(t.signature.as_slice()).ok()) .unwrap_or_default(); println!("Slot: {} | Signature: {}", tx.slot, sig); } _ => { println!("Received update: {:?}", update); } } } } }
Contributing examples
Found a useful pattern? Consider contributing it to these docs.
Solana Stream SDK repository: https://github.com/ValidatorsDAO/solana-stream