Code Examples
This page provides practical code examples for common Yellowstone gRPC use cases. All examples are in Rust; replace placeholder values such as `ALCHEMY_API_KEY` with your own before running them.
Prerequisites & Authentication
See Quickstart for prerequisites & authentication.
Basic subscription that streams all transactions
1 use anyhow::Result; 2 use futures::{sink::SinkExt, stream::StreamExt}; 3 use solana_signature::Signature; 4 use std::collections::HashMap; 5 use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient}; 6 use yellowstone_grpc_proto::geyser::{ 7 CommitmentLevel, SubscribeRequest, SubscribeRequestFilterTransactions, 8 subscribe_update::UpdateOneof, 9 }; 10 #[tokio::main] 11 async fn main() -> Result<()> { 12 // Set up connection parameters 13 let endpoint = "https://solana-mainnet.g.alchemy.com"; 14 let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key 15 16 // Build and connect the gRPC client with TLS and authentication 17 let mut client = GeyserGrpcClient::build_from_shared(endpoint)? 18 .tls_config(ClientTlsConfig::new().with_native_roots())? 19 .x_token(Some(x_token))? // API key passed as X-Token header 20 .connect() 21 .await?; 22 // Create a bidirectional stream for subscribing to updates 23 let (mut tx, mut stream) = client.subscribe().await?; 24 // Send subscription request to filter for non-vote, non-failed transactions 25 tx.send(SubscribeRequest { 26 transactions: HashMap::from([( 27 "all_transactions".to_string(), 28 SubscribeRequestFilterTransactions { 29 vote: Some(false), // Exclude vote transactions 30 failed: Some(false), // Exclude failed transactions 31 ..Default::default() 32 }, 33 )]), 34 commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level 35 ..Default::default() 36 }) 37 .await?; 38 // Process incoming transaction updates 39 while let Some(Ok(msg)) = stream.next().await { 40 if let Some(UpdateOneof::Transaction(tx)) = msg.update_oneof { 41 // Extract and display transaction signature 42 let sig = tx 43 .transaction 44 .and_then(|t| Some(Signature::try_from(t.signature.as_slice()).unwrap())) 45 .unwrap_or_default(); 46 println!("Slot: {} | Signature: {}", tx.slot, sig); 47 } 48 } 49 Ok(()) 50 }
Durable client
Implement a robust client with reconnection logic and gap recovery using `from_slot`:
1 use anyhow::Result; 2 use futures::stream::StreamExt; 3 use std::collections::HashMap; 4 use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient}; 5 use yellowstone_grpc_proto::geyser::{ 6 CommitmentLevel, SlotStatus, SubscribeRequest, SubscribeRequestFilterSlots, 7 subscribe_update::UpdateOneof, 8 }; 9 10 #[tokio::main] 11 async fn main() -> Result<()> { 12 // Alchemy Solana Mainnet endpoint 13 let endpoint = "https://solana-mainnet.g.alchemy.com"; 14 let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key 15 16 // Build a subscription request to receive slot updates 17 // We subscribe to all slots with confirmed commitment level 18 let mut subscribe_request = SubscribeRequest { 19 slots: HashMap::from([( 20 "slots".to_string(), 21 SubscribeRequestFilterSlots { 22 ..Default::default() 23 }, 24 )]), 25 commitment: Some(CommitmentLevel::Confirmed as i32), 26 ..Default::default() 27 }; 28 29 // Track the latest slot we've seen for gap recovery 30 let mut tracked_slot: u64 = 0; 31 32 // Main reconnection loop - continuously reconnect 33 loop { 34 // Build and connect to the Yellowstone gRPC client 35 let mut client = GeyserGrpcClient::build_from_shared(endpoint)? 36 .tls_config(ClientTlsConfig::new().with_native_roots())? 37 .x_token(Some(x_token))? 38 .connect() 39 .await?; 40 41 // This ensures that even if we disconnect and miss updates, we can 42 // recover the missed data when we reconnect (assuming the server has 43 // retention for those slots). 
44 if tracked_slot > 0 { 45 subscribe_request.from_slot = Some(tracked_slot); 46 } else { 47 subscribe_request.from_slot = None; 48 } 49 50 // Subscribe to slot updates with our configured request 51 let (mut _update_subscription, mut stream) = client 52 .subscribe_with_request(Some(subscribe_request.clone())) 53 .await?; 54 55 // Process incoming slot updates until an error occurs 56 loop { 57 match stream.next().await { 58 Some(Ok(msg)) => { 59 // Extract and print slot information if this update contains slot data 60 if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof { 61 println!( 62 "Received slot: {} with status {:?}", 63 slot.slot, 64 SlotStatus::try_from(slot.status).unwrap() 65 ); 66 // Update our tracked slot to use for the next reconnection 67 tracked_slot = slot.slot; 68 } 69 } 70 Some(Err(e)) => { 71 // On error, log and break to reconnect (outer loop will restart) 72 println!("Error receiving updates: {}", e); 73 break; 74 } 75 _ => {} 76 } 77 } 78 } 79 }
Stream accounts
SubscribeRequest for streaming token accounts for a specific mint
1 SubscribeRequest { 2 accounts: HashMap::from([( 3 "token_accounts_by_mint".to_string(), 4 SubscribeRequestFilterAccounts { 5 filters: vec![ 6 // Filter for token accounts 7 SubscribeRequestFilterAccountsFilter { 8 filter: Some(Filter::TokenAccountState(true)), 9 }, 10 // Filter for the specific mint based on the token account structure 11 SubscribeRequestFilterAccountsFilter { 12 filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp { 13 offset: 0, // Mint is located at the beginning of the account data 14 data: Some(Data::Base58( 15 "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // Replace with the specific mint address 16 )), 17 })), 18 }, 19 ], 20 ..Default::default() 21 }, 22 )]), 23 commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level 24 ..Default::default() 25 }
Stream transactions
SubscribeRequest for streaming all successful non-vote transactions
1 SubscribeRequest { 2 transactions: HashMap::from([( 3 "all_transactions".to_string(), 4 SubscribeRequestFilterTransactions { 5 vote: Some(false), // Exclude vote transactions 6 failed: Some(false), // Exclude failed transactions 7 ..Default::default() 8 }, 9 )]), 10 commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level 11 ..Default::default() 12 }
Production-Grade client
- Durable client
- Gap recovery with `from_slot`
- Async processing with tokio channels
- Dynamic management of subscription
- Error handling
1 use futures::{SinkExt, stream::StreamExt}; 2 use solana_signature::Signature; 3 use std::{collections::HashMap, sync::Arc}; 4 use tokio::sync::{RwLock, mpsc}; 5 use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient}; 6 use yellowstone_grpc_proto::geyser::{ 7 CommitmentLevel, SubscribeRequest, SubscribeRequestFilterSlots, 8 SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdate, 9 subscribe_update::UpdateOneof, 10 }; 11 12 // Channel capacity for buffering updates from the ingress loop to the dispatcher. 13 // This allows the ingress loop to continue receiving data even if the dispatcher 14 // is temporarily slower, preventing backpressure to the gRPC stream. 15 const UPDATE_CHANNEL_CAPACITY: usize = 10000; 16 17 #[tokio::main] 18 async fn main() { 19 // Configure the Yellowstone gRPC endpoint and authentication token 20 // In production, these should be loaded from environment variables or a config file 21 let endpoint = "https://solana-mainnet.g.alchemy.com".to_string(); 22 let x_token = "ALCHEMY_API_KEY".to_string(); // Replace with your Alchemy API key 23 24 // This is the initial subscription request. This can be updated and sent to the ingress loop through the `subscribe_rx` channel. 25 let subscribe_request = Arc::new(RwLock::new(SubscribeRequest { 26 slots: HashMap::from([( 27 "slots".to_string(), 28 SubscribeRequestFilterSlots { 29 ..Default::default() 30 }, 31 )]), 32 transactions: HashMap::from([( 33 "all_transactions".to_string(), 34 SubscribeRequestFilterTransactions { 35 vote: Some(false), // Exclude vote transactions 36 failed: Some(false), // Exclude failed transactions 37 ..Default::default() 38 }, 39 )]), 40 commitment: Some(CommitmentLevel::Confirmed as i32), 41 // from_slot will be set dynamically in the ingress loop for gap recovery 42 ..Default::default() 43 })); 44 45 // Create a bounded channel for updates from ingress to dispatcher. 46 // Ingress will drop updates if the channel is full. 
47 let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY); 48 49 // Create an unbounded channel for dynamic subscription updates. 50 // This allows any part of your application to send new subscription requests 51 // to modify what data you're receiving at runtime (e.g., add new account filters). 52 let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>(); 53 54 // Spawn the dispatcher task: processes updates and implements business logic 55 // In production, this could write to a database, forward to other services, etc. 56 let dispatcher_handle = tokio::spawn(async move { 57 dispatcher_loop(updates_rx).await; 58 }); 59 60 // Spawn the ingress task: manages gRPC connection, reconnection, and gap recovery 61 let ingress_handle = tokio::spawn(async move { 62 ingress_loop( 63 updates_tx, 64 subscribe_rx, 65 endpoint, 66 x_token, 67 subscribe_request, 68 ) 69 .await; 70 }); 71 72 // Wait for both tasks to complete (they run indefinitely in this example) 73 // In production, you might want to handle graceful shutdown with signal handlers 74 if let Err(e) = tokio::join!(dispatcher_handle, ingress_handle).0 { 75 println!("Error: {:?}", e); 76 } 77 } 78 79 // - Automatic reconnection on any error or disconnect 80 // - Gap recovery using from_slot to resume from last known position 81 // - Ping/pong handling to keep connection alive 82 // - Dynamic subscription management via subscribe_rx channel 83 // 84 // The outer loop ensures the client never stops - it will continuously 85 // attempt to reconnect if the connection drops for any reason. 
86 async fn ingress_loop( 87 updates_tx: mpsc::Sender<SubscribeUpdate>, 88 mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>, 89 endpoint: String, 90 x_token: String, 91 subscribe_request: Arc<RwLock<SubscribeRequest>>, 92 ) { 93 // Configure TLS for secure connection to Yellowstone gRPC server 94 let tls_config = ClientTlsConfig::new().with_native_roots(); 95 96 // Track the last slot we successfully processed for gap recovery 97 // This is critical for ensuring no data is missed across reconnections 98 let mut tracked_slot: u64 = 0; 99 100 // This infinite loop ensures the client is DURABLE - it will never give up. 101 // On any error or disconnect, we simply reconnect and resume from where we left off. 102 loop { 103 // Build the gRPC client with authentication and TLS configuration 104 let builder = GeyserGrpcClient::build_from_shared(endpoint.clone()) 105 .unwrap() 106 .x_token(Some(x_token.clone())) 107 .unwrap() 108 .tls_config(tls_config.clone()) 109 .unwrap(); 110 111 let mut client = builder.connect().await.unwrap(); 112 113 // This ensures that even if we disconnect and miss updates, we can 114 // recover the missed data when we reconnect (assuming the server has 115 // retention for those slots). 116 subscribe_request.write().await.from_slot = if tracked_slot > 0 { 117 Some(tracked_slot) 118 } else { 119 None 120 }; 121 122 // Attempt to subscribe with our configured request (including from_slot) 123 // Returns a tuple of (sink, stream): 124 // - sink: For sending requests to the server (pings, subscription updates) 125 // - stream: For receiving updates from the server 126 match client 127 .subscribe_with_request(Some(subscribe_request.read().await.clone())) 128 .await 129 { 130 Ok((mut subscribe_tx, mut stream)) => { 131 // This loop runs as long as the connection is healthy. 132 // It concurrently handles: 133 // 1. Dynamic subscription updates from subscribe_rx 134 // 2. Stream updates from the gRPC server 135 loop { 136 tokio::select! 
{ 137 // Branch 1: Dynamic Subscription Management 138 // 139 // Listen for new subscription requests sent via subscribe_tx 140 // This allows runtime modification of subscriptions without 141 // disconnecting (e.g., add/remove account filters) 142 Some(subscribe_request) = subscribe_rx.recv() => { 143 // Forward the updated subscription request to the gRPC server 144 if let Err(e) = subscribe_tx.send(subscribe_request).await { 145 println!("Error sending subscribe request to grpc: {:?}", e); 146 // Connection issue - break to outer loop for reconnect 147 break; 148 } 149 } 150 151 // Branch 2: Process Stream Updates 152 Some(result) = stream.next() => { 153 match result { 154 Ok(update) => { 155 // Yellowstone gRPC servers send pings to ensure 156 // the client is alive. We must respond with a pong. 157 if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) { 158 if let Err(e) = subscribe_tx 159 .send(SubscribeRequest { 160 ping: Some(SubscribeRequestPing { id: 1 }), 161 ..Default::default() 162 }) 163 .await 164 { 165 println!("Error sending ping to grpc: {:?}", e); 166 } 167 } 168 169 // Ping/pong are internal protocol messages, 170 // not business data, so we don't forward them 171 if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) { 172 continue; 173 } 174 175 // Update tracked_slot with every slot update. 176 // This is essential for gap recovery - if we 177 // disconnect, we know exactly which slot to 178 // resume from when reconnecting. 179 if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof { 180 tracked_slot = slot_update.slot; 181 continue; // Slot updates are tracked but not forwarded 182 } 183 184 // Send the update through the channel to the 185 // dispatcher for processing. 186 // 187 // Error handling: If the channel is full or 188 // the receiver is dropped, log and continue. 189 // This prevents ingress from blocking. 
190 if let Err(e) = updates_tx.send(update.clone()).await { 191 println!("Slow consumer, dropping update {:?} with error: {:?}", update, e); 192 } 193 } 194 // Any error on the stream (network issue, server 195 // restart, etc.) triggers a reconnection attempt. 196 Err(e) => { 197 println!("Error receiving update from stream: {:?}", e); 198 break; // Break to outer loop for reconnect 199 } 200 } 201 } 202 } 203 } 204 } 205 // If we can't subscribe (e.g., auth failure, invalid request), 206 // log the error and loop back to reconnect. In production, you 207 // might want exponential backoff here to avoid hammering the server. 208 Err(e) => { 209 println!("Error subscribing to Yellowstone gRPC server: {:?}", e); 210 } 211 }; 212 213 // If we reach here, the connection was lost or subscription failed. 214 // The outer loop will automatically reconnect and resume from tracked_slot. 215 // In production, consider adding: 216 // - Exponential backoff to avoid rapid reconnection attempts 217 // - Connection metrics/monitoring 218 // - Alerting on repeated failures 219 println!("Disconnected from Yellowstone gRPC server, reconnecting..."); 220 } 221 } 222 223 // In production, this is where you would: 224 // - Write to databases 225 // - Forward to other services 226 // - Apply complex business logic 227 // - Aggregate metrics 228 // - Trigger alerts 229 async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) { 230 loop { 231 tokio::select! { 232 // This receives updates sent from the ingress loop via the channel. 233 // The bounded channel provides backpressure - if this loop is too 234 // slow, the ingress will drop updates. 
235 Some(update) = updates_rx.recv() => { 236 match update.update_oneof { 237 Some(UpdateOneof::Transaction(tx)) => { 238 // Extract the transaction signature for display/logging 239 let sig = tx 240 .transaction 241 .and_then(|t| Some(Signature::try_from(t.signature.as_slice()).unwrap())) 242 .unwrap_or_default(); 243 println!("Slot: {} | Signature: {}", tx.slot, sig); 244 245 // In production, you might: 246 // - Parse transaction instructions 247 // - Store in database 248 // - Check for specific program interactions 249 // - Calculate metrics 250 // - Forward to downstream services 251 } 252 253 // -------------------------------------------------------- 254 // Account Updates 255 // -------------------------------------------------------- 256 // 257 // Uncomment to handle account updates: 258 // Some(UpdateOneof::Account(account)) => { 259 // // TODO: Implement account updates 260 // } 261 262 // -------------------------------------------------------- 263 // Block Updates 264 // -------------------------------------------------------- 265 // 266 // Uncomment to handle block updates: 267 // Some(UpdateOneof::Block(block)) => { 268 // // TODO: Implement block updates 269 // } 270 271 // -------------------------------------------------------- 272 // Other Updates 273 // -------------------------------------------------------- 274 _ => { 275 println!("Received unknown update from Yellowstone gRPC server: {:?}", update); 276 } 277 } 278 } 279 } 280 } 281 }
Contributing Examples
Found a useful pattern? Consider contributing it to these docs!