Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix ODFV decorators in Java
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Apr 12, 2022
commit 7a1a630e5bff998fa0afe4d23e435c81d7e9bc1e
21 changes: 12 additions & 9 deletions examples/java-demo/feature_repo/driver_repo.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import pandas as pd
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.data_source import RequestSource
from feast.field import Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.request_feature_view import RequestFeatureView
from feast.types import Float32, Float64, Int64, String
from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, FileSource, ValueType

driver_hourly_stats = FileSource(
path="data/driver_stats_with_string.parquet",
timestamp_field="event_timestamp",
Expand All @@ -15,11 +18,11 @@
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400000),
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
Feature(name="string_feature", dtype=ValueType.STRING),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="string_feature", dtype=String),
],
online=True,
batch_source=driver_hourly_stats,
Expand All @@ -40,9 +43,9 @@
"driver_hourly_stats": driver_hourly_stats_view,
"vals_to_add": input_request,
},
features=[
Feature(name="conv_rate_plus_val1", dtype=ValueType.DOUBLE),
Feature(name="conv_rate_plus_val2", dtype=ValueType.DOUBLE),
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import pandas as pd

from google.protobuf.duration_pb2 import Duration

from feast.value_type import ValueType
from feast.feature import Feature
from feast.feature_view import FeatureView
from feast.data_source import RequestSource
from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.data_source import RequestSource
from feast.feature_view import FeatureView
from feast.field import Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast import FileSource
from feast.types import Float32, Float64, Int64
from feast.value_type import ValueType
from google.protobuf.duration_pb2 import Duration

from feast import FileSource

file_path = "driver_stats.parquet"
driver_hourly_stats = FileSource(
Expand All @@ -30,10 +29,10 @@
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400 * 7),
features=[
Feature(name="conv_rate", dtype=ValueType.DOUBLE),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
schema=[
Field(name="conv_rate", dtype=Float64),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
batch_source=driver_hourly_stats,
Expand All @@ -43,56 +42,45 @@

input_request = RequestSource(
name="vals_to_add",
schema={
"val_to_add": ValueType.INT64,
"val_to_add_2": ValueType.INT64
}
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
)


@on_demand_feature_view(
sources={
'driver_hourly_stats': driver_hourly_stats_view,
'vals_to_add': input_request
},
features=[
Feature(name='conv_rate_plus_val1', dtype=ValueType.DOUBLE),
Feature(name='conv_rate_plus_val2', dtype=ValueType.DOUBLE)
]
sources={
"driver_hourly_stats": driver_hourly_stats_view,
"vals_to_add": input_request,
},
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df['conv_rate_plus_val1'] = (features_df['conv_rate'] + features_df['val_to_add'])
df['conv_rate_plus_val2'] = (features_df['conv_rate'] + features_df['val_to_add_2'])
df["conv_rate_plus_val1"] = features_df["conv_rate"] + features_df["val_to_add"]
df["conv_rate_plus_val2"] = features_df["conv_rate"] + features_df["val_to_add_2"]
return df


generated_data_source = FileSource(
path="benchmark_data.parquet",
timestamp_field="event_timestamp",
path="benchmark_data.parquet", timestamp_field="event_timestamp",
)

entity = Entity(
name="entity",
value_type=ValueType.STRING,
)
entity = Entity(name="entity", value_type=ValueType.STRING,)

benchmark_feature_views = [
FeatureView(
name=f"feature_view_{i}",
entities=["entity"],
ttl=Duration(seconds=86400),
features=[
Feature(name=f"feature_{10 * i + j}", dtype=ValueType.INT64)
for j in range(10)
],
schema=[Field(name=f"feature_{10 * i + j}", dtype=Int64) for j in range(10)],
online=True,
batch_source=generated_data_source,
)
for i in range(25)
]

benchmark_feature_service = FeatureService(
name=f"benchmark_feature_service",
features=benchmark_feature_views,
name=f"benchmark_feature_service", features=benchmark_feature_views,
)
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import pandas as pd
import numpy as np

from datetime import datetime, timedelta
from feast import FeatureStore

from definitions import driver_hourly_stats_view, driver, entity,\
benchmark_feature_service, benchmark_feature_views, transformed_conv_rate
import numpy as np
import pandas as pd
from definitions import (
benchmark_feature_service,
benchmark_feature_views,
driver,
driver_hourly_stats_view,
entity,
transformed_conv_rate,
)

from feast import FeatureStore

print("Running materialize.py")

Expand All @@ -21,7 +26,9 @@
df["avg_daily_trips"] = np.arange(0, 1000, 100)

# some of rows are beyond 7 days to test OUTSIDE_MAX_AGE status
df["event_timestamp"] = start + pd.Series(np.arange(0, 10)).map(lambda days: timedelta(days=days))
df["event_timestamp"] = start + pd.Series(np.arange(0, 10)).map(
lambda days: timedelta(days=days)
)

# Store data in parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
Expand All @@ -41,21 +48,27 @@ def generate_data(num_rows: int, num_features: int, destination: str) -> pd.Data
for column in features:
df[column] = np.random.randint(1, num_rows, num_rows)

df["entity"] = "key-" + \
pd.Series(np.arange(1, num_rows + 1)).astype(pd.StringDtype())
df["entity"] = "key-" + pd.Series(np.arange(1, num_rows + 1)).astype(
pd.StringDtype()
)

df.to_parquet(destination)


generate_data(10**3, 250, "benchmark_data.parquet")
generate_data(10 ** 3, 250, "benchmark_data.parquet")


fs = FeatureStore(".")
fs.apply([driver_hourly_stats_view,
transformed_conv_rate,
driver,
entity, benchmark_feature_service,
*benchmark_feature_views])
fs.apply(
[
driver_hourly_stats_view,
transformed_conv_rate,
driver,
entity,
benchmark_feature_service,
*benchmark_feature_views,
]
)

now = datetime.now()
fs.materialize(start, now)
Expand Down