This page provides practical code examples for common Yellowstone gRPC use cases. All examples are in Rust and include placeholders that will be filled with complete working code.
See Quickstart for prerequisites & authentication.
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterTransactions,
subscribe_update::UpdateOneof,
};
#[tokio::main]
async fn main() -> Result<()> {
// Set up connection parameters
let endpoint = "https://solana-mainnet.g.alchemy.com";
let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key
// Build and connect the gRPC client with TLS and authentication
let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.x_token(Some(x_token))? // API key passed as X-Token header
.connect()
.await?;
// Create a bidirectional stream for subscribing to updates
let (mut tx, mut stream) = client.subscribe().await?;
// Send subscription request to filter for non-vote, non-failed transactions
tx.send(SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false), // Exclude vote transactions
failed: Some(false), // Exclude failed transactions
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
..Default::default()
})
.await?;
// Process incoming transaction updates
while let Some(Ok(msg)) = stream.next().await {
if let Some(UpdateOneof::Transaction(tx)) = msg.update_oneof {
// Extract and display transaction signature
let sig = tx
.transaction
.and_then(|t| Some(Signature::try_from(t.signature.as_slice()).unwrap()))
.unwrap_or_default();
println!("Slot: {} | Signature: {}", tx.slot, sig);
}
}
Ok(())
}Implement a robust client with reconnection logic & gap recovery with from_slot:
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SlotStatus, SubscribeRequest, SubscribeRequestFilterSlots,
subscribe_update::UpdateOneof,
};
#[tokio::main]
async fn main() -> Result<()> {
// Alchemy Solana Mainnet endpoint
let endpoint = "https://solana-mainnet.g.alchemy.com";
let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key
// Build a subscription request to receive slot updates
// We subscribe to all slots with confirmed commitment level
let mut subscribe_request = SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots {
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
..Default::default()
};
// Track the latest slot we've seen for gap recovery
let mut tracked_slot: u64 = 0;
// Main reconnection loop - continuously reconnect
loop {
// Build and connect to the Yellowstone gRPC client
let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.x_token(Some(x_token))?
.connect()
.await?;
// This ensures that even if we disconnect and miss updates, we can
// recover the missed data when we reconnect (assuming the server has
// retention for those slots).
if tracked_slot > 0 {
subscribe_request.from_slot = Some(tracked_slot);
} else {
subscribe_request.from_slot = None;
}
// Subscribe to slot updates with our configured request
let (mut _update_subscription, mut stream) = client
.subscribe_with_request(Some(subscribe_request.clone()))
.await?;
// Process incoming slot updates until an error occurs
loop {
match stream.next().await {
Some(Ok(msg)) => {
// Extract and print slot information if this update contains slot data
if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
println!(
"Received slot: {} with status {:?}",
slot.slot,
SlotStatus::try_from(slot.status).unwrap()
);
// Update our tracked slot to use for the next reconnection
tracked_slot = slot.slot;
}
}
Some(Err(e)) => {
// On error, log and break to reconnect (outer loop will restart)
println!("Error receiving updates: {}", e);
break;
}
_ => {}
}
}
}
}SubscribeRequest {
accounts: HashMap::from([(
"token_accounts_by_mint".to_string(),
SubscribeRequestFilterAccounts {
filters: vec![
// Filter for token accounts
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(true)),
},
// Filter for the specific mint based on the token account structure
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: 0, // Mint is located at the beginning of the account data
data: Some(Data::Base58(
"pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // Replace with the specific mint address
)),
})),
},
],
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
..Default::default()
}SubscribeRequest {
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false), // Exclude vote transactions
failed: Some(false), // Exclude failed transactions
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
..Default::default()
}- Durable client
- Gap recovery with
from_slot - Async processing with tokio channels
- Dynamic management of subscription
- Error handling
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdate,
subscribe_update::UpdateOneof,
};
// Channel capacity for buffering updates from the ingress loop to the dispatcher.
// This allows the ingress loop to continue receiving data even if the dispatcher
// is temporarily slower, preventing backpressure to the gRPC stream.
const UPDATE_CHANNEL_CAPACITY: usize = 10000;
#[tokio::main]
async fn main() {
// Configure the Yellowstone gRPC endpoint and authentication token
// In production, these should be loaded from environment variables or a config file
let endpoint = "https://solana-mainnet.g.alchemy.com".to_string();
let x_token = "ALCHEMY_API_KEY".to_string(); // Replace with your Alchemy API key
// This is the initial subscription request. This can be updated and sent to the ingress loop through the `subscribe_rx` channel.
let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
slots: HashMap::from([(
"slots".to_string(),
SubscribeRequestFilterSlots {
..Default::default()
},
)]),
transactions: HashMap::from([(
"all_transactions".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false), // Exclude vote transactions
failed: Some(false), // Exclude failed transactions
..Default::default()
},
)]),
commitment: Some(CommitmentLevel::Confirmed as i32),
// from_slot will be set dynamically in the ingress loop for gap recovery
..Default::default()
}));
// Create a bounded channel for updates from ingress to dispatcher.
// Ingress will drop updates if the channel is full.
let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);
// Create an unbounded channel for dynamic subscription updates.
// This allows any part of your application to send new subscription requests
// to modify what data you're receiving at runtime (e.g., add new account filters).
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
// Spawn the dispatcher task: processes updates and implements business logic
// In production, this could write to a database, forward to other services, etc.
let dispatcher_handle = tokio::spawn(async move {
dispatcher_loop(updates_rx).await;
});
// Spawn the ingress task: manages gRPC connection, reconnection, and gap recovery
let ingress_handle = tokio::spawn(async move {
ingress_loop(
updates_tx,
subscribe_rx,
endpoint,
x_token,
subscribe_request,
)
.await;
});
// Wait for both tasks to complete (they run indefinitely in this example)
// In production, you might want to handle graceful shutdown with signal handlers
if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 {
println!("Error: {:?}", e);
}
}
// - Automatic reconnection on any error or disconnect
// - Gap recovery using from_slot to resume from last known position
// - Ping/pong handling to keep connection alive
// - Dynamic subscription management via subscribe_rx channel
//
// The outer loop ensures the client never stops - it will continuously
// attempt to reconnect if the connection drops for any reason.
async fn ingress_loop(
updates_tx: mpsc::Sender<SubscribeUpdate>,
mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
endpoint: String,
x_token: String,
subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
// Configure TLS for secure connection to Yellowstone gRPC server
let tls_config = ClientTlsConfig::new().with_native_roots();
// Track the last slot we successfully processed for gap recovery
// This is critical for ensuring no data is missed across reconnections
let mut tracked_slot: u64 = 0;
// This infinite loop ensures the client is DURABLE - it will never give up.
// On any error or disconnect, we simply reconnect and resume from where we left off.
loop {
// Build the gRPC client with authentication and TLS configuration
let builder = GeyserGrpcClient::build_from_shared(endpoint.clone())
.unwrap()
.x_token(Some(x_token.clone()))
.unwrap()
.tls_config(tls_config.clone())
.unwrap();
let mut client = builder.connect().await.unwrap();
// This ensures that even if we disconnect and miss updates, we can
// recover the missed data when we reconnect (assuming the server has
// retention for those slots).
subscribe_request.write().await.from_slot = if tracked_slot > 0 {
Some(tracked_slot)
} else {
None
};
// Attempt to subscribe with our configured request (including from_slot)
// Returns a tuple of (sink, stream):
// - sink: For sending requests to the server (pings, subscription updates)
// - stream: For receiving updates from the server
match client
.subscribe_with_request(Some(subscribe_request.read().await.clone()))
.await
{
Ok((mut subscribe_tx, mut stream)) => {
// This loop runs as long as the connection is healthy.
// It concurrently handles:
// 1. Dynamic subscription updates from subscribe_rx
// 2. Stream updates from the gRPC server
loop {
tokio::select! {
// Branch 1: Dynamic Subscription Management
//
// Listen for new subscription requests sent via subscribe_tx
// This allows runtime modification of subscriptions without
// disconnecting (e.g., add/remove account filters)
Some(subscribe_request) = subscribe_rx.recv() => {
// Forward the updated subscription request to the gRPC server
if let Err(e) = subscribe_tx.send(subscribe_request).await {
println!("Error sending subscribe request to grpc: {:?}", e);
// Connection issue - break to outer loop for reconnect
break;
}
}
// Branch 2: Process Stream Updates
Some(result) = stream.next() => {
match result {
Ok(update) => {
// Yellowstone gRPC servers send pings to ensure
// the client is alive. We must respond with a pong.
if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
if let Err(e) = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: 1 }),
..Default::default()
})
.await
{
println!("Error sending ping to grpc: {:?}", e);
}
}
// Ping/pong are internal protocol messages,
// not business data, so we don't forward them
if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) {
continue;
}
// Update tracked_slot with every slot update.
// This is essential for gap recovery - if we
// disconnect, we know exactly which slot to
// resume from when reconnecting.
if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
tracked_slot = slot_update.slot;
continue; // Slot updates are tracked but not forwarded
}
// Send the update through the channel to the
// dispatcher for processing.
//
// Error handling: If the channel is full or
// the receiver is dropped, log and continue.
// This prevents ingress from blocking.
if let Err(e) = updates_tx.send(update.clone()).await {
println!("Slow consumer, dropping update {:?} with error: {:?}", update, e);
}
}
// Any error on the stream (network issue, server
// restart, etc.) triggers a reconnection attempt.
Err(e) => {
println!("Error receiving update from stream: {:?}", e);
break; // Break to outer loop for reconnect
}
}
}
}
}
}
// If we can't subscribe (e.g., auth failure, invalid request),
// log the error and loop back to reconnect. In production, you
// might want exponential backoff here to avoid hammering the server.
Err(e) => {
println!("Error subscribing to Yellowstone gRPC server: {:?}", e);
}
};
// If we reach here, the connection was lost or subscription failed.
// The outer loop will automatically reconnect and resume from tracked_slot.
// In production, consider adding:
// - Exponential backoff to avoid rapid reconnection attempts
// - Connection metrics/monitoring
// - Alerting on repeated failures
println!("Disconnected from Yellowstone gRPC server, reconnecting...");
}
}
// In production, this is where you would:
// - Write to databases
// - Forward to other services
// - Apply complex business logic
// - Aggregate metrics
// - Trigger alerts
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
loop {
tokio::select! {
// This receives updates sent from the ingress loop via the channel.
// The bounded channel provides backpressure - if this loop is too
// slow, the ingress will drop updates.
Some(update) = updates_rx.recv() => {
match update.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
// Extract the transaction signature for display/logging
let sig = tx
.transaction
.and_then(|t| Some(Signature::try_from(t.signature.as_slice()).unwrap()))
.unwrap_or_default();
println!("Slot: {} | Signature: {}", tx.slot, sig);
// In production, you might:
// - Parse transaction instructions
// - Store in database
// - Check for specific program interactions
// - Calculate metrics
// - Forward to downstream services
}
// --------------------------------------------------------
// Account Updates
// --------------------------------------------------------
//
// Uncomment to handle account updates:
// Some(UpdateOneof::Account(account)) => {
// // TODO: Implement account updates
// }
// --------------------------------------------------------
// Block Updates
// --------------------------------------------------------
//
// Uncomment to handle block updates:
// Some(UpdateOneof::Block(block)) => {
// // TODO: Implement block updates
// }
// --------------------------------------------------------
// Other Updates
// --------------------------------------------------------
_ => {
println!("Received unknown update from Yellowstone gRPC server: {:?}", update);
}
}
}
}
}
}Found a useful pattern? Consider contributing it to these docs!