-
Notifications
You must be signed in to change notification settings - Fork 245
Refine example notebooks #756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
b4f09d7
Fix local spark output file-format bug
loomlike 283b7c8
Add dev dependencies. Add unit-test for local spark job launcher
loomlike d6c24bf
Fix local spark submission unused param error
loomlike 24aaaea
Merge branch 'main' into jumin/localspark_submission_fix
loomlike bb76c43
Refactor nyc_taxi example. TODO: update refs to the notebook
loomlike 8447f31
Add dataset utilities and notebook path refactor. TODO: update refere…
loomlike ad4942c
Add init.py to datasets module. Modify maybe_download to accept dir a…
loomlike d16d1e1
Merge branch 'main' into jumin/example_notebook
loomlike b72ff3b
Add notebook test
loomlike 9ed4b96
change notebook to use scrap flag and is_databricks
loomlike 656d50d
Fix databricks path
loomlike 06008ee
Fix unittest
loomlike 2a36d51
Modify databricks notebook. Fix dbfs path errors in utils.
loomlike 9db09df
Merge branch 'main' into jumin/example_notebook
loomlike 4c020dc
Address review comments
loomlike 3ecc70a
put the user_workspace feature python files back
loomlike db25051
Merge branch 'main' into jumin/example_notebook
loomlike bd9fdb3
Revive feathr_config.yaml
loomlike fe40c47
Merge branch 'main' into jumin/example_notebook
loomlike File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Refactor nyc_taxi example. TODO: update refs to the notebook
Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com>
- Loading branch information
commit bb76c4365aca59fe45e5caa9de6e39bca8a29bc8
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,77 +1,170 @@ | ||
| from feathr.client import FeathrClient | ||
| import os | ||
| import glob | ||
| from feathr.constants import OUTPUT_FORMAT | ||
| from loguru import logger | ||
| import pandas as pd | ||
| import os | ||
| import tempfile | ||
| from typing import Union | ||
| from warnings import warn | ||
|
|
||
| import pandas as pd | ||
| from pandas.errors import EmptyDataError | ||
| from pyspark.sql import DataFrame, SparkSession | ||
|
|
||
| from feathr.client import FeathrClient | ||
| from feathr.constants import OUTPUT_FORMAT | ||
|
|
||
|
|
||
| def get_result_pandas_df( | ||
| client: FeathrClient, | ||
| data_format: str = None, | ||
| res_url: str = None, | ||
| local_folder: str = None, | ||
| ) -> pd.DataFrame: | ||
| """Download the job result dataset from cloud as a Pandas DataFrame. | ||
|
|
||
| Args: | ||
| client: Feathr client | ||
| data_format: Format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`. Default to `avro` if not specified. | ||
| res_url: Output URL to download files. Note that this will not block the job so you need to make sure the job is finished and result URL contains actual data. | ||
| local_folder (Optional): Specify the absolute download path. if the user does not provide this, function will create a temporary directory and delete it after reading the dataframe. | ||
|
|
||
| def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, local_folder: str = None) -> pd.DataFrame: | ||
| """Download the job result dataset from cloud as a Pandas dataframe to make it easier for the client to read. | ||
| Returns: | ||
| pandas DataFrame | ||
| """ | ||
| return get_result_df(client, data_format, res_url, local_folder) | ||
|
|
||
|
|
||
| def get_result_spark_df( | ||
| spark: SparkSession, | ||
| client: FeathrClient, | ||
| data_format: str = None, | ||
| res_url: str = None, | ||
| local_folder: str = None, | ||
| ) -> DataFrame: | ||
| """Download the job result dataset from cloud as a Spark DataFrame. | ||
|
|
||
| format: format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`. Default to `avro` if not specified. | ||
| res_url: output URL to download files. Note that this will not block the job so you need to make sure the job is finished and result URL contains actual data. | ||
| local_folder: optional parameter to specify the absolute download path. if the user does not provide this, function will create a temporary directory and delete it after reading the dataframe. | ||
| Args: | ||
| spark: Spark session | ||
| client: Feathr client | ||
| data_format: Format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`. Default to `avro` if not specified. | ||
| res_url: Output URL to download files. Note that this will not block the job so you need to make sure the job is finished and result URL contains actual data. | ||
| local_folder (Optional): Specify the absolute download path. if the user does not provide this, function will create a temporary directory and delete it after reading the dataframe. | ||
|
|
||
| Returns: | ||
| Spark DataFrame | ||
| """ | ||
| # use a result url if it's provided by the user, otherwise use the one provided by the job | ||
| return get_result_df(client, data_format, res_url, local_folder, spark=spark) | ||
|
|
||
|
|
||
| def get_result_df( | ||
| client: FeathrClient, | ||
| data_format: str = None, | ||
| res_url: str = None, | ||
| local_folder: str = None, | ||
| spark: SparkSession = None, | ||
| ) -> Union[DataFrame, pd.DataFrame]: | ||
| """Download the job result dataset from cloud as a Spark DataFrame or pandas DataFrame. | ||
|
|
||
| Args: | ||
| client: Feathr client | ||
| data_format: Format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`. Default to `avro` if not specified. | ||
| res_url: Output URL to download files. Note that this will not block the job so you need to make sure the job is finished and result URL contains actual data. | ||
| local_folder (Optional): Specify the absolute download path. if the user does not provide this, function will create a temporary directory and delete it after reading the dataframe. | ||
| spark (Optional): Spark session. If provided, the function returns spark Dataframe. Otherwise, it returns pd.DataFrame. | ||
|
|
||
| Returns: | ||
| Either Spark or pandas DataFrame. | ||
| """ | ||
| # use a result url if it's provided by the user, otherwise use the one provided by the job | ||
| res_url: str = res_url or client.get_job_result_uri(block=True, timeout_sec=1200) | ||
| if res_url is None: | ||
| raise RuntimeError("res_url is None. Please make sure either you provide a res_url or make sure the job finished in FeathrClient has a valid result URI.") | ||
| raise RuntimeError( | ||
| "res_url is None. Please make sure either you provide a res_url or make sure the job finished in FeathrClient has a valid result URI." | ||
| ) | ||
|
|
||
| # use user provided format, if there isn't one, then otherwise use the one provided by the job; | ||
| tmp_dir = None | ||
|
|
||
| if client.spark_runtime == "local": | ||
| local_dir_path = res_url | ||
| if local_folder is not None: | ||
| warn( | ||
| "In local spark mode, the result files are expected to be stored at a local storage and thus `local_folder` argument will be ignored." | ||
| ) | ||
| else: | ||
| # if local_folder params is not provided then create a temporary folder | ||
| if local_folder is not None: | ||
| local_dir_path = local_folder | ||
| else: | ||
| tmp_dir = tempfile.TemporaryDirectory() | ||
| local_dir_path = tmp_dir.name | ||
| client.feathr_spark_launcher.download_result( | ||
| result_path=res_url, local_folder=local_dir_path | ||
| ) | ||
|
|
||
| # use user provided format, if there isn't one, then otherwise use the one provided by the job; | ||
| # if none of them is available, "avro" is the default format. | ||
| format: str = format or client.get_job_tags().get(OUTPUT_FORMAT, "") | ||
| if format is None or format == "": | ||
| format = "avro" | ||
| data_format: str = data_format or client.get_job_tags().get(OUTPUT_FORMAT, "") | ||
| if data_format is None or data_format == "": | ||
| data_format = "avro" | ||
|
|
||
| result_df = None | ||
|
|
||
| # if local_folder params is not provided then create a temporary folder | ||
| if local_folder is not None: | ||
| local_dir_path = local_folder | ||
| if spark is not None: | ||
| result_df = spark.read.format(data_format).load(local_dir_path) | ||
| else: | ||
| tmp_dir = tempfile.TemporaryDirectory() | ||
| local_dir_path = tmp_dir.name | ||
|
|
||
| client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=local_dir_path) | ||
| dataframe_list = [] | ||
| # by default the result are in avro format | ||
| if format.casefold()=="parquet": | ||
| files = glob.glob(os.path.join(local_dir_path, '*.parquet')) | ||
| result_df = _read_files_to_pandas_df(dir_path=local_dir_path, data_format=data_format) | ||
|
|
||
| if tmp_dir is not None: | ||
| tmp_dir.cleanup() | ||
|
|
||
| return result_df | ||
|
|
||
|
|
||
| def _read_files_to_pandas_df(dir_path: str, data_format: str = "avro") -> pd.DataFrame: | ||
|
|
||
| if data_format == "parquet": | ||
| from pyarrow.parquet import ParquetDataset | ||
|
|
||
| files = glob.glob(os.path.join(dir_path, "*.parquet")) | ||
| ds = ParquetDataset(files) | ||
| result_df = ds.read().to_pandas() | ||
| elif format.casefold()=="delta": | ||
| return ds.read().to_pandas() | ||
|
|
||
| elif data_format == "delta": | ||
| from deltalake import DeltaTable | ||
| delta = DeltaTable(local_dir_path) | ||
| if not client.spark_runtime == 'azure_synapse': | ||
| # don't detect for synapse result with Delta as there's a problem with underlying system | ||
| # Issues are tracked here: https://github.com/delta-io/delta-rs/issues/582 | ||
| result_df = delta.to_pyarrow_table().to_pandas() | ||
| else: | ||
| logger.info("Please use Azure Synapse to read the result in the Azure Synapse cluster. Reading local results is not supported for Azure Synapse. Empty DataFrame is returned.") | ||
| result_df = pd.DataFrame() | ||
| elif format.casefold()=="avro": | ||
|
|
||
| delta = DeltaTable(dir_path) | ||
| # if client.spark_runtime != "azure_synapse": | ||
| # don't detect for synapse result with Delta as there's a problem with underlying system | ||
| # Issues are tracked here: https://github.com/delta-io/delta-rs/issues/582 | ||
| return delta.to_pyarrow_table().to_pandas() | ||
| # else: | ||
| # TODO -- Proper warning messages. Is this applied for all the other formats? | ||
| # raise RuntimeError( | ||
| # "Please use Azure Synapse to read the result in the Azure Synapse cluster. Reading local results is not supported for Azure Synapse." | ||
| # ) | ||
|
|
||
| elif data_format == "avro": | ||
| import pandavro as pdx | ||
| for file in glob.glob(os.path.join(local_dir_path, '*.avro')): | ||
| dataframe_list.append(pdx.read_avro(file)) | ||
| result_df = pd.concat(dataframe_list, axis=0) | ||
| elif format.casefold()=="csv": | ||
| for file in glob.glob(os.path.join(local_dir_path, '*.csv')): | ||
|
|
||
| dataframe_list = [ | ||
| pdx.read_avro(file) for file in glob.glob(os.path.join(dir_path, "*.avro")) | ||
| ] | ||
| return pd.concat(dataframe_list, axis=0) | ||
|
|
||
| elif data_format == "csv": | ||
| dataframe_list = [] | ||
| for file in glob.glob(os.path.join(dir_path, "*.csv")): | ||
| try: | ||
| df = pd.read_csv(file, index_col=None, header=None) | ||
| dataframe_list.append(pd.read_csv(file, index_col=None, header=None)) | ||
| except EmptyDataError: | ||
| # in case there are empty files | ||
| df = pd.DataFrame() | ||
| dataframe_list.append(df) | ||
| result_df = pd.concat(dataframe_list, axis=0) | ||
| # Reset index to avoid duplicated indices | ||
| result_df.reset_index(drop=True) | ||
| else: | ||
| raise RuntimeError(f"{format} is currently not supported in get_result_df. Currently only parquet, delta, avro, and csv are supported, please consider writing a customized function to read the result.") | ||
| pass | ||
|
|
||
|
|
||
| if local_folder is None: | ||
| tmp_dir.cleanup() | ||
| return result_df | ||
| if dataframe_list: | ||
| # Reset index to avoid duplicated indices -- TODO don't we need reset_index when reading avro too? | ||
| return pd.concat(dataframe_list, axis=0).reset_index(drop=True) | ||
| else: | ||
| raise ValueError(f"Empty files in {dir_path}.") | ||
|
|
||
| else: | ||
| raise ValueError( | ||
| f"{data_format} is currently not supported in get_result_df. Currently only parquet, delta, avro, and csv are supported, please consider writing a customized function to read the result." | ||
| ) | ||
125 changes: 0 additions & 125 deletions
125
feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml
This file was deleted.
Oops, something went wrong.
33 changes: 0 additions & 33 deletions
33
feathr_project/feathrcli/data/feathr_user_workspace/features/agg_features.py
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.