Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor nyc_taxi example. TODO: update refs to the notebook
Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com>
  • Loading branch information
loomlike committed Oct 12, 2022
commit bb76c4365aca59fe45e5caa9de6e39bca8a29bc8
1,248 changes: 1,248 additions & 0 deletions docs/samples/nyc_taxi_demo.ipynb

Large diffs are not rendered by default.

203 changes: 148 additions & 55 deletions feathr_project/feathr/utils/job_utils.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,170 @@
from feathr.client import FeathrClient
import os
import glob
from feathr.constants import OUTPUT_FORMAT
from loguru import logger
import pandas as pd
import os
import tempfile
from typing import Union
from warnings import warn

import pandas as pd
from pandas.errors import EmptyDataError
from pyspark.sql import DataFrame, SparkSession

from feathr.client import FeathrClient
from feathr.constants import OUTPUT_FORMAT


def get_result_pandas_df(
    client: FeathrClient,
    data_format: str = None,
    res_url: str = None,
    local_folder: str = None,
) -> pd.DataFrame:
    """Download the job result dataset from cloud as a Pandas DataFrame.

    Thin wrapper around `get_result_df` that never passes a Spark session, so the
    result is always read with pandas.

    Args:
        client: Feathr client
        data_format: Format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`.
            If not specified, the output format recorded in the job tags is used, and `avro` if that is missing too.
        res_url: Output URL to download files. Note that this will not block the job so you need to make sure
            the job is finished and result URL contains actual data.
        local_folder (Optional): Specify the absolute download path. If the user does not provide this, function
            will create a temporary directory and delete it after reading the dataframe.

    Returns:
        pandas DataFrame
    """
    return get_result_df(
        client,
        data_format=data_format,
        res_url=res_url,
        local_folder=local_folder,
    )


def get_result_spark_df(
    spark: SparkSession,
    client: FeathrClient,
    data_format: str = None,
    res_url: str = None,
    local_folder: str = None,
) -> DataFrame:
    """Download the job result dataset from cloud as a Spark DataFrame.

    Thin wrapper around `get_result_df` that always forwards the given Spark session,
    so the result is read through `spark.read` rather than pandas.

    Args:
        spark: Spark session
        client: Feathr client
        data_format: Format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`.
            If not specified, the output format recorded in the job tags is used, and `avro` if that is missing too.
        res_url: Output URL to download files. Note that this will not block the job so you need to make sure
            the job is finished and result URL contains actual data.
        local_folder (Optional): Specify the absolute download path. If the user does not provide this, function
            will create a temporary directory and delete it after reading the dataframe.

    Returns:
        Spark DataFrame
    """
    return get_result_df(
        client,
        data_format=data_format,
        res_url=res_url,
        local_folder=local_folder,
        spark=spark,
    )


def get_result_df(
    client: FeathrClient,
    data_format: str = None,
    res_url: str = None,
    local_folder: str = None,
    spark: SparkSession = None,
) -> Union[DataFrame, pd.DataFrame]:
    """Download the job result dataset from cloud as a Spark DataFrame or pandas DataFrame.

    Args:
        client: Feathr client
        data_format: Format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`.
            If not specified, the output format recorded in the job tags is used, and `avro` if that is missing too.
        res_url: Output URL to download files. Note that this will not block the job so you need to make sure
            the job is finished and result URL contains actual data. If omitted, the URI is requested from
            the client with `block=True` and a 1200 second timeout.
        local_folder (Optional): Specify the absolute download path. If the user does not provide this, function
            will create a temporary directory and delete it after reading the dataframe. Ignored (with a
            warning) when the client runs in local spark mode.
        spark (Optional): Spark session. If provided, the function returns spark Dataframe.
            Otherwise, it returns pd.DataFrame.

    Returns:
        Either Spark or pandas DataFrame.

    Raises:
        RuntimeError: If no result URL is given and the client does not provide one either.
    """
    # use a result url if it's provided by the user, otherwise use the one provided by the job
    res_url = res_url or client.get_job_result_uri(block=True, timeout_sec=1200)
    if res_url is None:
        raise RuntimeError(
            "res_url is None. Please make sure either you provide a res_url or make sure the job finished in FeathrClient has a valid result URI."
        )

    tmp_dir = None
    try:
        if client.spark_runtime == "local":
            # Local runs already wrote their output to local storage; nothing to download.
            local_dir_path = res_url
            if local_folder is not None:
                warn(
                    "In local spark mode, the result files are expected to be stored at a local storage and thus `local_folder` argument will be ignored."
                )
        else:
            # if local_folder params is not provided then create a temporary folder
            if local_folder is not None:
                local_dir_path = local_folder
            else:
                tmp_dir = tempfile.TemporaryDirectory()
                local_dir_path = tmp_dir.name
            client.feathr_spark_launcher.download_result(
                result_path=res_url, local_folder=local_dir_path
            )

        # use user provided format, if there isn't one, then otherwise use the one provided by the job;
        # if none of them is available, "avro" is the default format.
        data_format = data_format or client.get_job_tags().get(OUTPUT_FORMAT, "")
        if not data_format:
            data_format = "avro"

        if spark is not None:
            # NOTE(review): Spark reads are presumably lazy, while the temporary directory (if one was
            # created) is removed before this function returns -- confirm the returned DataFrame is
            # still usable when `local_folder` is not given.
            return spark.read.format(data_format).load(local_dir_path)

        return _read_files_to_pandas_df(dir_path=local_dir_path, data_format=data_format)
    finally:
        # Always remove the temporary download directory, even when the download or the read fails.
        if tmp_dir is not None:
            tmp_dir.cleanup()


def _read_files_to_pandas_df(dir_path: str, data_format: str = "avro") -> pd.DataFrame:
    """Read all result files of one format under a local directory into a single pandas DataFrame.

    Args:
        dir_path: Local directory that holds the result files (or the Delta table).
        data_format: One of `parquet`, `delta`, `avro`, or `csv`, matched case-insensitively.
            Falls back to `avro` when empty.

    Returns:
        pandas DataFrame with the content of all matching files. For `avro` and `csv` the rows of the
        individual files are concatenated and the index is reset.

    Raises:
        ValueError: If the format is not supported, or if no data could be read for `avro` or `csv`.
    """
    # Match the format name case-insensitively, as the pre-refactor reader did.
    normalized_format = (data_format or "avro").casefold()

    if normalized_format == "parquet":
        from pyarrow.parquet import ParquetDataset

        # Sort so the row order does not depend on the filesystem's listing order.
        files = sorted(glob.glob(os.path.join(dir_path, "*.parquet")))
        ds = ParquetDataset(files)
        return ds.read().to_pandas()

    elif normalized_format == "delta":
        from deltalake import DeltaTable

        delta = DeltaTable(dir_path)
        # NOTE: the previous revision returned an empty DataFrame for the `azure_synapse` runtime instead of
        # reading the Delta table, because of https://github.com/delta-io/delta-rs/issues/582.
        # This helper has no access to the client, so no such check is done here.
        # TODO -- Proper warning messages. Is this applied for all the other formats?
        return delta.to_pyarrow_table().to_pandas()

    elif normalized_format == "avro":
        import pandavro as pdx

        dataframe_list = [
            pdx.read_avro(file) for file in sorted(glob.glob(os.path.join(dir_path, "*.avro")))
        ]
        if not dataframe_list:
            # pd.concat would raise an opaque "No objects to concatenate" ValueError here.
            raise ValueError(f"No avro files found in {dir_path}.")
        # Reset index to avoid duplicated indices
        return pd.concat(dataframe_list, axis=0).reset_index(drop=True)

    elif normalized_format == "csv":
        dataframe_list = []
        for file in sorted(glob.glob(os.path.join(dir_path, "*.csv"))):
            try:
                dataframe_list.append(pd.read_csv(file, index_col=None, header=None))
            except EmptyDataError:
                # in case there are empty files
                pass

        if not dataframe_list:
            raise ValueError(f"Empty files in {dir_path}.")
        # Reset index to avoid duplicated indices
        return pd.concat(dataframe_list, axis=0).reset_index(drop=True)

    else:
        raise ValueError(
            f"{data_format} is currently not supported in get_result_df. Currently only parquet, delta, avro, and csv are supported, please consider writing a customized function to read the result."
        )
125 changes: 0 additions & 125 deletions feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml

This file was deleted.

This file was deleted.

Loading