-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmain.go
More file actions
103 lines (91 loc) · 2.8 KB
/
Copy pathmain.go
File metadata and controls
103 lines (91 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main
import (
"context"
feast "github.com/feast-dev/feast/sdk/go"
"github.com/feast-dev/feast/sdk/go/protos/feast/serving"
"github.com/feast-dev/feast/sdk/go/protos/feast/types"
"github.com/kelseyhightower/envconfig"
"log"
"net/http"
"strconv"
"time"
)
// Config holds the runtime settings of the load-test proxy. main fills it in
// through envconfig.Process; the `default` struct tags supply the value used
// when nothing overrides a field.
type Config struct {
	// FeastServingHost is the host passed to the Feast Serving gRPC client.
	FeastServingHost string `default:"localhost"`
	// FeastServingPort is the port passed to the Feast Serving gRPC client.
	FeastServingPort int `default:"6566"`
	// ListenPort is the TCP port this proxy's own HTTP server listens on.
	ListenPort string `default:"8080"`
}
// main loads the Config via envconfig (prefix "LOAD"), creates a Feast Serving
// gRPC client and serves two HTTP endpoints on Config.ListenPort:
//
//   - /send?entity_count=N issues one GetOnlineFeatures call built by
//     buildRequest(N) and checks that "float_feature" of the first returned
//     row equals the hardcoded value 0.1.
//   - /echo issues one GetFeastServingInfo call.
//
// Both endpoints answer 200 on success. Start-up failures (configuration,
// client creation, listener) terminate the process. Per-request failures are
// logged and reported to the caller as an HTTP error status, so a single bad
// request or failed upstream call does not take the whole proxy down.
func main() {
	var c Config
	if err := envconfig.Process("LOAD", &c); err != nil {
		log.Fatalf("could not load configuration: %v", err)
	}

	log.Printf("Creating client to connect to Feast Serving at %s:%d", c.FeastServingHost, c.FeastServingPort)
	client, err := feast.NewGrpcClient(c.FeastServingHost, c.FeastServingPort)
	if err != nil {
		log.Fatalf("Could not connect to Feast Serving at %s:%d: %v", c.FeastServingHost, c.FeastServingPort, err)
	}

	http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
		entityCountParam := r.URL.Query().Get("entity_count")
		if entityCountParam == "" {
			http.Error(w, "Url parameter 'entity_count' is missing. Please specify the entity count in order to generate the appropriate load", http.StatusBadRequest)
			return
		}
		// The first returned row is inspected below, so the request must ask
		// for at least one entity; non-numeric input is rejected as well.
		entityCount, err := strconv.Atoi(entityCountParam)
		if err != nil || entityCount < 1 {
			http.Error(w, "Url parameter 'entity_count' must be a positive integer", http.StatusBadRequest)
			return
		}

		request := buildRequest(entityCount)
		// Derive from the request context so the upstream call is abandoned
		// when the HTTP client goes away, and bound it to one second.
		ctx, cancel := context.WithTimeout(r.Context(), time.Second)
		defer cancel()

		values, err := client.GetOnlineFeatures(ctx, &request)
		if err != nil {
			log.Printf("GetOnlineFeatures failed: %v", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if values == nil || values.RawResponse == nil || len(values.RawResponse.FieldValues) == 0 {
			log.Print("GetOnlineFeatures returned no field values")
			http.Error(w, "Feast Serving returned no field values", http.StatusInternalServerError)
			return
		}
		if values.RawResponse.FieldValues[0].Fields["float_feature"].GetFloatVal() != 0.1 {
			const msg = "Hardcoded float value of 0.1 was not found in response for feature \"float_feature\", please make sure the correct values have been ingested."
			log.Print(msg)
			http.Error(w, msg, http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	http.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), time.Second)
		defer cancel()

		var req serving.GetFeastServingInfoRequest
		if _, err := client.GetFeastServingInfo(ctx, &req); err != nil {
			log.Printf("GetFeastServingInfo failed: %v", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	log.Printf("Starting server on port %s\n", c.ListenPort)
	// ListenAndServe only returns on failure; include the cause in the log.
	if err := http.ListenAndServe(":"+c.ListenPort, nil); err != nil {
		log.Fatalf("could not start server: %v", err)
	}
}
// buildRequest assembles the online-features request used to generate load.
//
// It creates exactly entityRowCount entity rows, each holding a single
// "user_id" entity whose int64 value is 1000, 1001, ... in row order, and asks
// for the fixed list of features below for every row. A zero or negative
// entityRowCount yields a request with no entity rows.
func buildRequest(entityRowCount int) feast.OnlineFeaturesRequest {
	var entityRows []feast.Row
	if entityRowCount > 0 {
		// The final length is known up front, so allocate once.
		entityRows = make([]feast.Row, 0, entityRowCount)
	}
	// Strictly less-than: entityRowCount is the number of rows to emit, not
	// the index of the last row.
	for i := 0; i < entityRowCount; i++ {
		row := map[string]*types.Value{
			"user_id": feast.Int64Val(int64(1000 + i)),
		}
		entityRows = append(entityRows, row)
	}

	return feast.OnlineFeaturesRequest{
		Features: []string{
			"int32_feature",
			"int64_feature",
			"float_feature",
			"double_feature",
			"string_feature",
			"bytes_feature",
			"bool_feature",
			"int32_list_feature",
			"int64_list_feature",
			"float_list_feature",
			"double_list_feature",
			"string_list_feature",
			"bytes_list_feature",
		},
		Entities:     entityRows,
		OmitEntities: false,
	}
}