Skip to content

Commit c370981

Browse files
zabarnZach Barnett
andauthored
feat: Remote Registry Support and Bug Fixes (#294)
* EAPC-19174 feat: Remote Registry Support * fix: import wrappers.proto * feat: add integration tests for search functions * fix: format python * fix: update RegistryServer python compiled protos * fix: lint error * fix: unit-test-go * fix: add separate workflow file for go pr unit tests * fix: failing ray python integration test * fix: test_compute.py::test_spark_compute_engine_materialize error * fix: a few more materialization integration tests * fix: registry tests * fix: registry integration tests * fix: operator data source types test * fix: go unit test flakiness * fix: protos, registry_server.py, and test_universal_registry.py * fix: remove sqlite_registry() changes * fix: go_integration_test_utils.go * fix: revert test_compute_dag.py and remove duplicate spark.py field calculation * remove go_pr_changes.yaml * fix: revert unnecessary timestamp related changes after spark fix * fix: how we update project timestamps * fix: format python * fix: revert test_registry_cache changes * fix: update test_registry_cache --------- Co-authored-by: Zach Barnett <zbarnett@expediagroup.com>
1 parent 1d74dfd commit c370981

16 files changed

Lines changed: 1818 additions & 243 deletions

File tree

.github/workflows/go_pr.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,12 @@ jobs:
5858
with:
5959
python-version: "3.11"
6060
architecture: x64
61+
- name: Install the latest version of uv
62+
uses: astral-sh/setup-uv@v5
63+
with:
64+
enable-cache: true
6165
- name: Test Go
62-
run: make test-go
66+
run: make test-go-github
6367
- uses: actions/upload-artifact@v4
6468
with:
6569
name: go-coverage-report

go/internal/feast/server/grpc_server_test.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ package server
44

55
import (
66
"context"
7+
"log"
78
"os"
89
"path/filepath"
10+
"runtime"
911
"testing"
1012
"time"
1113

@@ -22,16 +24,27 @@ import (
2224
"github.com/stretchr/testify/assert"
2325
)
2426

27+
var testRepoBasePath string
28+
29+
func TestMain(m *testing.M) {
30+
// Get the file path of this source file, regardless of the working directory
31+
_, filename, _, ok := runtime.Caller(0)
32+
if !ok {
33+
log.Print("couldn't find file path of the test file")
34+
os.Exit(1)
35+
}
36+
testRepoBasePath = filepath.Join(filename, "..", "..", "..", "test")
37+
if err := test.SetupInitializedRepo(testRepoBasePath); err != nil {
38+
log.Print("Could not initialize test repo: ", err)
39+
os.Exit(1)
40+
}
41+
os.Exit(m.Run())
42+
}
43+
2544
func TestGetFeastServingInfo(t *testing.T) {
2645
ctx := context.Background()
27-
// Pregenerated using `feast init`.
28-
dir := "../../test/"
29-
err := test.SetupInitializedRepo(dir)
30-
defer test.CleanUpInitializedRepo(dir)
3146

32-
require.Nil(t, err)
33-
34-
client, closer := GetClient(ctx, dir, "")
47+
client, closer := GetClient(ctx, testRepoBasePath, "")
3548
defer closer()
3649
response, err := client.GetFeastServingInfo(ctx, &serving.GetFeastServingInfoRequest{})
3750
assert.Nil(t, err)
@@ -40,14 +53,8 @@ func TestGetFeastServingInfo(t *testing.T) {
4053

4154
func TestGetOnlineFeaturesSqlite(t *testing.T) {
4255
ctx := context.Background()
43-
// Pregenerated using `feast init`.
44-
dir := "../../test/"
45-
err := test.SetupInitializedRepo(dir)
46-
defer test.CleanUpInitializedRepo(dir)
4756

48-
require.NoError(t, err)
49-
50-
client, closer := GetClient(ctx, dir, "")
57+
client, closer := GetClient(ctx, testRepoBasePath, "")
5158
defer closer()
5259
entities := make(map[string]*types.RepeatedValue)
5360
entities["driver_id"] = &types.RepeatedValue{
@@ -74,7 +81,7 @@ func TestGetOnlineFeaturesSqlite(t *testing.T) {
7481
{Val: &types.Value_Int64Val{Int64Val: 1005}},
7582
}
7683
expectedFeatureNamesResp := []string{"driver_id", "conv_rate", "acc_rate", "avg_daily_trips"}
77-
rows, err := test.ReadParquet(filepath.Join(dir, "feature_repo", "driver_stats.parquet"))
84+
rows, err := test.ReadParquet(filepath.Join(testRepoBasePath, "feature_repo", "driver_stats.parquet"))
7885
assert.Nil(t, err)
7986
entityKeys := map[int64]bool{1001: true, 1003: true, 1005: true}
8087
correctFeatures := test.GetLatestFeatures(rows, entityKeys)
@@ -100,15 +107,9 @@ func TestGetOnlineFeaturesSqlite(t *testing.T) {
100107

101108
func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) {
102109
ctx := context.Background()
103-
// Pregenerated using `feast init`.
104-
dir := "../../test/"
105-
err := test.SetupInitializedRepo(dir)
106-
defer test.CleanUpInitializedRepo(dir)
107-
108-
require.NoError(t, err)
109110

110111
logPath := t.TempDir()
111-
client, closer := GetClient(ctx, dir, logPath)
112+
client, closer := GetClient(ctx, testRepoBasePath, logPath)
112113
defer closer()
113114
entities := make(map[string]*types.RepeatedValue)
114115
entities["driver_id"] = &types.RepeatedValue{

go/internal/test/go_integration_test_utils.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,22 +238,40 @@ func SetupInitializedRepo(basePath string) error {
238238
log.Printf("Repo base path error: %s", err.Error())
239239
return err
240240
}
241-
applyCommand := exec.Command(feastExec, "apply")
242-
applyCommand.Env = os.Environ()
243241
featureRepoPath, err := filepath.Abs(filepath.Join(path, "feature_repo"))
244242
if err != nil {
245243
log.Printf("Repo filepath error: %s", err.Error())
246244
return err
247245
}
246+
247+
// Ensure data directory exists
248+
dataDir := filepath.Join(featureRepoPath, "data")
249+
if err := os.MkdirAll(dataDir, 0755); err != nil {
250+
log.Printf("Failed to create data directory: %s", err.Error())
251+
return err
252+
}
253+
254+
applyCommand := exec.Command(feastExec, "apply")
255+
applyCommand.Env = os.Environ()
248256
applyCommand.Dir = featureRepoPath
249257
out, err := applyCommand.CombinedOutput()
250258
if err != nil {
251-
log.Printf("Repo setup error: %s", string(out))
259+
log.Printf("Repo setup error: %s, %v", string(out), err)
252260
return err
261+
} else {
262+
// Verify registry.db was created with retries
263+
registryPath := filepath.Join(dataDir, "registry.db")
264+
for i := 0; i < 10; i++ {
265+
if _, err := os.Stat(registryPath); err == nil {
266+
log.Printf("Registry file created successfully after %d attempts", i+1)
267+
break
268+
}
269+
if i == 9 {
270+
return fmt.Errorf("registry.db was not created after feast apply")
271+
}
272+
time.Sleep(1 * time.Second)
273+
}
253274
}
254-
255-
// Pause to ensure apply completes
256-
time.Sleep(5 * time.Second)
257275
t := time.Now()
258276

259277
formattedTime := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d",

infra/feast-operator/api/v1alpha1/featurestore_types.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ type OnlineStoreFilePersistence struct {
363363
// OnlineStoreDBStorePersistence configures the DB store persistence for the online store service
364364
type OnlineStoreDBStorePersistence struct {
365365
// Type of the persistence type you want to use.
366-
// +kubebuilder:validation:Enum=snowflake.online;redis;ikv;datastore;dynamodb;bigtable;postgres;cassandra;mysql;hazelcast;singlestore;hbase;elasticsearch;qdrant;couchbase.online;milvus
366+
// +kubebuilder:validation:Enum=snowflake.online;redis;eg-valkey;ikv;datastore;dynamodb;bigtable;postgres;cassandra;scylladb;mysql;hazelcast;singlestore;hbase;elasticsearch;qdrant;couchbase.online;milvus;eg-milvus
367367
Type string `json:"type"`
368368
// Data store parameters should be placed as-is from the "feature_store.yaml" under the secret key. "registry_type" & "type" fields should be removed.
369369
SecretRef corev1.LocalObjectReference `json:"secretRef"`
@@ -374,12 +374,14 @@ type OnlineStoreDBStorePersistence struct {
374374
var ValidOnlineStoreDBStorePersistenceTypes = []string{
375375
"snowflake.online",
376376
"redis",
377+
"eg-valkey",
377378
"ikv",
378379
"datastore",
379380
"dynamodb",
380381
"bigtable",
381382
"postgres",
382383
"cassandra",
384+
"scylladb",
383385
"mysql",
384386
"hazelcast",
385387
"singlestore",
@@ -388,6 +390,7 @@ var ValidOnlineStoreDBStorePersistenceTypes = []string{
388390
"qdrant",
389391
"couchbase.online",
390392
"milvus",
393+
"eg-milvus",
391394
}
392395

393396
// LocalRegistryConfig configures the registry service
@@ -418,7 +421,7 @@ type RegistryFilePersistence struct {
418421
// RegistryDBStorePersistence configures the DB store persistence for the registry service
419422
type RegistryDBStorePersistence struct {
420423
// Type of the persistence type you want to use.
421-
// +kubebuilder:validation:Enum=sql;snowflake.registry
424+
// +kubebuilder:validation:Enum=sql;sql-fallback;snowflake.registry;http
422425
Type string `json:"type"`
423426
// Data store parameters should be placed as-is from the "feature_store.yaml" under the secret key. "registry_type" & "type" fields should be removed.
424427
SecretRef corev1.LocalObjectReference `json:"secretRef"`
@@ -428,7 +431,9 @@ type RegistryDBStorePersistence struct {
428431

429432
var ValidRegistryDBStorePersistenceTypes = []string{
430433
"sql",
434+
"sql-fallback",
431435
"snowflake.registry",
436+
"http",
432437
}
433438

434439
// PvcConfig defines the settings for a persistent file store based on PVCs.

protos/feast/registry/RegistryServer.proto

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package feast.registry;
44

55
import "google/protobuf/empty.proto";
66
import "google/protobuf/timestamp.proto";
7+
import "google/protobuf/wrappers.proto";
78
import "feast/core/DataSource.proto";
89
import "feast/core/Entity.proto";
910
import "feast/core/FeatureService.proto";
@@ -100,6 +101,10 @@ service RegistryServer{
100101
// Feature RPCs
101102
rpc ListFeatures (ListFeaturesRequest) returns (ListFeaturesResponse) {}
102103
rpc GetFeature (GetFeatureRequest) returns (Feature) {}
104+
105+
// Expedia Search RPCs
106+
rpc ExpediaSearchProjects (ExpediaSearchProjectsRequest) returns (ExpediaSearchProjectsResponse) {}
107+
rpc ExpediaSearchFeatureViews (ExpediaSearchFeatureViewsRequest) returns (ExpediaSearchFeatureViewsResponse) {}
103108
}
104109

105110
// Common pagination and sorting messages
@@ -590,3 +595,44 @@ message GetFeatureRequest {
590595
string name = 3;
591596
bool allow_cache = 4;
592597
}
598+
599+
// Expedia Search
600+
601+
message ExpediaProjectAndRelatedFeatureViews {
602+
feast.core.Project project = 1;
603+
repeated feast.core.FeatureView feature_views = 2;
604+
}
605+
606+
message ExpediaSearchFeatureViewsRequest {
607+
string search_text = 1;
608+
/*
609+
Using google.protobuf.BoolValue here because we need tri-state (true/false/not set),
610+
which is not possible with a simple bool.
611+
*/
612+
google.protobuf.BoolValue online = 2;
613+
string application = 3;
614+
string team = 4;
615+
google.protobuf.Timestamp created_at = 5;
616+
google.protobuf.Timestamp updated_at = 6;
617+
int32 page_size = 7;
618+
int32 page_index = 8;
619+
}
620+
621+
message ExpediaSearchFeatureViewsResponse {
622+
repeated feast.core.FeatureView feature_views = 1;
623+
int32 total_feature_views = 2;
624+
int32 total_page_indices = 3;
625+
}
626+
627+
message ExpediaSearchProjectsRequest {
628+
string search_text = 1;
629+
google.protobuf.Timestamp updated_at = 2;
630+
int32 page_size = 3;
631+
int32 page_index = 4;
632+
}
633+
634+
message ExpediaSearchProjectsResponse {
635+
repeated ExpediaProjectAndRelatedFeatureViews projects_and_related_feature_views = 1;
636+
int32 total_projects = 2;
637+
int32 total_page_indices = 3;
638+
}

0 commit comments

Comments
 (0)