Geyser gRPC - Code Examples

Practical Rust examples for common Geyser gRPC use cases, built on the Solana Stream SDK's Geyser gRPC client. Replace the placeholder endpoint in each example with your own Geyser gRPC endpoint.

Basic subscription: all non-vote, non-failed transactions

rust
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
    GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
    GeyserSubscribeRequest as SubscribeRequest,
    GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
    GeyserUpdateOneof as UpdateOneof,
};

/// Subscribes to every confirmed, successful, non-vote transaction and prints
/// the slot and signature of each one.
///
/// Returns an error if the endpoint is invalid, the connection or subscription
/// fails, or the stream yields an error while running.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL

    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;

    // `request_sink` sends subscription requests; `stream` yields the updates.
    let (mut request_sink, mut stream) = client.subscribe().await?;

    request_sink
        .send(SubscribeRequest {
            transactions: HashMap::from([(
                "all_transactions".to_string(),
                SubscribeRequestFilterTransactions {
                    vote: Some(false),   // Drop vote txs (~70% of traffic)
                    failed: Some(false), // Drop failed txs
                    ..Default::default()
                },
            )]),
            commitment: Some(CommitmentLevel::Confirmed as i32),
            ..Default::default()
        })
        .await?;

    while let Some(message) = stream.next().await {
        // Propagate stream errors instead of silently ending the loop with Ok(()).
        let message = message?;

        if let Some(UpdateOneof::Transaction(tx_update)) = message.update_oneof {
            // Falls back to the default (all-zero) signature when the payload is
            // missing or the signature bytes are malformed.
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx_update.slot, sig);
        }
    }

    // The server closed the stream without reporting an error.
    Ok(())
}

Durable client with reconnection and gap recovery (ping/pong handling is shown in the full production-grade client below)

rust
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
    GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
    GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
    GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
    GeyserUpdateOneof as UpdateOneof,
};

/// Streams slot updates and keeps the subscription alive across disconnects.
///
/// Whenever connecting, subscribing, or receiving fails (or the stream ends),
/// the client waits briefly and reconnects, resuming from the last slot it saw
/// via `from_slot`. Only an invalid endpoint is returned as an error, because
/// retrying cannot fix it.
#[tokio::main]
async fn main() -> Result<()> {
    // Pause between reconnect attempts so an unreachable endpoint is not hammered.
    const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    let endpoint = "http://your-erpc-grpc-endpoint";

    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    // Last slot received; 0 means nothing has been received yet.
    let mut tracked_slot: u64 = 0;

    loop {
        let mut client = match GeyserGrpcClient::build_from_shared(endpoint)?
            .connect()
            .await
        {
            Ok(client) => client,
            Err(e) => {
                // Transient connection failures must not end a durable client.
                eprintln!("Error connecting: {}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // Resume from the last seen slot so updates missed while disconnected
        // are requested again.
        subscribe_request.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        // `_sink` is bound (not discarded) so the request half of the stream
        // stays open for as long as updates are being read.
        let (_sink, mut stream) = match client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await
        {
            Ok(halves) => halves,
            Err(e) => {
                eprintln!("Error subscribing: {}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        // An unrecognized status value is reported, not a panic.
                        match SlotStatus::try_from(slot.status) {
                            Ok(status) => {
                                println!("Slot: {} | Status: {:?}", slot.slot, status)
                            }
                            Err(_) => println!(
                                "Slot: {} | Status: unknown ({})",
                                slot.slot, slot.status
                            ),
                        }
                    }
                }
                Some(Err(e)) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
                // The server closed the stream.
                None => break,
            }
        }

        eprintln!("Disconnected, reconnecting...");
        tokio::time::sleep(RECONNECT_DELAY).await;
    }
}

Stream accounts for a mint (token accounts)

rust
// Subscription request with a single account filter, registered under the
// label "token_accounts_by_mint", at Confirmed commitment.
SubscribeRequest {
    accounts: HashMap::from([(
        "token_accounts_by_mint".to_string(),
        SubscribeRequestFilterAccounts {
            // Two filters are listed; presumably an account must satisfy both
            // to be streamed — confirm against the Geyser filter semantics.
            filters: vec![
                SubscribeRequestFilterAccountsFilter {
                    // Presumably limits matches to accounts holding valid
                    // token-account state — TODO confirm.
                    filter: Some(Filter::TokenAccountState(true)),
                },
                SubscribeRequestFilterAccountsFilter {
                    // Byte comparison against the account data starting at offset 0.
                    // NOTE(review): assumes the mint is the first field of the token
                    // account layout — confirm for the token program you target.
                    filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
                        offset: 0,
                        data: Some(Data::Base58(
                            "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint
                        )),
                    })),
                },
            ],
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    // All other request fields keep their default values.
    ..Default::default()
}

Stream transactions (successful, non-vote)

rust
// Subscription request with a single transaction filter, registered under the
// label "all_transactions", at Confirmed commitment.
SubscribeRequest {
    transactions: HashMap::from([(
        "all_transactions".to_string(),
        SubscribeRequestFilterTransactions {
            // Drop vote transactions.
            vote: Some(false),
            // Drop failed transactions.
            failed: Some(false),
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    // All other request fields keep their default values.
    ..Default::default()
}

Production-grade client highlights

  • Durable client with reconnection and gap recovery (from_slot)
  • Ping/pong handling to keep the stream alive
  • Bounded channels/backpressure between ingress and processing
  • Dynamic subscription updates on the same bidi stream
  • Error handling, metrics, and logging for slow consumers/dropped updates

Full production-grade client (durable, gap recovery, ping/pong, dynamic subscriptions)

rust
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
    GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
    GeyserSubscribeRequest as SubscribeRequest,
    GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
    GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
    GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;

// Capacity of the bounded channel that carries updates from the ingress task
// to the dispatcher task.
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;

/// Wires the production-style client together.
///
/// Spawns an ingress task that owns the gRPC stream and a dispatcher task that
/// processes updates. The two are connected by a bounded channel, so a slow
/// dispatcher applies backpressure to ingress. Waits for both tasks and reports
/// the failure of either one.
#[tokio::main]
async fn main() {
    let endpoint = "http://your-erpc-grpc-endpoint".to_string();

    // Shared so the ingress task can stamp `from_slot` on it before each
    // (re)subscription.
    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    }));

    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);

    // Send a `SubscribeRequest` through this handle (or a clone of it) to change
    // filters on the live stream. It is unused in this example but stays bound
    // so the channel remains open until `main` returns.
    let (_subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    let dispatcher_handle = tokio::spawn(async move {
        dispatcher_loop(updates_rx).await;
    });

    let ingress_handle = tokio::spawn(async move {
        ingress_loop(
            updates_tx,
            subscribe_rx,
            endpoint,
            subscribe_request,
        )
        .await;
    });

    // Inspect both join results: a panic in either task must be reported.
    let (dispatcher_result, ingress_result) = tokio::join!(dispatcher_handle, ingress_handle);

    if let Err(e) = dispatcher_result {
        println!("Error: {:?}", e);
    }
    if let Err(e) = ingress_result {
        println!("Error: {:?}", e);
    }
}

/// Owns the gRPC connection: subscribes, forwards updates, and reconnects.
///
/// * `updates_tx` - bounded channel to the dispatcher; `send().await` waits for
///   free capacity, so a slow dispatcher backpressures this loop.
/// * `subscribe_rx` - new subscription requests to forward on the live stream.
/// * `endpoint` - Geyser gRPC endpoint URL.
/// * `subscribe_request` - request used for every (re)subscription; its
///   `from_slot` is set to the last slot seen so gaps are recovered.
///
/// Ping and slot updates are consumed here; everything else is forwarded.
/// Returns only when the endpoint is invalid or the dispatcher has gone away.
/// Every other failure leads to a delayed reconnect.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Pause between reconnect attempts so an unreachable endpoint is not hammered.
    const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    // Last slot received; 0 means nothing has been received yet.
    let mut tracked_slot: u64 = 0;

    loop {
        let builder = match GeyserGrpcClient::build_from_shared(endpoint.clone()) {
            Ok(builder) => builder,
            Err(e) => {
                // A malformed endpoint cannot be fixed by retrying.
                eprintln!("Invalid endpoint {}: {:?}", endpoint, e);
                return;
            }
        };

        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Error connecting: {:?}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // Stamp the resume point and snapshot the request under one lock.
        let request = {
            let mut shared = subscribe_request.write().await;
            shared.from_slot = if tracked_slot > 0 {
                Some(tracked_slot)
            } else {
                None
            };
            shared.clone()
        };

        match client.subscribe_with_request(Some(request)).await {
            Ok((mut subscribe_tx, mut stream)) => {
                loop {
                    tokio::select! {
                        Some(new_request) = subscribe_rx.recv() => {
                            // Remember the request so a reconnect re-subscribes
                            // with the latest filters, not the initial ones.
                            *subscribe_request.write().await = new_request.clone();

                            if let Err(e) = subscribe_tx.send(new_request).await {
                                eprintln!("Error sending subscribe request: {:?}", e);
                                break;
                            }
                        }
                        // Bind the raw Option: a `Some(..)` pattern would merely
                        // disable this branch at end of stream and the loop
                        // would never reconnect.
                        next = stream.next() => {
                            let update = match next {
                                Some(Ok(update)) => update,
                                Some(Err(e)) => {
                                    eprintln!("Error receiving update: {:?}", e);
                                    break;
                                }
                                None => {
                                    eprintln!("Stream closed by server");
                                    break;
                                }
                            };

                            match update.update_oneof {
                                Some(UpdateOneof::Ping(_)) => {
                                    // Answer server pings to keep the stream alive.
                                    if let Err(e) = subscribe_tx
                                        .send(SubscribeRequest {
                                            ping: Some(SubscribeRequestPing { id: 1 }),
                                            ..Default::default()
                                        })
                                        .await
                                    {
                                        // The request half is broken; reconnect.
                                        eprintln!("Error sending ping: {:?}", e);
                                        break;
                                    }
                                    continue;
                                }
                                Some(UpdateOneof::Pong(_)) => continue,
                                Some(UpdateOneof::Slot(ref slot_update)) => {
                                    // Slot updates only advance the resume point.
                                    tracked_slot = slot_update.slot;
                                    continue;
                                }
                                _ => {}
                            }

                            // `send` fails only when the receiver is gone, so
                            // there is nobody left to deliver to.
                            if let Err(e) = updates_tx.send(update).await {
                                eprintln!("Dispatcher channel closed, stopping ingress: {:?}", e);
                                return;
                            }
                        }
                    }
                }
            }
            Err(e) => {
                eprintln!("Error subscribing: {:?}", e);
            }
        };

        println!("Disconnected, reconnecting...");
        tokio::time::sleep(RECONNECT_DELAY).await;
    }
}

/// Processes updates handed over by the ingress task.
///
/// Prints slot and signature for transaction updates and the debug form of any
/// other update. Returns once every sender of `updates_rx` has been dropped and
/// the channel is drained, so it does not spin on a closed channel.
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    // `recv` yields `None` only when the channel is closed and empty.
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                // Falls back to the default (all-zero) signature when the payload
                // is missing or the signature bytes are malformed.
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", tx.slot, sig);
            }
            _ => {
                println!("Received update: {:?}", update);
            }
        }
    }
}

Contributing examples

Found a useful pattern? Consider contributing it to these docs.
Solana Stream SDK repository: https://github.com/ValidatorsDAO/solana-stream