Handling Pipeline Side Effects
In the X recommendation engine, Side Effects are operations performed outside the primary retrieval and ranking flow. While the core pipeline focuses on generating and scoring candidates using the Phoenix transformer, side effects handle critical "leaf" tasks such as logging impressions, caching request metadata, and updating feature stores.
This guide walks you through implementing and integrating a side effect into the Home Mixer or Candidate Pipeline.
Understanding the Side Effect Lifecycle
Side effects are typically executed at specific hooks within the Home Mixer orchestration layer:
- Pre-Retrieval: Capturing state before candidates are fetched.
- Post-Scoring: Logging the scores assigned by Phoenix for later analysis.
- Post-Filtering: Recording exactly what the user saw (Impressions) after heuristics and filters were applied.
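One way to picture these hooks is as named stages, each carrying a list of registered side effects. The sketch below is a hypothetical, std-only model. SideEffectStage, SideEffectRegistry, and the synchronous closure signature are illustrative assumptions, not types from the Home Mixer codebase.

```rust
use std::collections::HashMap;

/// Hypothetical hook points mirroring the three stages listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SideEffectStage {
    PreRetrieval,
    PostScoring,
    PostFiltering,
}

/// A side effect here is a boxed closure over the candidate IDs visible at its stage.
pub type SideEffectFn = Box<dyn Fn(&[u64]) + Send + Sync>;

#[derive(Default)]
pub struct SideEffectRegistry {
    hooks: HashMap<SideEffectStage, Vec<SideEffectFn>>,
}

impl SideEffectRegistry {
    /// Attaches `effect` to `stage`; several effects may share a stage.
    pub fn register(&mut self, stage: SideEffectStage, effect: SideEffectFn) {
        self.hooks.entry(stage).or_default().push(effect);
    }

    /// Runs every side effect registered for `stage` and returns how many ran.
    pub fn run(&self, stage: SideEffectStage, candidate_ids: &[u64]) -> usize {
        let effects = match self.hooks.get(&stage) {
            Some(effects) => effects,
            None => return 0,
        };
        for effect in effects {
            effect(candidate_ids);
        }
        effects.len()
    }
}
```

At the Pre-Retrieval hook no candidates exist yet, so an orchestrator built this way would pass an empty slice there.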
Step 1: Define Your Side Effect Logic
Side effects in the x-algorithm are modular components. To create a new one, you must implement the logic required to handle request context and the resulting candidate list.
Create a new file (e.g., home-mixer/side_effects/impression_logger.rs) and define your execution logic.
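```rust
// Example: a side effect that logs the IDs of the top-ranked posts.
// LogClient and ScoredPost are placeholders for your logging client and
// the pipeline's scored-candidate type.
// Clone is required because the logger is cloned into a spawned task in
// Step 2, so LogClient must implement Clone as well.
#[derive(Clone)]
pub struct ImpressionLogger {
    // Client for an external logging service or database
    client: LogClient,
}

impl ImpressionLogger {
    pub async fn execute(&self, user_id: u64, posts: &[ScoredPost]) -> anyhow::Result<()> {
        let post_ids: Vec<i64> = posts.iter().map(|p| p.id).collect();
        // Send the IDs to the external store asynchronously
        self.client.log_impressions(user_id, post_ids).await?;
        Ok(())
    }
}
```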
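Because LogClient is a concrete type, ImpressionLogger as written is hard to unit-test. A minimal sketch, assuming you can put the logging backend behind a trait: ImpressionSink, InMemorySink, and this ScoredPost are hypothetical, and execute is synchronous here so the example runs without an async runtime.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the pipeline's scored-candidate type.
#[derive(Debug, Clone)]
pub struct ScoredPost {
    pub id: i64,
    pub score: f64,
}

/// Hypothetical abstraction over the logging backend (LogClient in the guide).
pub trait ImpressionSink: Send + Sync {
    fn log_impressions(&self, user_id: u64, post_ids: Vec<i64>) -> Result<(), String>;
}

/// In-memory sink that records every call, useful for unit tests.
#[derive(Default)]
pub struct InMemorySink {
    pub records: Mutex<Vec<(u64, Vec<i64>)>>,
}

impl ImpressionSink for InMemorySink {
    fn log_impressions(&self, user_id: u64, post_ids: Vec<i64>) -> Result<(), String> {
        // A poisoned lock is reported as an error instead of panicking.
        self.records
            .lock()
            .map_err(|e| e.to_string())?
            .push((user_id, post_ids));
        Ok(())
    }
}

/// The side effect is generic over its sink, so tests can inject InMemorySink.
pub struct ImpressionLogger<S: ImpressionSink> {
    pub sink: S,
}

impl<S: ImpressionSink> ImpressionLogger<S> {
    pub fn new(sink: S) -> Self {
        Self { sink }
    }

    /// Extracts the post IDs in ranked order and forwards them to the sink.
    pub fn execute(&self, user_id: u64, posts: &[ScoredPost]) -> Result<(), String> {
        let post_ids: Vec<i64> = posts.iter().map(|p| p.id).collect();
        self.sink.log_impressions(user_id, post_ids)
    }
}
```

In production the same trait could be implemented for the real client, keeping the side effect's logic identical in tests and in the server.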
Step 2: Integrate into the Home Mixer
The HomeMixerServer (found in home-mixer/lib.rs) acts as the orchestration layer. To ensure your side effect runs during a request, you must register it within the service's request handler.
- Open home-mixer/server.rs.
- Locate the get_scored_posts method.
- Insert your side effect call after the ranking or filtering stage.
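```rust
// Request, Response, and Status are tonic types; GetPostsResponse stands in
// for the generated response message.
pub async fn get_scored_posts(
    &self,
    req: Request<GetPostsRequest>,
) -> Result<Response<GetPostsResponse>, Status> {
    let user_id = req.into_inner().user_id;

    // 1. Retrieval & Scoring (pipeline errors are mapped to a gRPC status)
    let candidates = self.candidate_pipeline.get_candidates(user_id).await
        .map_err(|e| Status::internal(e.to_string()))?;
    let scored_posts = self.phoenix_ranker.score(candidates).await
        .map_err(|e| Status::internal(e.to_string()))?;

    // 2. Filtering
    let final_feed = self.filters.apply(scored_posts);

    // 3. Side Effect Execution
    // Spawn a detached task so logging never blocks the user's response.
    let logger = self.impression_logger.clone();
    let feed_to_log = final_feed.clone();
    tokio::spawn(async move {
        if let Err(e) = logger.execute(user_id, &feed_to_log).await {
            log::error!("Failed to log side effect: {}", e);
        }
    });

    Ok(Response::new(GetPostsResponse::from(final_feed)))
}
```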
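The fire-and-forget shape of the side-effect step in get_scored_posts can be exercised without a server. This sketch swaps tokio::spawn for std::thread::spawn and uses a std::sync::mpsc channel as the logging backend, so it runs with only the standard library. handle_request and its score-threshold filter are hypothetical stand-ins for get_scored_posts and self.filters.apply.

```rust
use std::sync::mpsc::Sender;
use std::thread::{self, JoinHandle};

/// Hypothetical request handler: filters scored posts, hands a copy of the
/// final feed to a background side effect, and returns without waiting for it.
pub fn handle_request(
    user_id: u64,
    scored: Vec<(i64, f64)>,
    min_score: f64,
    log_tx: Sender<(u64, Vec<i64>)>,
) -> (Vec<i64>, JoinHandle<()>) {
    // Filtering stage: keep posts whose score is at or above the threshold.
    let final_feed: Vec<i64> = scored
        .into_iter()
        .filter(|(_, score)| *score >= min_score)
        .map(|(id, _)| id)
        .collect();

    // Side effect: move an owned copy of the feed into the background thread.
    let feed_to_log = final_feed.clone();
    let handle = thread::spawn(move || {
        // Failures are logged, never propagated to the caller.
        if let Err(e) = log_tx.send((user_id, feed_to_log)) {
            eprintln!("Failed to log side effect: {}", e);
        }
    });

    // The handle is returned only so a test can wait on it; a server would drop it.
    (final_feed, handle)
}
```

The clone before spawning matters: the background work must own its data because it can outlive the request that produced it.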
Step 3: Configure Request Caching
For performance monitoring and debugging, the system supports caching request information (like the User Action Sequence) so it can be tied to the final output.
To enable request info caching:
- Initialize the Cache: Ensure the HomeMixerServer holds a reference to a TTL-based cache (usually configured in main.rs).
- Store Query Hydration Results: During the Query Hydration phase (see System Architecture), store the retrieved user features using the request_id as the key.
- Retrieve in Side Effect: Use the same request_id in your side effect to log exactly which features led to a specific ranking.
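```rust
// Inside a side effect handler: look up the features cached during Query
// Hydration. A miss means the entry expired (TTL) or was never stored.
match self.request_cache.get(&request_id).await {
    Some(user_features) => {
        self.logger.log_debug_info(request_id, user_features, final_results).await;
    }
    None => log::warn!("No cached request info for request {}", request_id),
}
```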
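The TTL-based cache described above can be modeled as a HashMap of timestamped entries. A minimal sketch, assuming string request IDs and an explicit `now` argument that keeps expiry deterministic in tests. RequestCache is hypothetical; a production server would more likely use an existing concurrent cache crate.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical TTL cache keyed by request_id.
pub struct RequestCache<V> {
    ttl: Duration,
    entries: HashMap<String, (Instant, V)>,
}

impl<V: Clone> RequestCache<V> {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Stores `value` for `request_id`, stamped with the insertion time `now`.
    pub fn insert(&mut self, request_id: &str, value: V, now: Instant) {
        self.entries.insert(request_id.to_string(), (now, value));
    }

    /// Returns a copy of the value only if it is younger than the TTL at `now`.
    pub fn get(&self, request_id: &str, now: Instant) -> Option<V> {
        self.entries.get(request_id).and_then(|(inserted, value)| {
            if now.saturating_duration_since(*inserted) < self.ttl {
                Some(value.clone())
            } else {
                None
            }
        })
    }

    /// Removes every expired entry and returns how many were dropped.
    pub fn evict_expired(&mut self, now: Instant) -> usize {
        let before = self.entries.len();
        let ttl = self.ttl;
        self.entries
            .retain(|_, (inserted, _)| now.saturating_duration_since(*inserted) < ttl);
        before - self.entries.len()
    }
}
```

Returning None for stale entries means a slow side effect sees a miss rather than outdated features.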
Best Practices
Non-Blocking Execution
Side effects should almost always be "fire-and-forget." Use tokio::spawn or a background task processor so that logging or caching latency never delays delivery of the "For You" feed to the user.
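Spawning a task per request does not bound how much work piles up if the logging backend slows down. One common shape for the "background task processor" option is a bounded queue that the request path writes to without blocking, dropping events when it is full. The sketch uses std::sync::mpsc::sync_channel and try_send; SideEffectQueue is a hypothetical name, and drop-on-overflow is a design choice that trades log completeness for request latency.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical non-blocking queue for side-effect payloads.
pub struct SideEffectQueue<T> {
    tx: SyncSender<T>,
    /// Number of events dropped because the queue was full or the worker was gone.
    pub dropped: u64,
}

impl<T> SideEffectQueue<T> {
    /// Returns the queue plus the receiver a background worker would drain.
    pub fn new(capacity: usize) -> (Self, Receiver<T>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: 0 }, rx)
    }

    /// Enqueues without ever blocking; returns false if the event was dropped.
    pub fn offer(&mut self, event: T) -> bool {
        match self.tx.try_send(event) {
            Ok(()) => true,
            // Full or disconnected: drop the event and count it.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped += 1;
                false
            }
        }
    }
}
```

Exporting the dropped counter as a metric makes silent data loss visible when the backend falls behind.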
Observability
All side effects should report their own health. Use the metrics module (similar to the implementation in thunder/metrics.rs) to track:
- Execution Latency: How long the side effect takes to process.
- Success/Failure Rate: Important for identifying if your logging or caching backend is down.
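Both metrics can be captured by wrapping each side-effect call. A minimal sketch using atomic counters; SideEffectMetrics is hypothetical and does not reflect the API in thunder/metrics.rs, which in practice would export to a metrics backend rather than keep in-process totals.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Hypothetical per-side-effect health counters.
#[derive(Default)]
pub struct SideEffectMetrics {
    pub successes: AtomicU64,
    pub failures: AtomicU64,
    pub total_latency_micros: AtomicU64,
}

impl SideEffectMetrics {
    /// Runs `effect`, recording its latency and whether it returned Ok.
    pub fn record<T, E>(&self, effect: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
        let start = Instant::now();
        let result = effect();
        let elapsed = start.elapsed().as_micros() as u64;
        self.total_latency_micros.fetch_add(elapsed, Ordering::Relaxed);
        match &result {
            Ok(_) => self.successes.fetch_add(1, Ordering::Relaxed),
            Err(_) => self.failures.fetch_add(1, Ordering::Relaxed),
        };
        result
    }

    /// Fraction of recorded runs that failed (0.0 before anything has run).
    pub fn failure_rate(&self) -> f64 {
        let ok = self.successes.load(Ordering::Relaxed);
        let err = self.failures.load(Ordering::Relaxed);
        let total = ok + err;
        if total == 0 {
            0.0
        } else {
            err as f64 / total as f64
        }
    }
}
```

Relaxed ordering suffices because each counter is read independently; no other memory is synchronized through them.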
Data Minimization
When caching request info or logging side effects, store only the post_id and the score. Avoid re-logging full post content, which can be looked up from the primary PostStore by ID, and avoid logging sensitive user PII.
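One way to enforce this is to give the side effect its own narrow record type, so extra fields cannot be logged by accident. HydratedPost and ImpressionRecord below are hypothetical types for illustration.

```rust
/// Hypothetical fully hydrated post as it exists inside the pipeline.
pub struct HydratedPost {
    pub post_id: u64,
    pub score: f64,
    pub text: String,
    pub author_handle: String,
}

/// What the side effect persists: the ID and the score, nothing else.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ImpressionRecord {
    pub post_id: u64,
    pub score: f64,
}

impl From<&HydratedPost> for ImpressionRecord {
    fn from(post: &HydratedPost) -> Self {
        // Content and author fields are deliberately dropped; look them up
        // from the primary store by post_id when needed.
        Self { post_id: post.post_id, score: post.score }
    }
}

/// Converts a feed into minimal log records, preserving ranked order.
pub fn minimize(posts: &[HydratedPost]) -> Vec<ImpressionRecord> {
    posts.iter().map(ImpressionRecord::from).collect()
}
```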
Troubleshooting
- Missing Side Effects in Logs: Check the CancellationToken in main.rs. If the server is shutting down, background tasks spawned via tokio::spawn may be terminated before they finish.
- Performance Degradation: If the pipeline slows down after adding a side effect, make sure you are not accidentally await-ing the side effect inside the main request path.
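For missing side effects at shutdown, the general remedy is to keep handles to in-flight work and wait on them before exiting. The std-only sketch below uses threads in place of tokio tasks; BackgroundTasks is hypothetical. In a tokio server, tokio-util's TaskTracker serves a similar purpose.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical tracker for fire-and-forget side effects. Keeping the handles
/// lets shutdown wait for in-flight work instead of abandoning it.
#[derive(Default)]
pub struct BackgroundTasks {
    handles: Vec<JoinHandle<()>>,
}

impl BackgroundTasks {
    /// Starts `task` in the background and remembers its handle.
    pub fn spawn<F>(&mut self, task: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.handles.push(thread::spawn(task));
    }

    /// Called on shutdown: blocks until every side effect finishes and
    /// returns how many of them panicked.
    pub fn drain(self) -> usize {
        self.handles
            .into_iter()
            .map(|h| h.join())
            .filter(|r| r.is_err())
            .count()
    }
}
```

Draining after the server stops accepting requests, but before the process exits, gives queued logging work a chance to complete.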