# Code Examples

> Practical Rust examples for Yellowstone gRPC

> For the complete documentation index, see [llms.txt](/docs/llms.txt).

This page provides practical code examples for common Yellowstone gRPC use cases. All examples are in Rust; replace the `ALCHEMY_API_KEY` placeholder with your own API key before running them.

## Prerequisites and authentication

See [Quickstart](/docs/reference/yellowstone-grpc-quickstart) for prerequisites and authentication setup.

## Basic subscription that streams all transactions

```rust
use anyhow::Result;
use futures::{sink::SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::collections::HashMap;
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
    CommitmentLevel, SubscribeRequest, SubscribeRequestFilterTransactions,
    subscribe_update::UpdateOneof,
};
#[tokio::main]
async fn main() -> Result<()> {
    // Set up connection parameters
    let endpoint = "https://solana-mainnet.g.alchemy.com";
    let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key
    
    // Build and connect the gRPC client with TLS and authentication
    let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .x_token(Some(x_token))? // API key passed as X-Token header
        .connect()
        .await?;
    // Create a bidirectional stream for subscribing to updates
    let (mut tx, mut stream) = client.subscribe().await?;
    // Send subscription request to filter for non-vote, non-failed transactions
    tx.send(SubscribeRequest {
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),   // Exclude vote transactions
                failed: Some(false), // Exclude failed transactions
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
        ..Default::default()
    })
    .await?;
    // Process incoming transaction updates
    while let Some(Ok(msg)) = stream.next().await {
        if let Some(UpdateOneof::Transaction(tx_update)) = msg.update_oneof {
            // Extract and display the transaction signature
            let sig = tx_update
                .transaction
                .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                .unwrap_or_default();
            println!("Slot: {} | Signature: {}", tx_update.slot, sig);
        }
    }
    Ok(())
}
```

## Durable client

Implement a robust client with reconnection logic and gap recovery via `from_slot`:

```rust
use anyhow::Result;
use futures::stream::StreamExt;
use std::collections::HashMap;
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
    CommitmentLevel, SlotStatus, SubscribeRequest, SubscribeRequestFilterSlots,
    subscribe_update::UpdateOneof,
};

#[tokio::main]
async fn main() -> Result<()> {
    // Alchemy Solana Mainnet endpoint
    let endpoint = "https://solana-mainnet.g.alchemy.com";
    let x_token = "ALCHEMY_API_KEY"; // Replace with your Alchemy API key

    // Build a subscription request to receive slot updates
    // We subscribe to all slots with confirmed commitment level
    let mut subscribe_request = SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    // Track the latest slot we've seen for gap recovery
    let mut tracked_slot: u64 = 0;

    // Main reconnection loop - continuously reconnect
    loop {
        // Build and connect to the Yellowstone gRPC client
        let mut client = GeyserGrpcClient::build_from_shared(endpoint)?
            .tls_config(ClientTlsConfig::new().with_native_roots())?
            .x_token(Some(x_token))?
            .connect()
            .await?;

        // Resume from the last slot we processed. This ensures that even if we
        // disconnect and miss updates, we can recover the missed data when we
        // reconnect (assuming the server has retention for those slots).
        if tracked_slot > 0 {
            subscribe_request.from_slot = Some(tracked_slot);
        } else {
            subscribe_request.from_slot = None;
        }

        // Subscribe to slot updates with our configured request
        let (_subscribe_tx, mut stream) = client
            .subscribe_with_request(Some(subscribe_request.clone()))
            .await?;

        // Process incoming slot updates until an error occurs
        loop {
            match stream.next().await {
                Some(Ok(msg)) => {
                    // Extract and print slot information if this update contains slot data
                    if let Some(UpdateOneof::Slot(slot)) = msg.update_oneof {
                        println!(
                            "Received slot: {} with status {:?}",
                            slot.slot,
                            SlotStatus::try_from(slot.status).unwrap()
                        );
                        // Update our tracked slot to use for the next reconnection
                        tracked_slot = slot.slot;
                    }
                }
                Some(Err(e)) => {
                    // On error, log and break to reconnect (outer loop will restart)
                    println!("Error receiving updates: {}", e);
                    break;
                }
                None => {
                    // Stream ended; break to the outer loop to reconnect
                    println!("Stream closed by server, reconnecting...");
                    break;
                }
            }
        }
    }
}
```
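The loop above reconnects immediately after an error; production clients usually wait between attempts to avoid hammering the server. A minimal std-only sketch of capped exponential backoff (the function name and the one-second/thirty-second values are illustrative, not part of the Yellowstone API):

```rust
use std::time::Duration;

/// Capped exponential backoff: attempt 0 waits `base`, then the delay
/// doubles with each attempt, never exceeding `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt.min(16))).min(max)
}

fn main() {
    // Delays: 1s, 2s, 4s, 8s, 16s, then capped at 30s
    for attempt in 0..7 {
        println!(
            "attempt {attempt}: wait {:?}",
            backoff_delay(attempt, Duration::from_secs(1), Duration::from_secs(30))
        );
    }
}
```

In the reconnection loop, you would call `tokio::time::sleep(backoff_delay(attempt, base, max)).await` before each connect attempt and reset `attempt` to zero once a subscription is healthy.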

## Stream accounts

### Stream token accounts by mint

```rust
SubscribeRequest {
    accounts: HashMap::from([(
        "token_accounts_by_mint".to_string(),
        SubscribeRequestFilterAccounts {
            filters: vec![
                // Filter for token accounts
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::TokenAccountState(true)),
                },
                // Filter for the specific mint based on the token account structure
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
                        offset: 0, // Mint is located at the beginning of the account data
                        data: Some(Data::Base58(
                            "pumpCmXqMfrsAkQ5r49WcJnRayYRqmXz6ae8H7H9Dfn".to_string(), // Replace with the specific mint address
                        )),
                    })),
                },
            ],
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
    ..Default::default()
}
```
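The `memcmp` offset above follows the SPL Token account layout: the mint is at byte 0, the owner wallet at byte 32, and the amount at byte 64. A variation of the same request fragment (a sketch; the wallet address is a placeholder) streams token accounts by their owner wallet instead of by mint:

```rust
SubscribeRequest {
    accounts: HashMap::from([(
        "token_accounts_by_owner".to_string(),
        SubscribeRequestFilterAccounts {
            filters: vec![
                // Filter for token accounts
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::TokenAccountState(true)),
                },
                // Match the owner wallet field of the token account
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
                        offset: 32, // Owner wallet starts at byte 32
                        data: Some(Data::Base58(
                            "YourWalletAddress".to_string(), // Replace with the wallet to track
                        )),
                    })),
                },
            ],
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
}
```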

## Stream transactions

### Stream successful non-vote transactions

```rust
SubscribeRequest {
    transactions: HashMap::from([(
        "all_transactions".to_string(),
        SubscribeRequestFilterTransactions {
            vote: Some(false),   // Exclude vote transactions
            failed: Some(false), // Exclude failed transactions
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32), // Use confirmed commitment level
    ..Default::default()
}
```
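To narrow the stream to transactions that touch specific programs or accounts, the filter's `account_include` field takes a list of addresses (a sketch; the SPL Token program ID shown is just an example):

```rust
SubscribeRequest {
    transactions: HashMap::from([(
        "token_program_transactions".to_string(),
        SubscribeRequestFilterTransactions {
            vote: Some(false),   // Exclude vote transactions
            failed: Some(false), // Exclude failed transactions
            // Only include transactions that mention at least one of these addresses
            account_include: vec!["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string()],
            ..Default::default()
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
}
```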

## Production-grade client

This example combines the patterns above into a single client:

* Durable connection with automatic reconnection
* Gap recovery with `from_slot`
* Async processing with tokio channels
* Dynamic subscription management
* Error handling

```rust
use futures::{SinkExt, stream::StreamExt};
use solana_signature::Signature;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, mpsc};
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
    CommitmentLevel, SubscribeRequest, SubscribeRequestFilterSlots,
    SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdate,
    subscribe_update::UpdateOneof,
};

// Channel capacity for buffering updates from the ingress loop to the dispatcher.
// This allows the ingress loop to continue receiving data even if the dispatcher
// is temporarily slower, preventing backpressure to the gRPC stream.
const UPDATE_CHANNEL_CAPACITY: usize = 10000;

#[tokio::main]
async fn main() {
    // Configure the Yellowstone gRPC endpoint and authentication token
    // In production, these should be loaded from environment variables or a config file
    let endpoint = "https://solana-mainnet.g.alchemy.com".to_string();
    let x_token = "ALCHEMY_API_KEY".to_string(); // Replace with your Alchemy API key

    // This is the initial subscription request. This can be updated and sent to the ingress loop through the `subscribe_rx` channel.
    let subscribe_request = Arc::new(RwLock::new(SubscribeRequest {
        slots: HashMap::from([(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                ..Default::default()
            },
        )]),
        transactions: HashMap::from([(
            "all_transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),   // Exclude vote transactions
                failed: Some(false), // Exclude failed transactions
                ..Default::default()
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        // from_slot will be set dynamically in the ingress loop for gap recovery
        ..Default::default()
    }));

    // Create a bounded channel for updates from ingress to dispatcher.
    // Ingress will drop updates if the channel is full.
    let (updates_tx, updates_rx) = mpsc::channel::<SubscribeUpdate>(UPDATE_CHANNEL_CAPACITY);

    // Create an unbounded channel for dynamic subscription updates.
    // This allows any part of your application to send new subscription requests
    // to modify what data you're receiving at runtime (e.g., add new account filters).
    let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel::<SubscribeRequest>();

    // Spawn the dispatcher task: processes updates and implements business logic
    // In production, this could write to a database, forward to other services, etc.
    let dispatcher_handle = tokio::spawn(async move {
        dispatcher_loop(updates_rx).await;
    });

    // Spawn the ingress task: manages gRPC connection, reconnection, and gap recovery
    let ingress_handle = tokio::spawn(async move {
        ingress_loop(
            updates_tx,
            subscribe_rx,
            endpoint,
            x_token,
            subscribe_request,
        )
        .await;
    });

    // Wait for both tasks to complete (they run indefinitely in this example)
    // In production, you might want to handle graceful shutdown with signal handlers
    let (dispatcher_result, ingress_result) = tokio::join!(dispatcher_handle, ingress_handle);
    if let Err(e) = dispatcher_result {
        println!("Dispatcher task error: {:?}", e);
    }
    if let Err(e) = ingress_result {
        println!("Ingress task error: {:?}", e);
    }
}

// The ingress loop owns the gRPC connection and provides:
// - Automatic reconnection on any error or disconnect
// - Gap recovery using from_slot to resume from last known position
// - Ping/pong handling to keep connection alive
// - Dynamic subscription management via subscribe_rx channel
//
// The outer loop ensures the client never stops - it will continuously
// attempt to reconnect if the connection drops for any reason.
async fn ingress_loop(
    updates_tx: mpsc::Sender<SubscribeUpdate>,
    mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequest>,
    endpoint: String,
    x_token: String,
    subscribe_request: Arc<RwLock<SubscribeRequest>>,
) {
    // Configure TLS for secure connection to Yellowstone gRPC server
    let tls_config = ClientTlsConfig::new().with_native_roots();

    // Track the last slot we successfully processed for gap recovery
    // This is critical for ensuring no data is missed across reconnections
    let mut tracked_slot: u64 = 0;

    // This infinite loop ensures the client is DURABLE - it will never give up.
    // On any error or disconnect, we simply reconnect and resume from where we left off.
    loop {
        // Build the gRPC client with authentication and TLS configuration
        let builder = GeyserGrpcClient::build_from_shared(endpoint.clone())
            .unwrap()
            .x_token(Some(x_token.clone()))
            .unwrap()
            .tls_config(tls_config.clone())
            .unwrap();

        // Handle connection failures gracefully so the durable loop can retry
        let mut client = match builder.connect().await {
            Ok(client) => client,
            Err(e) => {
                println!("Error connecting to Yellowstone gRPC server: {:?}", e);
                continue;
            }
        };

        // Resume from the last slot we processed. This ensures that even if we
        // disconnect and miss updates, we can recover the missed data when we
        // reconnect (assuming the server has retention for those slots).
        subscribe_request.write().await.from_slot = if tracked_slot > 0 {
            Some(tracked_slot)
        } else {
            None
        };

        // Attempt to subscribe with our configured request (including from_slot)
        // Returns a tuple of (sink, stream):
        // - sink: For sending requests to the server (pings, subscription updates)
        // - stream: For receiving updates from the server
        match client
            .subscribe_with_request(Some(subscribe_request.read().await.clone()))
            .await
        {
            Ok((mut subscribe_tx, mut stream)) => {
                // This loop runs as long as the connection is healthy.
                // It concurrently handles:
                // 1. Dynamic subscription updates from subscribe_rx
                // 2. Stream updates from the gRPC server
                loop {
                    tokio::select! {
                        // Branch 1: Dynamic Subscription Management
                        //
                        // Listen for new subscription requests sent via subscribe_tx
                        // This allows runtime modification of subscriptions without
                        // disconnecting (e.g., add/remove account filters)
                        Some(new_request) = subscribe_rx.recv() => {
                            // Persist the new request so reconnects resume with it,
                            // then forward it to the gRPC server
                            *subscribe_request.write().await = new_request.clone();
                            if let Err(e) = subscribe_tx.send(new_request).await {
                                println!("Error sending subscribe request to grpc: {:?}", e);
                                // Connection issue - break to outer loop for reconnect
                                break;
                            }
                        }

                        // Branch 2: Process Stream Updates
                        Some(result) = stream.next() => {
                            match result {
                                Ok(update) => {
                                    // Yellowstone gRPC servers send pings to ensure
                                    // the client is alive. We must respond with a pong.
                                    if matches!(update.update_oneof, Some(UpdateOneof::Ping(_))) {
                                        if let Err(e) = subscribe_tx
                                            .send(SubscribeRequest {
                                                ping: Some(SubscribeRequestPing { id: 1 }),
                                                ..Default::default()
                                            })
                                            .await
                                        {
                                            println!("Error sending ping to grpc: {:?}", e);
                                        }
                                    }

                                    // Ping/pong are internal protocol messages,
                                    // not business data, so we don't forward them
                                    if matches!(update.update_oneof, Some(UpdateOneof::Ping(_)) | Some(UpdateOneof::Pong(_))) {
                                        continue;
                                    }

                                    // Update tracked_slot with every slot update.
                                    // This is essential for gap recovery - if we
                                    // disconnect, we know exactly which slot to
                                    // resume from when reconnecting.
                                    if let Some(UpdateOneof::Slot(ref slot_update)) = update.update_oneof {
                                        tracked_slot = slot_update.slot;
                                        continue; // Slot updates are tracked but not forwarded
                                    }

                                    // Send the update through the channel to the
                                    // dispatcher for processing.
                                    //
                                    // Error handling: if the channel is full or
                                    // the receiver is dropped, log and drop the
                                    // update. Using try_send (rather than send)
                                    // prevents ingress from blocking on a slow
                                    // dispatcher.
                                    if let Err(e) = updates_tx.try_send(update) {
                                        println!("Slow consumer, dropping update with error: {:?}", e);
                                    }
                                }
                                // Any error on the stream (network issue, server
                                // restart, etc.) triggers a reconnection attempt.
                                Err(e) => {
                                    println!("Error receiving update from stream: {:?}", e);
                                    break; // Break to outer loop for reconnect
                                }
                            }
                        }
                    }
                }
            }
            // If we can't subscribe (e.g., auth failure, invalid request),
            // log the error and loop back to reconnect. In production, you
            // might want exponential backoff here to avoid hammering the server.
            Err(e) => {
                println!("Error subscribing to Yellowstone gRPC server: {:?}", e);
            }
        };

        // If we reach here, the connection was lost or subscription failed.
        // The outer loop will automatically reconnect and resume from tracked_slot.
        // In production, consider adding:
        // - Exponential backoff to avoid rapid reconnection attempts
        // - Connection metrics/monitoring
        // - Alerting on repeated failures
        println!("Disconnected from Yellowstone gRPC server, reconnecting...");
    }
}

// The dispatcher loop consumes updates from the ingress channel and applies
// business logic. In production, this is where you would:
// - Write to databases
// - Forward to other services
// - Apply complex business logic
// - Aggregate metrics
// - Trigger alerts
async fn dispatcher_loop(mut updates_rx: mpsc::Receiver<SubscribeUpdate>) {
    loop {
        tokio::select! {
            // This receives updates sent from the ingress loop via the channel.
            // The bounded channel provides backpressure - if this loop is too
            // slow, the ingress will drop updates.
            Some(update) = updates_rx.recv() => {
                match update.update_oneof {
                    Some(UpdateOneof::Transaction(tx)) => {
                        // Extract the transaction signature for display/logging
                        let sig = tx
                            .transaction
                            .and_then(|t| Signature::try_from(t.signature.as_slice()).ok())
                            .unwrap_or_default();
                        println!("Slot: {} | Signature: {}", tx.slot, sig);

                        // In production, you might:
                        // - Parse transaction instructions
                        // - Store in database
                        // - Check for specific program interactions
                        // - Calculate metrics
                        // - Forward to downstream services
                    }

                    // --------------------------------------------------------
                    // Account Updates
                    // --------------------------------------------------------
                    //
                    // Uncomment to handle account updates:
                    // Some(UpdateOneof::Account(account)) => {
                    //     // TODO: Implement account updates
                    // }

                    // --------------------------------------------------------
                    // Block Updates
                    // --------------------------------------------------------
                    //
                    // Uncomment to handle block updates:
                    // Some(UpdateOneof::Block(block)) => {
                    //     // TODO: Implement block updates
                    // }

                    // --------------------------------------------------------
                    // Other Updates
                    // --------------------------------------------------------
                    _ => {
                        println!("Received unknown update from Yellowstone gRPC server: {:?}", update);
                    }
                }
            }
        }
    }
}
```

## Contributing examples

Found a useful pattern? Consider contributing it to these [docs](https://github.com/alchemyplatform/docs/)!