Code Examples

This page provides practical code examples for common Yellowstone gRPC use cases. All examples are written in Rust; replace the API key placeholder with your own credentials before running them.

Prerequisites & Authentication

See Quickstart for prerequisites & authentication.
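
The examples below hardcode the endpoint and API key for brevity. In practice you would typically load them from environment variables or a config file; here is a minimal sketch (the variable names YELLOWSTONE_ENDPOINT and ALCHEMY_API_KEY are illustrative, not required by the client):

use std::env;

// Minimal sketch: read connection settings from the environment instead of
// hardcoding them. The environment variable names are illustrative assumptions.
fn connection_settings() -> (String, String) {
    let endpoint = env::var("YELLOWSTONE_ENDPOINT")
        .unwrap_or_else(|_| "https://solana-mainnet.g.alchemy.com".to_string());
    let x_token = env::var("ALCHEMY_API_KEY").expect("ALCHEMY_API_KEY must be set");
    (endpoint, x_token)
}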

Basic subscription that streams all transactions

use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
    CommitmentLevel, SubscribeRequest, SubscribeRequestFilterTransactions,
    subscribe_update::UpdateOneof,
};

#[tokio::main]
async fn main() -> Result<()> {
    // Set up connection parameters
    let endpoint = "https://solana-mainnet.g.alchemy.com";
    let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key

    // Build and connect the gRPC client with TLS and authentication
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .x_token(Some(x_token))? // API key passed as X-Token header
        .connect()
        .await?;

    // Create a bidirectional stream for subscribing to updates
    let (mut tx, mut stream) = client.subscribe().await?;

    // Send a subscription request that filters for non-vote, non-failed transactions
    tx.send(SubscribeRequest {
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),   // Exclude vote transactions
                failed: Some(false), // Exclude failed transactions
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
        ..Default::default()
    })
    .await?;

    // Process incoming transaction updates
    while let Some(Ok(msg)) = stream.next().await {
        if let Some(UpdateOneof::Transaction(tx)) = msg.update_oneof {
            // Extract and display the transaction signature
            let sig = tx
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx.slot, sig);
        }
    }
    Ok(())
}

Durable client

Implement a robust client with reconnection logic and gap recovery using from_slot:

use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
    CommitmentLevel, SlotStatus, SubscribeRequest, SubscribeRequestFilterSlots,
    subscribe_update::UpdateOneof,
};

#[tokio::main]
async fn main() -> Result<()> {
    // Alchemy Solana Mainnet endpoint
    let endpoint = "https://solana-mainnet.g.alchemy.com";
    let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key

    // Build a subscription request to receive slot updates.
    // We subscribe to all slots with confirmed commitment level.
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    // Track the latest slot we've seen for gap recovery
    let mut tracked_slot: u64 = 0;

    // Main reconnection loop - continuously reconnect
    loop {
        // Build and connect to the Yellowstone gRPC client.
        // Note: `?` exits main if the connection attempt itself fails; in
        // production you may prefer to retry with backoff instead.
        let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
            .tls_config(ClientTlsConfig::new().with_native_roots())?
            .x_token(Some(x_token))?
            .connect()
            .await?;

        // Resume from the last slot we processed. This ensures that even if we
        // disconnect and miss updates, we can recover the missed data when we
        // reconnect (assuming the server has retention for those slots).
        if tracked_slot > 0 {
            subscribe_request.from_slot = Some(tracked_slot);
        } else {
            subscribe_request.from_slot = None;
        }

        // Subscribe to slot updates with our configured request
        let (_update_subscription, mut stream) = client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await?;

        // Process incoming slot updates until an error occurs
        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    // Extract and print slot information if this update contains slot data
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        println!(
                            "Received slot: {} with status {:?}",
                            slot.slot,
                            SlotStatus::try_from(slot.status).unwrap()
                        );
                        // Update our tracked slot to use for the next reconnection
                        tracked_slot = slot.slot;
                    }
                }
                Some(Err(e)) => {
                    // On error, log and break to reconnect (outer loop will restart)
                    println!("Error receiving updates: {}", e);
                    break;
                }
                None => {
                    // Stream closed by the server; break to reconnect
                    break;
                }
            }
        }
    }
}
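
The reconnection loop above retries immediately after a failure. In production you would typically wait between attempts; a minimal exponential-backoff sketch (not part of the original example) could look like this:

use std::time::Duration;
use tokio::time::sleep;

// Minimal sketch: wait longer after each consecutive failure, capped at 30 seconds.
// Call with attempt = 0 after the first failure and reset the counter once a
// connection succeeds.
async fn backoff(attempt: u32) {
    let secs = 2u64.saturating_pow(attempt).min(30);
    sleep(Duration::from_secs(secs)).await;
}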

Stream accounts

SubscribeRequest for streaming token accounts for a specific mint

SubscribeRequest {
    accounts: HashMap::from([(
        "token_accounts_by_mint".to_string(),
        SubscribeRequestFilterAccounts {
            filters: vec![
                // Filter for token accounts
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::TokenAccountState(true)),
                },
                // Filter for the specific mint based on the token account structure
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
                        offset: 0, // Mint is located at the beginning of the account data
                        data: Some(Data::Base58(
                            "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // Replace with the specific mint address
                        )),
                    })),
                },
            ],
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
    ..Default::default()
}
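
The snippet above shows only the request itself. The filter types come from yellowstone_grpc_proto; the imports below should cover them, though the exact module paths may differ slightly between yellowstone-grpc-proto versions:

use std::collections::HashMap;
use yellowstone_grpc_proto::geyser::{
    CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
    SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp,
    subscribe_request_filter_accounts_filter::Filter,
    subscribe_request_filter_accounts_filter_memcmp::Data,
};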

Stream transactions

SubscribeRequest for streaming all successful non-vote transactions

SubscribeRequest {
    transactions: HashMap::from([(
        "all_transactions".to_string(),
        SubscribeRequestFilterTransactions {
            vote: Some(false),   // Exclude vote transactions
            failed: Some(false), // Exclude failed transactions
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
    ..Default::default()
}

Production-grade client

A complete example combining:

  • Durable connection handling
  • Gap recovery with from_slot
  • Async processing with tokio channels
  • Dynamic subscription management
  • Error handling

use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
    CommitmentLevel, SubscribeRequest, SubscribeRequestFilterSlots,
    SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdate,
    subscribe_update::UpdateOneof,
};

// Channel capacity for buffering updates from the ingress loop to the dispatcher.
// This allows the ingress loop to continue receiving data even if the dispatcher
// is temporarily slower, preventing backpressure on the gRPC stream.
const UPDATE_CHANNEL_CAPACITY: usize = 10000;

#[tokio::main]
async fn main() {
    // Configure the Yellowstone gRPC endpoint and authentication token.
    // In production, these should be loaded from environment variables or a config file.
    let endpoint = "https://solana-mainnet.g.alchemy.com".to_string();
    let x_token = "ALCHEMY_API_KEY".to_string(); // Replace with your Alchemy API key

    // The initial subscription request. It can be updated at runtime by sending a new
    // request to the ingress loop through the `subscribe_tx` channel.
    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),   // Exclude vote transactions
                failed: Some(false), // Exclude failed transactions
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        // from_slot will be set dynamically in the ingress loop for gap recovery
        ..Default::default()
    }));

    // Create a bounded channel for updates from ingress to dispatcher.
    // Ingress will drop updates if the channel is full.
    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);

    // Create an unbounded channel for dynamic subscription updates.
    // Hold on to `subscribe_tx`: any part of your application can use it to send new
    // subscription requests and modify what data you're receiving at runtime
    // (e.g., add new account filters).
    let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    // Spawn the dispatcher task: processes updates and implements business logic.
    // In production, this could write to a database, forward to other services, etc.
    let dispatcher_handle = tokio::spawn(async move {
        dispatcher_loop(updates_rx).await;
    });

    // Spawn the ingress task: manages the gRPC connection, reconnection, and gap recovery
    let ingress_handle = tokio::spawn(async move {
        ingress_loop(
            updates_tx,
            subscribe_rx,
            endpoint,
            x_token,
            subscribe_request,
        )
        .await;
    });

    // Wait for both tasks to complete (they run indefinitely in this example).
    // In production, you might want to handle graceful shutdown with signal handlers.
    let (dispatcher_result, ingress_result) = tokio::join!(dispatcher_handle, ingress_handle);
    if let Err(e) = dispatcher_result.and(ingress_result) {
        println!("Error: {:?}", e);
    }
}

// The ingress loop owns the gRPC connection and provides:
// - Automatic reconnection on any error or disconnect
// - Gap recovery using from_slot to resume from the last known position
// - Ping/pong handling to keep the connection alive
// - Dynamic subscription management via the subscribe_rx channel
//
// The outer loop ensures the client never stops - it will continuously
// attempt to reconnect if the connection drops for any reason.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    x_token: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Configure TLS for a secure connection to the Yellowstone gRPC server
    let tls_config = ClientTlsConfig::new().with_native_roots();

    // Track the last slot we successfully processed for gap recovery.
    // This is critical for ensuring no data is missed across reconnections.
    let mut tracked_slot: u64 = 0;

    // This infinite loop ensures the client is DURABLE - it will never give up.
    // On any error or disconnect, we simply reconnect and resume from where we left off.
    loop {
        // Build the gRPC client with authentication and TLS configuration.
        // The builder unwraps only fail on bad configuration (endpoint or token),
        // so panicking there is acceptable.
        let builder = GeyserGrpcClient::build_from_shared(endpoint.clone())
            .unwrap()
            .x_token(Some(x_token.clone()))
            .unwrap()
            .tls_config(tls_config.clone())
            .unwrap();

        // Connection failures are transient; log and retry instead of panicking.
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting to Yellowstone gRPC server: {:?}", e);
                continue;
            }
        };

        // Resume from the last tracked slot. This ensures that even if we
        // disconnect and miss updates, we can recover the missed data when we
        // reconnect (assuming the server has retention for those slots).
        subscribe_request.write().await.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        // Attempt to subscribe with our configured request (including from_slot).
        // Returns a tuple of (sink, stream):
        // - sink: For sending requests to the server (pings, subscription updates)
        // - stream: For receiving updates from the server
        match client
            .subscribe_with_request(Some(subscribe_request.read().await.clone()))
            .await
        {
            Ok((mut subscribe_tx, mut stream)) => {
                // This loop runs as long as the connection is healthy.
                // It concurrently handles:
                // 1. Dynamic subscription updates from subscribe_rx
                // 2. Stream updates from the gRPC server
                loop {
                    tokio::select! {
                        // Branch 1: Dynamic Subscription Management
                        //
                        // Listen for new subscription requests sent via subscribe_tx.
                        // This allows runtime modification of subscriptions without
                        // disconnecting (e.g., add/remove account filters).
                        Some(new_request) = subscribe_rx.recv() => {
                            // Forward the updated subscription request to the gRPC server.
                            // (If reconnects should also keep the new filters, update the
                            // shared subscribe_request here as well.)
                            if let Err(e) = subscribe_tx.send(new_request).await {
                                println!("Error sending subscribe request to grpc: {:?}", e);
                                // Connection issue - break to outer loop for reconnect
                                break;
                            }
                        }

                        // Branch 2: Process Stream Updates
                        result = stream.next() => {
                            match result {
                                Some(Ok(update)) => {
                                    // Yellowstone gRPC servers send pings to ensure
                                    // the client is alive. We must respond with a pong.
                                    if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
                                        if let Err(e) = subscribe_tx
                                            .send(SubscribeRequest {
                                                ping: Some(SubscribeRequestPing { id: 1 }),
                                                ..Default::default()
                                            })
                                            .await
                                        {
                                            println!("Error sending ping to grpc: {:?}", e);
                                        }
                                    }

                                    // Ping/pong are internal protocol messages,
                                    // not business data, so we don't forward them
                                    if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) {
                                        continue;
                                    }

                                    // Update tracked_slot with every slot update.
                                    // This is essential for gap recovery - if we
                                    // disconnect, we know exactly which slot to
                                    // resume from when reconnecting.
                                    if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
                                        tracked_slot = slot_update.slot;
                                        continue; // Slot updates are tracked but not forwarded
                                    }

                                    // Send the update through the channel to the
                                    // dispatcher for processing.
                                    //
                                    // Error handling: if the channel is full or the
                                    // receiver is dropped, try_send fails immediately;
                                    // log and drop the update so ingress never blocks.
                                    if let Err(e) = updates_tx.try_send(update) {
                                        println!("Slow consumer, dropping update: {:?}", e);
                                    }
                                }
                                // Any error on the stream (network issue, server
                                // restart, etc.) triggers a reconnection attempt.
                                Some(Err(e)) => {
                                    println!("Error receiving update from stream: {:?}", e);
                                    break; // Break to outer loop for reconnect
                                }
                                // The server closed the stream; reconnect.
                                None => {
                                    println!("Stream closed by server");
                                    break;
                                }
                            }
                        }
                    }
                }
            }
            // If we can't subscribe (e.g., auth failure, invalid request),
            // log the error and loop back to reconnect. In production, you
            // might want exponential backoff here to avoid hammering the server.
            Err(e) => {
                println!("Error subscribing to Yellowstone gRPC server: {:?}", e);
            }
        };

        // If we reach here, the connection was lost or the subscription failed.
        // The outer loop will automatically reconnect and resume from tracked_slot.
        // In production, consider adding:
        // - Exponential backoff to avoid rapid reconnection attempts
        // - Connection metrics/monitoring
        // - Alerting on repeated failures
        println!("Disconnected from Yellowstone gRPC server, reconnecting...");
    }
}

// The dispatcher consumes updates from the ingress loop. In production, this is
// where you would:
// - Write to databases
// - Forward to other services
// - Apply complex business logic
// - Aggregate metrics
// - Trigger alerts
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    // Receive updates sent from the ingress loop via the bounded channel.
    // If this loop is too slow and the channel fills up, the ingress loop
    // drops updates rather than blocking the gRPC stream.
    // The loop ends when the ingress task drops the sender.
    while let Some(update) = updates_rx.recv().await {
        match update.update_oneof {
            Some(UpdateOneof::Transaction(tx)) => {
                // Extract the transaction signature for display/logging
                let sig = tx
                    .transaction
                    .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                    .unwrap_or_default();
                println!("Slot: {} | Signature: {}", tx.slot, sig);

                // In production, you might:
                // - Parse transaction instructions
                // - Store in a database
                // - Check for specific program interactions
                // - Calculate metrics
                // - Forward to downstream services
            }

            // --------------------------------------------------------
            // Account Updates
            // --------------------------------------------------------
            //
            // Uncomment to handle account updates:
            // Some(UpdateOneof::Account(account)) => {
            //     // TODO: Implement account updates
            // }

            // --------------------------------------------------------
            // Block Updates
            // --------------------------------------------------------
            //
            // Uncomment to handle block updates:
            // Some(UpdateOneof::Block(block)) => {
            //     // TODO: Implement block updates
            // }

            // --------------------------------------------------------
            // Other Updates
            // --------------------------------------------------------
            _ => {
                println!("Received unknown update from Yellowstone gRPC server: {:?}", update);
            }
        }
    }
}
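
The subscribe_tx channel created in main is what makes the subscription dynamic. As a rough sketch (assuming it lives in the same file as the example above, that SubscribeRequestFilterAccounts is added to the geyser imports, and with an illustrative filter key and helper name), any task holding that sender can push an updated SubscribeRequest, which the ingress loop forwards to the live stream:

// Hypothetical helper: add an account filter at runtime. Start from the shared
// request so reconnects keep the new filter, then push the updated request
// through the channel; the ingress loop forwards it to the server.
async fn add_account_filter(
    subscribe_tx: &mpsc::UnboundedSender<SubscribeRequest>,
    subscribe_request: &Arc<RwLock<SubscribeRequest>>,
    account: &str, // illustrative: a base58 account address to watch
) {
    let mut request = subscribe_request.write().await;
    request.accounts.insert(
        "watched_accounts".to_string(), // illustrative filter key
        SubscribeRequestFilterAccounts {
            account: vec![account.to_string()],
            ..Default::default()
        },
    );
    // The send only fails if the ingress task has exited.
    let _ = subscribe_tx.send(request.clone());
}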

Contributing Examples

Found a useful pattern? Consider contributing it to these docs!