@@ -38,8 +38,33 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
3838
3939---
4040
41- ## 🛠️ Example DAG Flow
42- ` Read → Aggregate → Join → Transform → Write `
41+ ## 🛠️ Feature Builder Flow
42+ ``` markdown
43+ SourceReadNode
44+ |
45+ v
46+ JoinNode (Only for get_historical_features with entity df)
47+ |
48+ v
49+ FilterNode (Always included; applies TTL or user-defined filters)
50+ |
51+ v
52+ AggregationNode (If aggregations are defined in FeatureView)
53+ |
54+ v
55+ DeduplicationNode (If no aggregation is defined for get_historical_features)
56+ |
57+ v
58+ TransformationNode (If feature_transformation is defined)
59+ |
60+ v
61+ ValidationNode (If enable_validation = True)
62+ |
63+ v
64+ Output
65+ ├──> RetrievalOutput (For get_historical_features)
66+ └──> OnlineStoreWrite / OfflineStoreWrite (For materialize)
67+ ```
4368
4469Each step is implemented as a `DAGNode`. An `ExecutionPlan` executes these nodes in topological order, caching `DAGValue` outputs.
4570
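The execution model described above (topological order plus cached outputs) can be illustrated with a minimal sketch. It is not the project's actual `ExecutionPlan`: the `DAGNode` class here is a simplified, hypothetical stand-in, `execute_plan` is an invented helper name, and plain Python values play the role of `DAGValue`. Ordering is delegated to the standard library's `graphlib.TopologicalSorter` (Python 3.9+).

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Sequence


class DAGNode:
    """Hypothetical stand-in for a DAG node: a named step with upstream inputs."""

    def __init__(self, name: str, fn: Callable[..., Any], inputs: Sequence["DAGNode"] = ()):
        self.name = name
        self.fn = fn
        self.inputs = list(inputs)


def execute_plan(nodes: List[DAGNode]) -> Dict[str, Any]:
    """Run each node exactly once in dependency order, caching outputs by node name.

    Assumes every upstream node is also present in `nodes`.
    """
    by_name = {node.name: node for node in nodes}
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {node.name: {dep.name for dep in node.inputs} for node in nodes}
    cache: Dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        node = by_name[name]
        # Upstream results are read from the cache instead of being recomputed.
        cache[name] = node.fn(*(cache[dep.name] for dep in node.inputs))
    return cache


# Toy pipeline: read -> dedup -> total
read = DAGNode("read", lambda: [3, 1, 2, 3])
dedup = DAGNode("dedup", lambda rows: sorted(set(rows)), [read])
total = DAGNode("total", lambda rows: sum(rows), [dedup])

# The nodes are passed out of order on purpose; the sorter fixes the order.
results = execute_plan([total, dedup, read])
```

Because outputs are cached by node name, a node with several downstream consumers is still evaluated only once.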
@@ -59,7 +84,7 @@ class MyComputeEngine(ComputeEngine):
5984 def materialize(self, task: MaterializationTask) -> MaterializationJob:
6085 ...
6186
62- def get_historical_features(self, task: HistoricalRetrievalTask) -> pa.Table:
87+ def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
6388 ...
6489```
6590
@@ -71,6 +96,8 @@ class CustomFeatureBuilder(FeatureBuilder):
7196 def build_source_node(self): ...
7297 def build_aggregation_node(self, input_node): ...
7398 def build_join_node(self, input_node): ...
99+ def build_filter_node(self, input_node): ...
100+ def build_dedup_node(self, input_node): ...
74101 def build_transformation_node(self, input_node): ...
75102 def build_output_nodes(self, input_node): ...
76103```
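The conditions listed in the Feature Builder Flow diagram can be read as a small decision procedure that determines which `build_*` steps run. The sketch below only illustrates that reading. `BuildContext`, its field names, and `plan_node_order` are hypothetical and not part of the codebase, and the function returns node names rather than building real nodes.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class BuildContext:
    """Hypothetical flags mirroring the conditions in the flow diagram."""

    historical: bool = False  # True for get_historical_features, False for materialize
    has_entity_df: bool = False
    has_aggregations: bool = False
    has_transformation: bool = False
    enable_validation: bool = False


def plan_node_order(ctx: BuildContext) -> List[str]:
    """Return the node names that would be chained, in flow order."""
    nodes = ["SourceReadNode"]
    if ctx.historical and ctx.has_entity_df:
        nodes.append("JoinNode")
    nodes.append("FilterNode")  # always included
    if ctx.has_aggregations:
        nodes.append("AggregationNode")
    elif ctx.historical:
        # Deduplication applies only when no aggregation is defined.
        nodes.append("DeduplicationNode")
    if ctx.has_transformation:
        nodes.append("TransformationNode")
    if ctx.enable_validation:
        nodes.append("ValidationNode")
    nodes.append("RetrievalOutput" if ctx.historical else "OnlineStoreWrite / OfflineStoreWrite")
    return nodes


historical_plan = plan_node_order(BuildContext(historical=True, has_entity_df=True))
materialize_plan = plan_node_order(BuildContext(has_aggregations=True, enable_validation=True))
```

In a real builder, each appended name would presumably correspond to a call such as `build_join_node(input_node)` or `build_filter_node(input_node)`, with the result passed on as the next step's input.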