Geyser gRPC Kod Örnekleri
Yaygın Geyser gRPC kullanım senaryoları için pratik Rust örnekleri. Örnekleri kendi Geyser gRPC endpoint'inize yönlendirin. Solana Stream SDK, Geyser gRPC'yi destekler.
Temel abonelik: oy (vote) olmayan ve başarısız olmayan tüm işlemler
rust
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to every non-vote, non-failed transaction at `Confirmed`
/// commitment and prints the slot and signature of each one.
///
/// # Errors
/// Returns an error if the endpoint is rejected, the connection or
/// subscription fails, the request cannot be sent, or the stream yields an
/// error.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;

    // `tx` is the request sink, `stream` yields the updates.
    let (mut tx, mut stream) = client.subscribe().await?;
    tx.send(SubscribeRequest {
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),   // Drop vote txs (~70% of traffic)
                failed: Some(false), // Drop failed txs
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    })
    .await?;

    while let Some(msg) = stream.next().await {
        // Surface stream errors to the caller instead of ending the loop
        // silently and reporting success.
        let msg = msg?;
        if let Some(UpdateOneof::Transaction(tx_update)) = msg.update_oneof {
            // A missing or malformed signature falls back to the default value.
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx_update.slot, sig);
        }
    }
    Ok(())
}
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to every non-vote, non-failed transaction at `Confirmed`
/// commitment and prints the slot and signature of each one.
///
/// # Errors
/// Returns an error if the endpoint is rejected, the connection or
/// subscription fails, the request cannot be sent, or the stream yields an
/// error.
#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = "http://your-erpc-grpc-endpoint"; // Set your ERPC gRPC URL
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .connect()
        .await?;

    // `tx` is the request sink, `stream` yields the updates.
    let (mut tx, mut stream) = client.subscribe().await?;
    tx.send(SubscribeRequest {
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),   // Drop vote txs (~70% of traffic)
                failed: Some(false), // Drop failed txs
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    })
    .await?;

    while let Some(msg) = stream.next().await {
        // Surface stream errors to the caller instead of ending the loop
        // silently and reporting success.
        let msg = msg?;
        if let Some(UpdateOneof::Transaction(tx_update)) = msg.update_oneof {
            // A missing or malformed signature falls back to the default value.
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx_update.slot, sig);
        }
    }
    Ok(())
}
Yeniden bağlantı, boşluk kurtarma, ping /pong
rust
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to slot updates and reconnects forever, resuming from the last
/// slot seen (`from_slot`) after every disconnect.
///
/// # Errors
/// Returns an error only if the endpoint string is rejected by
/// `build_from_shared`; connection, subscription and stream errors are logged
/// and followed by a reconnect attempt.
#[tokio::main]
async fn main() -> Result<()> {
    // Pause between attempts so an unreachable endpoint is not retried in a
    // tight loop.
    const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    let endpoint = "http://your-erpc-grpc-endpoint";
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };
    let mut tracked_slot: u64 = 0;

    loop {
        // Resume from the last observed slot; `None` until a slot has been seen.
        subscribe_request.from_slot = (tracked_slot > 0).then_some(tracked_slot);

        let mut client = match GeyserGrpcClient::build_from_shared(endpoint)?
            .connect()
            .await
        {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Error connecting: {}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // `_sink` is bound to a name so the request half stays alive for as
        // long as the stream is being read.
        let (_sink, mut stream) = match client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await
        {
            Ok(halves) => halves,
            Err(e) => {
                eprintln!("Error subscribing: {}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // Read until the stream ends or fails, then fall through to reconnect.
        while let Some(result) = stream.next().await {
            match result {
                Ok(msg) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        // An unrecognised status value is printed raw instead
                        // of panicking.
                        match SlotStatus::try_from(slot.status) {
                            Ok(status) => {
                                println!("Slot: {} | Status: {:?}", slot.slot, status)
                            }
                            Err(_) => println!(
                                "Slot: {} | Status: unknown ({})",
                                slot.slot, slot.status
                            ),
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
            }
        }

        tokio::time::sleep(RECONNECT_DELAY).await;
    }
}
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient,
GeyserSlotStatus as SlotStatus, GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserUpdateOneof as UpdateOneof,
};
/// Subscribes to slot updates and reconnects forever, resuming from the last
/// slot seen (`from_slot`) after every disconnect.
///
/// # Errors
/// Returns an error only if the endpoint string is rejected by
/// `build_from_shared`; connection, subscription and stream errors are logged
/// and followed by a reconnect attempt.
#[tokio::main]
async fn main() -> Result<()> {
    // Pause between attempts so an unreachable endpoint is not retried in a
    // tight loop.
    const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    let endpoint = "http://your-erpc-grpc-endpoint";
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots::default(),
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };
    let mut tracked_slot: u64 = 0;

    loop {
        // Resume from the last observed slot; `None` until a slot has been seen.
        subscribe_request.from_slot = (tracked_slot > 0).then_some(tracked_slot);

        let mut client = match GeyserGrpcClient::build_from_shared(endpoint)?
            .connect()
            .await
        {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Error connecting: {}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // `_sink` is bound to a name so the request half stays alive for as
        // long as the stream is being read.
        let (_sink, mut stream) = match client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await
        {
            Ok(halves) => halves,
            Err(e) => {
                eprintln!("Error subscribing: {}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // Read until the stream ends or fails, then fall through to reconnect.
        while let Some(result) = stream.next().await {
            match result {
                Ok(msg) => {
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        tracked_slot = slot.slot;
                        // An unrecognised status value is printed raw instead
                        // of panicking.
                        match SlotStatus::try_from(slot.status) {
                            Ok(status) => {
                                println!("Slot: {} | Status: {:?}", slot.slot, status)
                            }
                            Err(_) => println!(
                                "Slot: {} | Status: unknown ({})",
                                slot.slot, slot.status
                            ),
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving updates: {}", e);
                    break;
                }
            }
        }

        tokio::time::sleep(RECONNECT_DELAY).await;
    }
}
Bir mint için akış hesapları (token hesapları)
rust
SubscribeRequest {
accounts: HashMap::from([(
"token_accounts_by_mint".to_string(),
SubscribeRequestFilterAccounts {
filters: vec![
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(true)),
},
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: 0,
data: Some(Data::Base58(
"pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint
)),
})),
},
],
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}SubscribeRequest {
accounts: HashMap::from([(
"token_accounts_by_mint".to_string(),
SubscribeRequestFilterAccounts {
filters: vec![
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(true)),
},
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: 0,
data: Some(Data::Base58(
"pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // replace with mint
)),
})),
},
],
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}
İşlem akışı (başarısız olmayan, oy olmayan işlemler)
rust
SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
}
Üretim düzeyinde istemcinin öne çıkan özellikleri
- Yeniden bağlanma ve boşluk kurtarma (gap recovery, `from_slot`) özellikli dayanıklı istemci
- Akışı canlı tutmak için ping/pong işleme
- Alım (ingress) ile işleme arasında sınırlı (bounded) kanallar / geri basınç (backpressure)
- Aynı akış üzerinden dinamik abonelik güncellemeleri
- Hata yönetimi, metrikler ve yavaş tüketiciler için günlükleme (logging)
Tam üretim düzeyinde istemci (yeniden bağlanma, boşluk kurtarma, ping/pong, dinamik abonelikler)
rust
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;
/// Wires the client together: builds the initial subscription (slots plus
/// non-vote, non-failed transactions at `Confirmed` commitment), spawns the
/// dispatcher and ingress tasks, and waits for both to finish.
#[tokio::main]
async fn main() {
    let endpoint = "http://your-erpc-grpc-endpoint".to_string();
    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    }));

    // Bounded channel between ingress and dispatcher.
    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
    // Requests sent on `subscribe_tx` are forwarded by `ingress_loop` over the
    // live stream. It is not used further in this example, but it stays bound
    // until `main` returns, which keeps the channel open.
    let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    let dispatcher_handle = tokio::spawn(async move {
        dispatcher_loop(updates_rx).await;
    });
    let ingress_handle = tokio::spawn(async move {
        ingress_loop(updates_tx, subscribe_rx, endpoint, subscribe_request).await;
    });

    // Report a failure of either task, not just the dispatcher.
    let (dispatcher_result, ingress_result) = tokio::join!(dispatcher_handle, ingress_handle);
    for result in [dispatcher_result, ingress_result] {
        if let Err(e) = result {
            println!("Error: {:?}", e);
        }
    }
}
/// Maintains the Geyser subscription, reconnecting whenever the stream fails
/// or ends.
///
/// Every update other than ping, pong and slot updates is forwarded to
/// `updates_tx`. Slot updates are consumed here to remember the most recent
/// slot, which is sent as `from_slot` on the next reconnect. Requests arriving
/// on `subscribe_rx` are forwarded to the server over the live stream.
///
/// Returns only when the client builder rejects `endpoint` or when the
/// receiving half of `updates_tx` has been closed.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Pause between attempts so an unreachable endpoint is not retried in a
    // tight loop.
    const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    let mut tracked_slot: u64 = 0;
    loop {
        let builder = match GeyserGrpcClient::build_from_shared(endpoint.clone()) {
            Ok(builder) => builder,
            Err(e) => {
                // The same endpoint string is used on every attempt, so
                // retrying cannot change the outcome.
                println!("Error building client: {:?}", e);
                return;
            }
        };
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting: {:?}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // Update `from_slot` and snapshot the request under a single write lock.
        let request = {
            let mut shared = subscribe_request.write().await;
            shared.from_slot = (tracked_slot > 0).then_some(tracked_slot);
            shared.clone()
        };

        match client.subscribe_with_request(Some(request)).await {
            Ok((mut subscribe_tx, mut stream)) => {
                loop {
                    tokio::select! {
                        // NOTE(review): requests forwarded here are not written
                        // back into `subscribe_request`, so a reconnect
                        // resubscribes with the shared request as it stands.
                        Some(request) = subscribe_rx.recv() => {
                            if let Err(e) = subscribe_tx.send(request).await {
                                println!("Error sending subscribe request: {:?}", e);
                                break;
                            }
                        }
                        // Bind the whole `Option` so that end-of-stream is
                        // handled here rather than disabling this branch and
                        // leaving the loop waiting on `subscribe_rx` alone.
                        next = stream.next() => {
                            let update = match next {
                                Some(Ok(update)) => update,
                                Some(Err(e)) => {
                                    println!("Error receiving update: {:?}", e);
                                    break;
                                }
                                None => {
                                    println!("Stream ended");
                                    break;
                                }
                            };
                            if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
                                // Answer a server ping with a ping request.
                                if let Err(e) = subscribe_tx
                                    .send(SubscribeRequest {
                                        ping: Some(SubscribeRequestPing { id: 1 }),
                                        ..Default::default()
                                    })
                                    .await
                                {
                                    println!("Error sending ping: {:?}", e);
                                }
                                continue;
                            }
                            if matches!(update.update_oneof, Some(UpdateOneof::Pong(_))) {
                                continue;
                            }
                            if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
                                tracked_slot = slot_update.slot;
                                continue;
                            }
                            // `send` waits while the bounded channel is full and
                            // fails only once the receiver has been dropped or
                            // closed, so an error means nobody is left to
                            // consume updates.
                            if updates_tx.send(update).await.is_err() {
                                println!("Update receiver closed, stopping ingress");
                                return;
                            }
                        }
                    }
                }
            }
            Err(e) => {
                println!("Error subscribing: {:?}", e);
            }
        };
        println!("Disconnected, reconnecting...");
        tokio::time::sleep(RECONNECT_DELAY).await;
    }
}
/// Consumes updates from `updates_rx` until the channel is closed.
///
/// Transaction updates are printed as slot and signature; every other update
/// is printed with its `Debug` representation.
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    // `recv` yields `None` once every sender is gone; stop then instead of
    // polling a closed channel in a busy loop.
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                // A missing or malformed signature falls back to the default value.
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", tx.slot, sig);
            }
            _ => {
                println!("Received update: {:?}", update);
            }
        }
    }
}
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use solana_stream_sdk::{
GeyserCommitmentLevel as CommitmentLevel, GeyserGrpcClient, GeyserSlotStatus,
GeyserSubscribeRequest as SubscribeRequest,
GeyserSubscribeRequestFilterSlots as SubscribeRequestFilterSlots,
GeyserSubscribeRequestFilterTransactions as SubscribeRequestFilterTransactions,
GeyserSubscribeUpdate as SubscribeUpdate, GeyserUpdateOneof as UpdateOneof,
};
use solana_stream_sdk::yellowstone_grpc_proto::geyser::SubscribeRequestPing;
const UPDATE_CHANNEL_CAPACITY: usize = 10_000;
/// Wires the client together: builds the initial subscription (slots plus
/// non-vote, non-failed transactions at `Confirmed` commitment), spawns the
/// dispatcher and ingress tasks, and waits for both to finish.
#[tokio::main]
async fn main() {
    let endpoint = "http://your-erpc-grpc-endpoint".to_string();
    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    }));

    // Bounded channel between ingress and dispatcher.
    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
    // Requests sent on `subscribe_tx` are forwarded by `ingress_loop` over the
    // live stream. It is not used further in this example, but it stays bound
    // until `main` returns, which keeps the channel open.
    let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    let dispatcher_handle = tokio::spawn(async move {
        dispatcher_loop(updates_rx).await;
    });
    let ingress_handle = tokio::spawn(async move {
        ingress_loop(updates_tx, subscribe_rx, endpoint, subscribe_request).await;
    });

    // Report a failure of either task, not just the dispatcher.
    let (dispatcher_result, ingress_result) = tokio::join!(dispatcher_handle, ingress_handle);
    for result in [dispatcher_result, ingress_result] {
        if let Err(e) = result {
            println!("Error: {:?}", e);
        }
    }
}
/// Maintains the Geyser subscription, reconnecting whenever the stream fails
/// or ends.
///
/// Every update other than ping, pong and slot updates is forwarded to
/// `updates_tx`. Slot updates are consumed here to remember the most recent
/// slot, which is sent as `from_slot` on the next reconnect. Requests arriving
/// on `subscribe_rx` are forwarded to the server over the live stream.
///
/// Returns only when the client builder rejects `endpoint` or when the
/// receiving half of `updates_tx` has been closed.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Pause between attempts so an unreachable endpoint is not retried in a
    // tight loop.
    const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    let mut tracked_slot: u64 = 0;
    loop {
        let builder = match GeyserGrpcClient::build_from_shared(endpoint.clone()) {
            Ok(builder) => builder,
            Err(e) => {
                // The same endpoint string is used on every attempt, so
                // retrying cannot change the outcome.
                println!("Error building client: {:?}", e);
                return;
            }
        };
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting: {:?}", e);
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            }
        };

        // Update `from_slot` and snapshot the request under a single write lock.
        let request = {
            let mut shared = subscribe_request.write().await;
            shared.from_slot = (tracked_slot > 0).then_some(tracked_slot);
            shared.clone()
        };

        match client.subscribe_with_request(Some(request)).await {
            Ok((mut subscribe_tx, mut stream)) => {
                loop {
                    tokio::select! {
                        // NOTE(review): requests forwarded here are not written
                        // back into `subscribe_request`, so a reconnect
                        // resubscribes with the shared request as it stands.
                        Some(request) = subscribe_rx.recv() => {
                            if let Err(e) = subscribe_tx.send(request).await {
                                println!("Error sending subscribe request: {:?}", e);
                                break;
                            }
                        }
                        // Bind the whole `Option` so that end-of-stream is
                        // handled here rather than disabling this branch and
                        // leaving the loop waiting on `subscribe_rx` alone.
                        next = stream.next() => {
                            let update = match next {
                                Some(Ok(update)) => update,
                                Some(Err(e)) => {
                                    println!("Error receiving update: {:?}", e);
                                    break;
                                }
                                None => {
                                    println!("Stream ended");
                                    break;
                                }
                            };
                            if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
                                // Answer a server ping with a ping request.
                                if let Err(e) = subscribe_tx
                                    .send(SubscribeRequest {
                                        ping: Some(SubscribeRequestPing { id: 1 }),
                                        ..Default::default()
                                    })
                                    .await
                                {
                                    println!("Error sending ping: {:?}", e);
                                }
                                continue;
                            }
                            if matches!(update.update_oneof, Some(UpdateOneof::Pong(_))) {
                                continue;
                            }
                            if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
                                tracked_slot = slot_update.slot;
                                continue;
                            }
                            // `send` waits while the bounded channel is full and
                            // fails only once the receiver has been dropped or
                            // closed, so an error means nobody is left to
                            // consume updates.
                            if updates_tx.send(update).await.is_err() {
                                println!("Update receiver closed, stopping ingress");
                                return;
                            }
                        }
                    }
                }
            }
            Err(e) => {
                println!("Error subscribing: {:?}", e);
            }
        };
        println!("Disconnected, reconnecting...");
        tokio::time::sleep(RECONNECT_DELAY).await;
    }
}
/// Consumes updates from `updates_rx` until the channel is closed.
///
/// Transaction updates are printed as slot and signature; every other update
/// is printed with its `Debug` representation.
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    // `recv` yields `None` once every sender is gone; stop then instead of
    // polling a closed channel in a busy loop.
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                // A missing or malformed signature falls back to the default value.
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", tx.slot, sig);
            }
            _ => {
                println!("Received update: {:?}", update);
            }
        }
    }
}
Katkı örnekleri
Yararlı bir kalıp mı buldunuz? Bu dokümanlara katkıda bulunun.
Solana Stream SDK repository: https://github.com/ValidatorsDAO/solana-stream