Ingesting Posts with Thunder
The Thunder component is responsible for processing and storing "In-Network" content—posts from accounts a user explicitly follows. Unlike the out-of-network pipeline which relies on global indices, Thunder maintains a high-performance, in-memory store of recent tweets retrieved via Kafka event streams.
This guide walks you through configuring the Kafka listeners and the PostStore to ingest and track real-time tweet events.
1. Configuring the PostStore
The PostStore is an in-memory repository that holds recent posts for rapid retrieval. Because it resides in RAM, you must configure a retention policy to balance memory usage and data freshness.
To initialize the store, define the retention period (in seconds) and the internal request timeout:
use std::sync::Arc;
use thunder::posts::post_store::PostStore;
// Example: 2-day retention (172,800 seconds)
let post_retention_seconds = 172_800;
let request_timeout_ms = 200;
let post_store = Arc::new(PostStore::new(
post_retention_seconds,
request_timeout_ms,
));
2. Setting Up Kafka Listeners
Thunder uses Kafka consumers to listen for tweet events. Configuration is split into two primary modes based on the is_serving flag:
- Serving Mode (
is_serving: true): The instance focuses on responding to gRPC queries and listens to a refined "In-Network" event stream. - Ingestion Mode (
is_serving: false): The instance processes raw tweet events and prepares them for the system.
Define Consumer Configuration
You must provide a KafkaConsumerConfig which includes connection details, SASL credentials for security, and the target topic.
use xai_kafka::config::{KafkaConfig, KafkaConsumerConfig, SslConfig};
let consumer_config = KafkaConsumerConfig {
base_config: KafkaConfig {
dest: "kafka-cluster-destination".to_string(),
topic: "tweet-events-v2".to_string(),
ssl: Some(SslConfig {
security_protocol: "SASL_SSL".to_string(),
sasl_mechanism: Some("SCRAM-SHA-512".to_string()),
sasl_username: Some("your_username".into()),
sasl_password: Some("your_password".into()),
}),
..Default::default()
},
group_id: "thunder-ingestion-group".to_string(),
auto_offset_reset: "latest".to_string(),
..Default::default()
};
3. Launching the Ingestion Pipeline
Once the PostStore and Kafka configs are ready, use the start_kafka utility to begin processing. This function spawns background tasks that deserialize incoming Kafka messages and populate the PostStore.
use thunder::kafka_utils;
// tx is a mpsc channel used to coordinate startup signals
let (tx, mut rx) = tokio::sync::mpsc::channel::<i64>(num_threads);
kafka_utils::start_kafka(
&args,
post_store.clone(),
"user_context_id",
tx
).await?;
Kafka Catchup
When starting a new Thunder instance, it is critical to wait for the Kafka consumer to "catch up" to the latest events before marking the service as ready. In the main loop, you should block until the initialization signals are received:
// Wait for each Kafka thread to signal it has reached the head of the stream
for _ in 0..args.kafka_num_threads {
rx.recv().await;
}
// Finalize store and start maintenance tasks
post_store.finalize_init().await?;
post_store.start_auto_trim(2); // Remove expired posts every 2 minutes
4. Handling Following Lists with Strato
Thunder needs to know which authors a user follows to filter the PostStore effectively. This is handled by the StratoClient.
When a GetInNetworkPosts request arrives, the ThunderServiceImpl coordinates between the following list (from Strato) and the tweets (from the PostStore).
use thunder::strato_client::StratoClient;
use thunder::thunder_service::ThunderServiceImpl;
let strato_client = Arc::new(StratoClient::new());
let thunder_service = ThunderServiceImpl::new(
Arc::clone(&post_store),
Arc::clone(&strato_client),
args.max_concurrent_requests,
);
5. Monitoring Ingestion Health
Thunder exports several metrics to track the health of the ingestion pipeline. Monitor these values to ensure posts are being ingested correctly:
GET_IN_NETWORK_POSTS_FOUND_FRESHNESS_SECONDS: The age of the most recent post found in the store. High values indicate Kafka lag.KAFKA_MESSAGES_FAILED_PARSE: Increments if incoming payloads do not match the expected schema.POST_STORE_STATS: Periodically logged to show total posts in memory and current RAM utilization.
Verification Example
To verify ingestion is working, check the logs after startup. You should see the PostStore stats logger confirming the population of the cache:
INFO: Initialized PostStore for in-memory post storage (retention: 172800 seconds)
INFO: Kafka init took 45.2s
INFO: Started PostStore auto-trim task (interval: 2 minutes)