
feat: capture write-time materialization metrics (local engine) #371

Open
Manisha4 wants to merge 1 commit into master from feat/EAPC-22385-materialization-metrics-capture

Manisha4 commented Jul 1, 2026

Collaborator

Add a write-time metrics capture layer for materialization (EAPC-22385), starting with the local compute engine and the Cassandra online store. Off by default; gated behind the ENABLE_MATERIALIZATION_METRICS env var.

  • MaterializationMetricsAggregator (feast/_materialization_metrics.py): accumulates rows_read_offline, rows_written_online, drop_reasons, fields_written, field_null_counts, and max_event_timestamp/lag_seconds. Invariant: rows_read - rows_written == rows_dropped == sum(drop_reasons).
  • ExecutionContext carries an optional metrics_collector; instantiated in ComputeEngine.get_execution_context only for a MaterializationTask when the env gate is on.
  • Local nodes populate it: LocalSourceReadNode (rows_read), LocalFilterNode (filter drops), LocalDedupNode (dedup drops), LocalOutputNode (rows_written, fields, null counts, freshness). All no-ops when the collector is absent.
  • The online store reaches the aggregator via a ContextVar the output node binds around the write (collecting()), so online_write_batch's signature is unchanged. Cassandra records ttl_expired / ttl_exceeds_max at its TTL skip points, best-effort.
  • Unit tests for the aggregator and the node hooks, incl. the contextvar store-drop seam.

What this PR does / why we need it:

Which issue(s) this PR fixes:

Misc

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>