Geyser gRPC - Ví dụ mã nguồn

Ví dụ Rust thực tế cho các trường hợp sử dụng phổ biến của Geyser gRPC. Hãy điền endpoint Geyser gRPC của bạn. Solana Stream SDK hỗ trợ Geyser gRPC.

Subscription cơ bản: tất cả transaction không phải vote, không thất bại

rust
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
    GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
    GeyserSubscribeRequest as SubscribeRequest,
    GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
    GeyserUpdateOneof as UpdateOneof,
};

#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Đặt URL gRPC ERPC của bạn

    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;

    let (mut tx, mut stream) = client.subscribe().await?;

    tx.send(SubscribeRequest {
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),   // Loại bỏ vote txs (~70% lưu lượng)
                failed: Some(false), // Loại bỏ failed txs
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    })
    .await?;

    while let Some(Ok(msg)) = stream.next().await {
        if let Some(UpdateOneof::Transaction(tx)) = msg.update_oneof {
            let sig = tx
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx.slot, sig);
        }
    }
    Ok(())
}

Durable client với kết nối lại, khôi phục khoảng trống, ping/pong

rust
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
    GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
    GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
    GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
    GeyserUpdateOneof as UpdateOneof,
};

#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint";

    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    let mut tracked_slot: u64 = 0;

    loop {
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;

        subscribe_request.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        let (mut _sink, mut stream) = client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await?;

        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        println!(
                            "Slot: {} | Status: {:?}",
                            slot.slot,
                            SlotStatus::try_from(slot.status).unwrap()
                        );
                    }
                }
                Some(Err(e)) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
                None => break,
            }
        }
    }
}

Stream account cho một mint (token account)

rust
SubscribeRequest {
    accounts: HashMap::from([(
        "token_accounts_by_mint".to_string(),
        SubscribeRequestFilterAccounts {
            filters: vec![
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::TokenAccountState(true)),
                },
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
                        offset: 0,
                        data: Some(Data::Base58(
                            "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // thay bằng mint
                        )),
                    })),
                },
            ],
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
}

Stream transaction (thành công, không phải vote)

rust
SubscribeRequest {
    transactions: HashMap::from([(
        "all_transactions".to_string(),
        SubscribeRequestFilterTransactions {
            vote: Some(false),
            failed: Some(false),
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
}

Điểm nổi bật của production-grade client

  • Durable client với kết nối lại và khôi phục khoảng trống (from_slot)
  • Xử lý ping/pong để duy trì stream hoạt động
  • Bounded channel/backpressure giữa ingress và processing
  • Cập nhật subscription động trên cùng một bidi stream
  • Xử lý lỗi, metric, và logging cho slow consumer/update bị drop

Full production-grade client (durable, khôi phục khoảng trống, ping/pong, subscription động)

rust
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
    GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
    GeyserSubscribeRequest as SubscribeRequest,
    GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
    GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
    GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;

const UPDATE_CHANNEL_CAPACITY: usize = 10_000;

#[tokio::main]
async fn main() {
    let endpoint = "http://your-erpc-grpc-endpoint".to_string();

    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    }));

    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
    let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    let dispatcher_handle = tokio::spawn(async move {
        dispatcher_loop(updates_rx).await;
    });

    let ingress_handle = tokio::spawn(async move {
        ingress_loop(
            updates_tx,
            subscribe_rx,
            endpoint,
            subscribe_request,
        )
        .await;
    });

    if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 {
        println!("Error: {:?}", e);
    }
}

async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    let mut tracked_slot: u64 = 0;

    loop {
        let builder = GeyserGrpcClient::build_from_shared(endpoint.clone()).unwrap();

        let mut client = builder.connect().await.unwrap();

        subscribe_request.write().await.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        match client
            .subscribe_with_request(Some(subscribe_request.read().await.clone()))
            .await
        {
            Ok((mut subscribe_tx, mut stream)) => {
                loop {
                    tokio::select! {
                        Some(subscribe_request) = subscribe_rx.recv() => {
                            if let Err(e) = subscribe_tx.send(subscribe_request).await {
                                println!("Error sending subscribe request: {:?}", e);
                                break;
                            }
                        }
                        Some(result) = stream.next() => {
                            match result {
                                Ok(update) => {
                                    if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
                                        if let Err(e) = subscribe_tx
                                            .send(SubscribeRequest {
                                                ping: Some(SubscribeRequestPing { id: 1 }),
                                                ..Default::default()
                                            })
                                            .await
                                        {
                                            println!("Error sending ping: {:?}", e);
                                        }
                                    }
                                    if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) {
                                        continue;
                                    }
                    if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
                        tracked_slot = slot_update.slot;
                        continue;
                    }
                                    if let Err(e) = updates_tx.send(update.clone()).await {
                                        println!("Slow consumer, dropping update {:?} err: {:?}", update, e);
                                    }
                                }
                                Err(e) => {
                                    println!("Error receiving update: {:?}", e);
                                    break;
                                }
                            }
                        }
                    }
                }
            }
            Err(e) => {
                println!("Error subscribing: {:?}", e);
            }
        };

        println!("Disconnected, reconnecting...");
    }
}

async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    loop {
        if let Some(update) = updates_rx.recv().await {
            match update.update_oneof {
                Some(UpdateOneof::Transaction(tx)) => {
                    let sig = tx
                        .transaction
                        .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                        .unwrap_or_default();
                    println!("Slot: {} | Signature: {}", tx.slot, sig);
                }
                _ => {
                    println!("Received update: {:?}", update);
                }
            }
        }
    }
}

Đóng góp ví dụ

Tìm thấy một pattern hữu ích? Hãy cân nhắc đóng góp vào tài liệu này. Solana Stream SDK repository: https://github.com/ValidatorsDAO/solana-stream