Geyser gRPC - Exemples de codes
Exemples pratiques de Rust pour les cas d'utilisation communs de Geyser gRPC. Remplissez votre endpoint Geyser gRPC. Solana Stream SDK prend en charge Geyser gRPC.
Abonnement de base: toutes les transactions non-votes, non-échecs
rust
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
#[tokio::main]
async fn main() -> Result<()> {
let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
.connect()
.await?;
let (mut tx, mut stream) = client.subscribe().await?;
tx.send(SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false), // Drop vote txs (~70% of traffic)
failed: Some(false), // Drop failed txs
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
})
.await?;
while let Some(Ok(msg)) = stream.next().await {
if let Some(UpdateOneof::Transaction(tx)) = msg.update_oneof {
let sig = tx
.transaction
.and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
.unwrap_or_default();
println!("Slot: {} | Signature: {}", tx.slot, sig);
}
}
Ok(())
}use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
#[tokio::main]
async fn main() -> Result<()> {
let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
.connect()
.await?;
let (mut tx, mut stream) = client.subscribe().await?;
tx.send(SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false), // Drop vote txs (~70% of traffic)
failed: Some(false), // Drop failed txs
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
})
.await?;
while let Some(Ok(msg)) = stream.next().await {
if let Some(UpdateOneof::Transaction(tx)) = msg.update_oneof {
let sig = tx
.transaction
.and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
.unwrap_or_default();
println!("Slot: {} | Signature: {}", tx.slot, sig);
}
}
Ok(())
}client durable avec reconnexion, récupération des écarts, ping/pong
rust
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
#[tokio::main]
async fn main() -> Result<()> {
let endpoint = "http://your-erpc-grpc-endpoint";
let mut subscribe_request = SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots::default(),
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
};
let mut tracked_slot: u64 = 0;
loop {
let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
.connect()
.await?;
subscribe_request.from_slot = if tracked_slot > 0 {
Some(tracked_slot)
} else {
None
};
let (mut _sink, mut stream) = client
.subscribe_with_request(Some(subscribe_request.clone()))
.await?;
loop {
match stream.next().await {
Some(Ok(msg)) => {
if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
tracked_slot = slot.slot;
println!(
"Slot: {} | Status: {:?}",
slot.slot,
SlotStatus::try_from(slot.status).unwrap()
);
}
}
Some(Err(e)) => {
eprintln!("Error receiving updates: {}", e);
break;
}
None => break,
}
}
}
}use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
#[tokio::main]
async fn main() -> Result<()> {
let endpoint = "http://your-erpc-grpc-endpoint";
let mut subscribe_request = SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots::default(),
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
};
let mut tracked_slot: u64 = 0;
loop {
let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
.connect()
.await?;
subscribe_request.from_slot = if tracked_slot > 0 {
Some(tracked_slot)
} else {
None
};
let (mut _sink, mut stream) = client
.subscribe_with_request(Some(subscribe_request.clone()))
.await?;
loop {
match stream.next().await {
Some(Ok(msg)) => {
if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
tracked_slot = slot.slot;
println!(
"Slot: {} | Status: {:?}",
slot.slot,
SlotStatus::try_from(slot.status).unwrap()
);
}
}
Some(Err(e)) => {
eprintln!("Error receiving updates: {}", e);
break;
}
None => break,
}
}
}
}Le flux représente une monnaie (comptes de jetons)
rust
SubscribeRequest {
accounts: HashMap::from([(
"token_accounts_by_mint".to_string(),
SubscribeRequestFilterAccounts {
filters: vec![
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(true)),
},
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: 0,
data: Some(Data::Base58(
"pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint
)),
})),
},
],
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}SubscribeRequest {
accounts: HashMap::from([(
"token_accounts_by_mint".to_string(),
SubscribeRequestFilterAccounts {
filters: vec![
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(true)),
},
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: 0,
data: Some(Data::Base58(
"pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint
)),
})),
},
],
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}Opérations sur flux (réussite, non-vote)
rust
SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}Faits saillants concernant la qualité de la production
- client durable avec reconnexion et récupération des écarts (
from_slot) - Ping/pong manipulation pour garder le ruisseau en vie
- Chaînes bombées/backpressure entre l'entrée et la transformation
- Mises à jour dynamiques de l'abonnement sur le même flux bidi
- Gestion des erreurs, mesures et enregistrement pour les consommateurs lents/dropped mises à jour
client en pleine production (durable, récupération des écarts, ping)/pong, abonnements dynamiques)
rust
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;
#[tokio::main]
async fn main() {
let endpoint = "http://your-erpc-grpc-endpoint".to_string();
let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots {
..Default::default()
},
)]),
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}));
let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
let dispatcher_handle = tokio::spawn(async move {
dispatcher_loop(updates_rx).await;
});
let ingress_handle = tokio::spawn(async move {
ingress_loop(
updates_tx,
subscribe_rx,
endpoint,
subscribe_request,
)
.await;
});
if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 {
println!("Error: {:?}", e);
}
}
async fn ingress_loop(
updates_tx: mpsc::Sender<SubscribeUpdate>,
mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
endpoint: String,
subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
let mut tracked_slot: u64 = 0;
loop {
let builder = GeyserGrpcClient::build_from_shared(endpoint.clone()).unwrap();
let mut client = builder.connect().await.unwrap();
subscribe_request.write().await.from_slot = if tracked_slot > 0 {
Some(tracked_slot)
} else {
None
};
match client
.subscribe_with_request(Some(subscribe_request.read().await.clone()))
.await
{
Ok((mut subscribe_tx, mut stream)) => {
loop {
tokio::select! {
Some(subscribe_request) = subscribe_rx.recv() => {
if let Err(e) = subscribe_tx.send(subscribe_request).await {
println!("Error sending subscribe request: {:?}", e);
break;
}
}
Some(result) = stream.next() => {
match result {
Ok(update) => {
if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
if let Err(e) = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: 1 }),
..Default::default()
})
.await
{
println!("Error sending ping: {:?}", e);
}
}
if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) {
continue;
}
if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
tracked_slot = slot_update.slot;
continue;
}
if let Err(e) = updates_tx.send(update.clone()).await {
println!("Slow consumer, dropping update {:?} err: {:?}", update, e);
}
}
Err(e) => {
println!("Error receiving update: {:?}", e);
break;
}
}
}
}
}
}
Err(e) => {
println!("Error subscribing: {:?}", e);
}
};
println!("Disconnected, reconnecting...");
}
}
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
loop {
if let Some(update) = updates_rx.recv().await {
match update.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
let sig = tx
.transaction
.and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
.unwrap_or_default();
println!("Slot: {} | Signature: {}", tx.slot, sig);
}
_ => {
println!("Received update: {:?}", update);
}
}
}
}
}use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;
#[tokio::main]
async fn main() {
let endpoint = "http://your-erpc-grpc-endpoint".to_string();
let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots {
..Default::default()
},
)]),
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}));
let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
let dispatcher_handle = tokio::spawn(async move {
dispatcher_loop(updates_rx).await;
});
let ingress_handle = tokio::spawn(async move {
ingress_loop(
updates_tx,
subscribe_rx,
endpoint,
subscribe_request,
)
.await;
});
if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 {
println!("Error: {:?}", e);
}
}
async fn ingress_loop(
updates_tx: mpsc::Sender<SubscribeUpdate>,
mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
endpoint: String,
subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
let mut tracked_slot: u64 = 0;
loop {
let builder = GeyserGrpcClient::build_from_shared(endpoint.clone()).unwrap();
let mut client = builder.connect().await.unwrap();
subscribe_request.write().await.from_slot = if tracked_slot > 0 {
Some(tracked_slot)
} else {
None
};
match client
.subscribe_with_request(Some(subscribe_request.read().await.clone()))
.await
{
Ok((mut subscribe_tx, mut stream)) => {
loop {
tokio::select! {
Some(subscribe_request) = subscribe_rx.recv() => {
if let Err(e) = subscribe_tx.send(subscribe_request).await {
println!("Error sending subscribe request: {:?}", e);
break;
}
}
Some(result) = stream.next() => {
match result {
Ok(update) => {
if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
if let Err(e) = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: 1 }),
..Default::default()
})
.await
{
println!("Error sending ping: {:?}", e);
}
}
if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) {
continue;
}
if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
tracked_slot = slot_update.slot;
continue;
}
if let Err(e) = updates_tx.send(update.clone()).await {
println!("Slow consumer, dropping update {:?} err: {:?}", update, e);
}
}
Err(e) => {
println!("Error receiving update: {:?}", e);
break;
}
}
}
}
}
}
Err(e) => {
println!("Error subscribing: {:?}", e);
}
};
println!("Disconnected, reconnecting...");
}
}
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
loop {
if let Some(update) = updates_rx.recv().await {
match update.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
let sig = tx
.transaction
.and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
.unwrap_or_default();
println!("Slot: {} | Signature: {}", tx.slot, sig);
}
_ => {
println!("Received update: {:?}", update);
}
}
}
}
}Exemples de contributions
Vous avez trouvé un modèle utile? Envisagez de contribuer à ces documents.
Dépôt SDK Solana Stream: https://github.com/ValidatorsDAO/solana-stream