Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";
import "feast/core/Feature.proto";
Expand Down Expand Up @@ -231,6 +232,73 @@ message DataSource {

// Date Format of date partition column (e.g. %Y-%m-%d)
string date_partition_column_format = 5;

// Declarative data quality constraints, keyed by column name. Columns
// without an entry are not validated. Meant to be consumed by external
// feature engineering jobs.
map<string, FieldConstraints> field_constraints = 6;
}

// Imputation strategy for filling null values in a column during feature engineering
message Imputation {
enum Strategy {
STRATEGY_UNSPECIFIED = 0;
// Fill nulls with a customer-supplied constant (default_value).
DEFAULT = 1;
// Fill nulls with the mean of the current run's data for this column.
MEAN = 2;
// Fill nulls with the median of the current run's data for this column.
MEDIAN = 3;
}

Strategy strategy = 1;

// Required when strategy == DEFAULT. Typed via oneof so a string
// column cannot accidentally receive a numeric default at registration
// time. Ignored for MEAN / MEDIAN.
oneof default_value {
double default_double = 2;
int64 default_long = 3;
string default_string = 4;
bool default_bool = 5;
}
}

// Declarative data quality constraints attached to one column on a
// SparkSource. Only the fields the customer sets are enforced; unset
// fields are not validated.
message FieldConstraints {
// Null handling.
// false => no nulls allowed; true (or unset) => any null rate allowed
google.protobuf.BoolValue nullable = 1;
// e.g. 0.01 = allow up to 1% nulls
google.protobuf.DoubleValue max_null_pct = 2;

// Numeric range (numeric columns).
google.protobuf.DoubleValue min_value = 3;
google.protobuf.DoubleValue max_value = 4;

// Minimum row-level compliance for value/range/regex/allowed-values
// checks. Default 1.0 (strict). Set below 1.0 when the underlying data
// is known-noisy (e.g. floating-point ratios that drift past [0,1] by
// ULP-scale rounding error).
google.protobuf.DoubleValue min_compliance = 5;

// Categorical (string/enum columns).
repeated string allowed_values = 6;

// Pattern match (string columns). Regex must compile. Empty == unset.
string regex = 7;

// Uniqueness contract.
google.protobuf.BoolValue unique = 8;

// Escape hatch for library-specific checks. Standard checks belong in typed fields.
map<string, string> custom = 9;

// Optional null-imputation strategy applied to this column before
// validation runs. Unset == no imputation.
Imputation imputation = 10;
}

// Defines configuration for custom third-party data sources.
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .feature_store import FeatureStore
from .feature_view import FeatureView
from .field import Field
from .field_constraints import FieldConstraints, Imputation
from .on_demand_feature_view import OnDemandFeatureView
from .project import Project
from .repo_config import RepoConfig
Expand All @@ -44,6 +45,8 @@
"FeastDataFrame",
"Feature",
"Field",
"FieldConstraints",
"Imputation",
"FeatureService",
"FeatureStore",
"FeatureView",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
JsonFormatModel,
ProtoFormatModel,
)
from feast.field_constraints import FieldConstraints
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
Expand Down Expand Up @@ -133,6 +134,8 @@ class SparkSourceModel(DataSourceModel):
tags: Optional[Dict[str, str]] = None
owner: Optional[str] = ""
timestamp_field: Optional[str] = None
# Declarative data quality rules keyed by column name.
field_constraints: Optional[Dict[str, FieldConstraints]] = None

def to_data_source(self) -> SparkSource:
"""
Expand All @@ -153,6 +156,7 @@ def to_data_source(self) -> SparkSource:
tags=self.tags,
owner=self.owner,
timestamp_field=self.timestamp_field,
field_constraints=self.field_constraints,
)
return self._attach_timestamps(source)

Expand Down Expand Up @@ -181,6 +185,7 @@ def from_data_source(
timestamp_field=data_source.timestamp_field,
created_timestamp=data_source.created_timestamp,
last_updated_timestamp=data_source.last_updated_timestamp,
field_constraints=data_source.field_constraints,
)


Expand Down
256 changes: 256 additions & 0 deletions sdk/python/feast/field_constraints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
"""
Declarative data quality constraints for a SparkSource's columns.

Customers attach a `Dict[str, FieldConstraints]` to a `SparkSource` keyed by
column name. The constraints are stored on the source's proto, round-tripped
through the registry, and consumed at write time.

Only fields the customer sets are enforced. Unset fields are not validated.
"""

import re
from typing import Any, Dict, List, Optional, Union

from google.protobuf.wrappers_pb2 import BoolValue, DoubleValue
from pydantic import BaseModel, ConfigDict, field_validator, model_validator

from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

# Both messages are nested under the `DataSource` proto message in
# DataSource.proto (alongside SparkOptions, KafkaOptions, etc.). Alias them
# at module load time so the rest of this file reads cleanly.
FieldConstraintsProto = DataSourceProto.FieldConstraints
ImputationProto = DataSourceProto.Imputation


_IMPUTATION_STRATEGIES = ("default", "mean", "median")


class Imputation(BaseModel):
"""How to fill null values in a column before validation runs."""

model_config = ConfigDict(extra="forbid")
strategy: str
# Required when strategy == "default"; ignored otherwise.
default_value: Optional[Union[float, int, str, bool]] = None

@field_validator("strategy")
@classmethod
def _strategy_known(cls, v: str) -> str:
if v not in _IMPUTATION_STRATEGIES:
raise ValueError(
f"imputation.strategy must be one of {list(_IMPUTATION_STRATEGIES)}, "
f"got {v!r}"
)
return v

@model_validator(mode="after")
def _check_default_value(self) -> "Imputation":
if self.strategy == "default" and self.default_value is None:
raise ValueError(
"imputation.default_value is required when strategy='default'"
)
if self.strategy in ("mean", "median") and self.default_value is not None:
raise ValueError(
"imputation.default_value is only valid with strategy='default'; "
f"got strategy='{self.strategy}'"
)
return self

def to_proto(self) -> ImputationProto:
strategy_enum = {
"default": ImputationProto.DEFAULT,
"mean": ImputationProto.MEAN,
"median": ImputationProto.MEDIAN,
}[self.strategy]

kwargs: Dict[str, Any] = {"strategy": strategy_enum}
if self.strategy == "default":
# bool must be checked before int since `isinstance(True, int)` is True.
if isinstance(self.default_value, bool):
kwargs["default_bool"] = self.default_value
elif isinstance(self.default_value, int):
kwargs["default_long"] = self.default_value
elif isinstance(self.default_value, float):
kwargs["default_double"] = self.default_value
elif isinstance(self.default_value, str):
kwargs["default_string"] = self.default_value
else:
raise ValueError(
f"unsupported default_value type: {type(self.default_value).__name__}"
)
return ImputationProto(**kwargs)

@classmethod
def from_proto(cls, proto: ImputationProto) -> "Imputation":
strategy_name = {
ImputationProto.DEFAULT: "default",
ImputationProto.MEAN: "mean",
ImputationProto.MEDIAN: "median",
}.get(proto.strategy)
if strategy_name is None:
raise ValueError(
f"unknown imputation strategy proto value: {proto.strategy}"
)

default_value: Optional[Union[float, int, str, bool]] = None
if strategy_name == "default":
which = proto.WhichOneof("default_value")
if which == "default_bool":
default_value = proto.default_bool
elif which == "default_long":
default_value = proto.default_long
elif which == "default_double":
default_value = proto.default_double
elif which == "default_string":
default_value = proto.default_string

return cls(strategy=strategy_name, default_value=default_value)


class FieldConstraints(BaseModel):
"""
Per-column data quality rules. Attached to a `SparkSource` as
`field_constraints: Dict[str, FieldConstraints]`, keyed by column name.

Only the fields the customer sets are enforced; unset fields are not
validated.
"""

model_config = ConfigDict(extra="forbid")

nullable: Optional[bool] = None
max_null_pct: Optional[float] = None
min_value: Optional[float] = None
max_value: Optional[float] = None
# Minimum row-level compliance for value/range/regex/allowed-values
# checks. Default 1.0 (strict) — every non-null row must satisfy the
# predicate. Set below 1.0 when the underlying data is known-noisy
# (e.g. floating-point ratios that drift past `[0, 1]` by ULP-scale
# rounding error).
min_compliance: Optional[float] = None
allowed_values: Optional[List[str]] = None
regex: Optional[str] = None
unique: Optional[bool] = None
custom: Optional[Dict[str, str]] = None

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every other field validates at definition time (regex compiles, ranges in [0,1], etc.), but custom predicates are stored as-is, so a bad/empty predicate only fails at the next FE run. Could we reject empty predicate strings here? Maybe a non-empty check would catch the common typo early.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a method _custom_nonempty that checks that the predicate and name are non-empty. Other invalid custom sql expressions will still be accepted by the registry, but we dont have a spark session at the feast apply stage, so basic syntax checks seem like the best we can do for now. The upstream feature engineering job will (likely) have a spark session to perform that check.

imputation: Optional[Imputation] = None

@field_validator("max_null_pct")
@classmethod
def _max_null_pct_range(cls, v: Optional[float]) -> Optional[float]:
if v is not None and not 0.0 <= v <= 1.0:
raise ValueError(f"max_null_pct must be in [0, 1], got {v}")
return v

@field_validator("min_compliance")
@classmethod
def _min_compliance_range(cls, v: Optional[float]) -> Optional[float]:
if v is not None and not 0.0 <= v <= 1.0:
raise ValueError(f"min_compliance must be in [0, 1], got {v}")
return v

@field_validator("regex")
@classmethod
def _regex_compiles(cls, v: Optional[str]) -> Optional[str]:
if v is not None:
if v.strip() == "":
raise ValueError("regex must not be empty if set")
try:
re.compile(v)
except re.error as e:
raise ValueError(f"regex does not compile: {e}") from e
return v

@field_validator("allowed_values")
@classmethod
def _allowed_values_nonempty(cls, v: Optional[List[str]]) -> Optional[List[str]]:
if v is not None and len(v) == 0:
raise ValueError("allowed_values must not be empty if set")
return v

@field_validator("custom")
@classmethod
def _custom_nonempty(cls, v: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
# Every other field validates at definition time; custom predicates are
# raw SQL passed straight to the downstream check, so an empty/whitespace
# predicate would otherwise only surface at FE run time. Reject the
# common typos here. Also catches the empty-map set != get asymmetry
# (proto3 reads an empty map back as unset, i.e. None).
if v is None:
return v
if len(v) == 0:
raise ValueError("custom must not be empty if set")
for name, predicate in v.items():
if name is None or name.strip() == "":
raise ValueError("custom check names must not be empty")
if predicate is None or predicate.strip() == "":
raise ValueError(f"custom predicate for {name!r} must not be empty")
return v

@model_validator(mode="after")
def _cross_field(self) -> "FieldConstraints":
if self.nullable is False and (self.max_null_pct or 0) > 0:
raise ValueError("nullable=False contradicts max_null_pct > 0")
if (
self.min_value is not None
and self.max_value is not None
and self.min_value > self.max_value
):
raise ValueError(
f"min_value ({self.min_value}) > max_value ({self.max_value})"
)
return self

def to_proto(self) -> FieldConstraintsProto:
kwargs: Dict[str, Any] = {}
# bool / double fields wrap as google.protobuf.BoolValue/DoubleValue
# so the proto can distinguish unset from a meaningful zero/false.
if self.nullable is not None:
kwargs["nullable"] = BoolValue(value=self.nullable)
if self.max_null_pct is not None:
kwargs["max_null_pct"] = DoubleValue(value=self.max_null_pct)
if self.min_value is not None:
kwargs["min_value"] = DoubleValue(value=self.min_value)
if self.max_value is not None:
kwargs["max_value"] = DoubleValue(value=self.max_value)
if self.min_compliance is not None:
kwargs["min_compliance"] = DoubleValue(value=self.min_compliance)
if self.allowed_values is not None:
kwargs["allowed_values"] = list(self.allowed_values)
if self.regex is not None:
kwargs["regex"] = self.regex
if self.unique is not None:
kwargs["unique"] = BoolValue(value=self.unique)
if self.custom is not None:
kwargs["custom"] = dict(self.custom)
if self.imputation is not None:
kwargs["imputation"] = self.imputation.to_proto()
return FieldConstraintsProto(**kwargs)

@classmethod
def from_proto(cls, proto: FieldConstraintsProto) -> "FieldConstraints":
kwargs: Dict[str, Any] = {}
# Wrapper messages: HasField returns True iff the wrapper itself was
# set on the wire. Unwrap .value for the contained primitive.
if proto.HasField("nullable"):
kwargs["nullable"] = proto.nullable.value
if proto.HasField("max_null_pct"):
kwargs["max_null_pct"] = proto.max_null_pct.value
if proto.HasField("min_value"):
kwargs["min_value"] = proto.min_value.value
if proto.HasField("max_value"):
kwargs["max_value"] = proto.max_value.value
if proto.HasField("min_compliance"):
kwargs["min_compliance"] = proto.min_compliance.value
if len(proto.allowed_values) > 0:
kwargs["allowed_values"] = list(proto.allowed_values)
# `regex` is a plain string in proto3; empty == unset.
if proto.regex != "":

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saw the empty == unset note here, makes sense for proto3. Only nit: the validator still accepts regex="" and to_proto writes it, so you can create a value that silently reads back as None. Could reject empty regex in _regex_compiles to keep the two ends consistent.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find. I added a check to reject empty regex strings and a unit test that should reject them as well.

kwargs["regex"] = proto.regex
if proto.HasField("unique"):
kwargs["unique"] = proto.unique.value
if len(proto.custom) > 0:
kwargs["custom"] = dict(proto.custom)
if proto.HasField("imputation"):
kwargs["imputation"] = Imputation.from_proto(proto.imputation)
return cls(**kwargs)
Loading
Loading