Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/compute_engines/spark/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from pyspark.sql import SparkSession

from feast.infra.common.serde import SerializedArtifacts
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
from feast.utils import (
_convert_arrow_to_proto,
_run_pyarrow_field_mapping,
get_user_agent,
)

try:
import boto3
Expand Down Expand Up @@ -88,6 +92,7 @@ def _ensure_s3a_event_log_dir(spark_config: Dict[str, str]) -> None:
config=BotoConfig(
signature_version="s3v4",
s3={"addressing_style": addressing_style},
user_agent_extra=get_user_agent(),
),
)
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
Expand Down
9 changes: 8 additions & 1 deletion sdk/python/feast/infra/offline_stores/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,15 @@ def point_in_time_join(

def list_s3_files(path: str, endpoint_url: str) -> List[str]:
import boto3
from botocore.config import Config

s3 = boto3.client("s3", endpoint_url=endpoint_url)
from feast.utils import get_user_agent

s3 = boto3.client(
"s3",
endpoint_url=endpoint_url,
config=Config(user_agent_extra=get_user_agent()),
)
if path.startswith("s3://"):
path = path[len("s3://") :]
bucket, prefix = path.split("/", 1)
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/feast/infra/registry/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.utils import _utc_now
from feast.utils import _utc_now, get_user_agent

try:
import boto3
from botocore.config import Config
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

Expand All @@ -28,8 +29,11 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
self._key = self._uri.path.lstrip("/")
self._boto_extra_args = registry_config.s3_additional_kwargs or {}

# FEAST_S3_ENDPOINT_URL may point at Amazon S3 or an S3-compatible endpoint.
self.s3_client = boto3.resource(
"s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL")
"s3",
endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL"),
config=Config(user_agent_extra=get_user_agent()),
)

def get_registry_proto(self):
Expand Down
10 changes: 8 additions & 2 deletions sdk/python/feast/infra/utils/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ def get_s3_resource(aws_region: str):
"""
Get the S3 resource for the given AWS region.
"""
return boto3.resource("s3", config=Config(region_name=aws_region))
return boto3.resource(
"s3",
config=Config(region_name=aws_region, user_agent_extra=get_user_agent()),
)


def get_bucket_and_key(s3_path: str) -> Tuple[str, str]:
Expand Down Expand Up @@ -716,7 +719,10 @@ def get_account_id() -> str:


def list_s3_files(aws_region: str, path: str) -> List[str]:
s3 = boto3.client("s3", config=Config(region_name=aws_region))
s3 = boto3.client(
"s3",
config=Config(region_name=aws_region, user_agent_extra=get_user_agent()),
)
if path.startswith("s3://"):
path = path[len("s3://") :]
bucket, prefix = path.split("/", 1)
Expand Down