Configuring Kafka Event Listeners
The Thunder component of the X recommendation algorithm relies on real-time data ingestion via Kafka to keep its in-memory post store synchronized with the latest activity. By configuring Kafka event listeners, you ensure that the "In-Network" feed (content from accounts a user follows) remains fresh and accurate.
This guide walks you through setting up and configuring these listeners within the Thunder service.
Overview
Thunder uses Kafka consumers to listen to two primary types of events:
- Tweet Events: Raw activity from the global X corpus.
- In-Network Events: Refined events specifically used to populate the in-memory cache for followed accounts.
The configuration is managed primarily through CLI arguments and environment variables, which populate the KafkaConsumerConfig and KafkaProducerConfig structures.
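The exact fields of these structures live in the Thunder source; as a simplified sketch (field names, defaults, and the `from_env` helper are illustrative, not the real definitions), the consumer side might look like:

```rust
use std::env;

/// Simplified stand-in for Thunder's KafkaConsumerConfig (illustrative fields).
#[derive(Debug)]
struct KafkaConsumerConfig {
    group_id: String,
    auto_offset_reset: String,
    fetch_timeout_ms: u64,
    num_threads: usize,
    sasl_user: Option<String>,
    sasl_password: Option<String>,
}

impl KafkaConsumerConfig {
    /// Populate from the environment, falling back to the documented defaults.
    fn from_env(group_id: &str) -> Self {
        KafkaConsumerConfig {
            group_id: group_id.to_string(),
            auto_offset_reset: "largest".to_string(),
            fetch_timeout_ms: 1000,
            num_threads: 1,
            sasl_user: env::var("KAFKA_SASL_USER").ok(),
            sasl_password: env::var("KAFKA_SASL_PASSWORD").ok(),
        }
    }
}

fn main() {
    let cfg = KafkaConsumerConfig::from_env("thunder-serving");
    println!("{} {}", cfg.auto_offset_reset, cfg.fetch_timeout_ms);
}
```

The producer config follows the same pattern with its own credential variables.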
Prerequisites
- A running Kafka cluster accessible from your Thunder deployment.
- SASL/SSL credentials for your Kafka brokers.
- The Thunder binary built from the x-algorithm repository.
Step 1: Configure Authentication
Thunder uses SASL for secure communication with Kafka. You can provide these credentials via environment variables or CLI flags. Environment variables are recommended for production environments.
Set the following variables in your environment:
```bash
# Consumer credentials
export KAFKA_SASL_USER="your_username"
export KAFKA_SASL_PASSWORD="your_password"

# Producer credentials (if different)
export KAFKA_PRODUCER_SASL_USER="your_producer_username"
export KAFKA_PRODUCER_SASL_PASSWORD="your_producer_password"
```
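Since the producer variables only need to be set "if different", a natural lookup order is producer-specific first, then the consumer credentials. This fallback is an assumption for the sketch below (check the Thunder source for the authoritative order); `resolve_producer_user` is a hypothetical helper:

```rust
use std::env;

/// Resolve the producer SASL username, falling back to the consumer
/// credentials when the producer-specific variable is unset.
/// (Fallback order is an assumption, not Thunder's documented behavior.)
fn resolve_producer_user() -> Option<String> {
    env::var("KAFKA_PRODUCER_SASL_USER")
        .or_else(|_| env::var("KAFKA_SASL_USER"))
        .ok()
}

fn main() {
    match resolve_producer_user() {
        Some(user) => println!("producer will authenticate as {user}"),
        None => println!("no SASL credentials found in the environment"),
    }
}
```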
Step 2: Define Consumer Settings
When launching Thunder, you must specify how the Kafka consumer behaves. Key parameters include the group ID, offset management, and fetch limits.
| Argument | Description | Default |
| :--- | :--- | :--- |
| --kafka-group-id | The consumer group ID for this instance. | Required |
| --auto-offset-reset | Where to start if no offset is found (smallest, largest). | largest |
| --fetch-timeout-ms | Max time to wait for a batch of messages. | 1000ms |
| --kafka-num-threads | Number of concurrent processing threads. | 1 |
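Only the two values in the table are valid for --auto-offset-reset, so it is worth rejecting anything else before the consumer starts. A minimal validation helper (illustrative; Thunder's own argument parsing may differ):

```rust
/// Validate an --auto-offset-reset value against the two options
/// listed in the table above.
fn validate_offset_reset(value: &str) -> Result<&str, String> {
    match value {
        "smallest" | "largest" => Ok(value),
        other => Err(format!("invalid --auto-offset-reset: {other}")),
    }
}

fn main() {
    // Note that Kafka's newer consumer configs use "earliest"/"latest";
    // the smallest/largest spelling here follows the table above.
    assert!(validate_offset_reset("smallest").is_ok());
    assert!(validate_offset_reset("earliest").is_err());
    println!("offset-reset values validated");
}
```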
Step 3: Launch the Listener
The listener behavior changes depending on whether the service is in Serving Mode or Ingestion Mode.
Option A: Serving Mode (--is-serving)
In serving mode, Thunder starts a "V2" listener that hydrates the PostStore with in-network events. It generates a unique ID for the consumer group to ensure the instance gets its own view of the data.
```bash
./thunder \
  --is-serving \
  --grpc-port 50051 \
  --kafka-group-id thunder-serving \
  --auto-offset-reset smallest \
  --in-network-events-consumer-dest "kafka-broker:9092"
```
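One way to picture the unique-group-ID generation is appending a per-startup suffix to the base group ID, so no two serving instances share offsets. The timestamp scheme below is just one illustrative choice; Thunder's actual suffix format may differ:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Derive a unique consumer group ID so each serving instance gets its
/// own independent view of the topic. (Illustrative scheme only.)
fn unique_group_id(base: &str) -> String {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_nanos();
    format!("{base}-{nanos}")
}

fn main() {
    println!("{}", unique_group_id("thunder-serving"));
}
```

Because each instance joins its own group, Kafka delivers every partition to every instance rather than balancing partitions across them.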
Option B: Ingestion/Background Mode
If --is-serving is false, Thunder acts as a processor, moving raw tweet events into the refined in-network event topics.
```bash
./thunder \
  --kafka-group-id thunder-processor \
  --sasl-mechanism PLAIN \
  --security-protocol SASL_SSL
```
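Conceptually, the processor consumes raw tweet events and republishes only the in-network subset. The sketch below replaces the Kafka-to-Kafka pipeline with plain in-memory types (TweetEvent and the follow-graph predicate are hypothetical stand-ins):

```rust
/// Hypothetical raw tweet event; Thunder's real event schema lives in
/// the topic definitions, not here.
#[derive(Debug)]
struct TweetEvent {
    author_id: u64,
    text: String,
}

/// Filter raw events down to the in-network subset, i.e. tweets whose
/// author appears in the follow graph. In Thunder this refinement step
/// writes to the refined in-network event topics.
fn refine(raw: Vec<TweetEvent>, followed: &[u64]) -> Vec<TweetEvent> {
    raw.into_iter()
        .filter(|e| followed.contains(&e.author_id))
        .collect()
}

fn main() {
    let raw = vec![
        TweetEvent { author_id: 1, text: "in-network".into() },
        TweetEvent { author_id: 9, text: "out-of-network".into() },
    ];
    let refined = refine(raw, &[1, 2, 3]);
    println!("{} refined event(s)", refined.len()); // prints "1 refined event(s)"
}
```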
Step 4: Fine-tuning Batch Processing
To optimize throughput, you can adjust the partition fetch limits in the configuration. By default, Thunder is configured to handle large payloads (up to 100MB for V2 events) to accommodate high-velocity data.
If you are seeing "Failed to parse Kafka message" errors in your logs, verify that your deserializer logic matches the schema of the topic defined in thunder/kafka_utils.rs.
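A robust consumer counts parse failures rather than aborting the batch, which is how a kafka_messages_failed_parse style metric behaves. The sketch below uses a plain atomic in place of a Prometheus counter, and a hypothetical "non-empty payload" deserializer in place of the real schema:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Simplified stand-in for the kafka_messages_failed_parse metric.
static FAILED_PARSE: AtomicU64 = AtomicU64::new(0);

/// Hypothetical deserializer: here, "valid" just means non-empty.
fn parse_message(payload: &[u8]) -> Result<String, ()> {
    if payload.is_empty() {
        Err(())
    } else {
        Ok(String::from_utf8_lossy(payload).into_owned())
    }
}

/// Process a batch, counting parse failures instead of crashing the consumer.
fn process_batch(batch: &[&[u8]]) -> Vec<String> {
    batch
        .iter()
        .filter_map(|payload| match parse_message(payload) {
            Ok(event) => Some(event),
            Err(()) => {
                FAILED_PARSE.fetch_add(1, Ordering::Relaxed);
                None
            }
        })
        .collect()
}

fn main() {
    let batch: Vec<&[u8]> = vec![&b"tweet-1"[..], &b""[..], &b"tweet-2"[..]];
    let parsed = process_batch(&batch);
    println!("{} parsed, {} failed", parsed.len(), FAILED_PARSE.load(Ordering::Relaxed));
}
```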
Step 5: Verifying Connection and Health
Once the service starts, you can monitor the initialization progress in the logs. Thunder will wait for a "catchup" signal from the Kafka threads before declaring the gRPC server ready.
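The catchup handshake can be pictured as a channel between the Kafka threads and the main thread: the consumer signals once it has replayed up to the head of the topic, and only then does the server report ready. This is a sketch with a simulated delay; Thunder's actual signaling mechanism may differ:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (catchup_tx, catchup_rx) = mpsc::channel();

    // Stand-in for the Kafka consumer thread replaying the backlog.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50)); // simulated catchup work
        catchup_tx.send(()).expect("main thread dropped the receiver");
    });

    // Block server readiness on the catchup signal.
    catchup_rx.recv().expect("Kafka thread exited before catching up");
    println!("HTTP/gRPC server is ready");
}
```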
Expected Log Output
```text
INFO thunder::main: Initialized PostStore for in-memory post storage
INFO thunder::kafka_utils: Starting Kafka background tasks
INFO thunder::main: Kafka init took 45.2s
INFO thunder::main: HTTP/gRPC server is ready
```
Monitoring Metrics
Thunder exports Prometheus-compatible metrics to track consumer health:
- kafka_messages_failed_parse: Increments if a message doesn't match the expected schema.
- batch_processing_time: Measures the latency of deserializing and storing a Kafka batch.
Troubleshooting
- Consumer Lag: If the PostStore is missing recent tweets, increase --kafka-num-threads or check the max_partition_fetch_bytes setting to ensure you aren't hitting throughput bottlenecks.
- Authentication Errors: Ensure the sasl_mechanism (e.g., SCRAM-SHA-512 or PLAIN) matches your Kafka broker's configuration.
- Missing Data on Restart: If you want a new instance to replay history, set --auto-offset-reset smallest. Otherwise, it will only process new events produced while the service is running.