-
Notifications
You must be signed in to change notification settings - Fork 134
Fix cdc_order_by with multiple fields. #6346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,11 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; | |
| use dbsp::circuit::tokio::TOKIO; | ||
| use deltalake::datafusion::dataframe::DataFrame; | ||
| use deltalake::datafusion::execution::context::SQLOptions; | ||
| use deltalake::datafusion::logical_expr::SortExpr; | ||
| use deltalake::datafusion::prelude::SessionContext; | ||
| use deltalake::datafusion::sql::sqlparser::dialect::GenericDialect; | ||
| use deltalake::datafusion::sql::sqlparser::parser::Parser; | ||
| use deltalake::datafusion::sql::sqlparser::tokenizer::Token; | ||
| use deltalake::kernel::Action; | ||
| use deltalake::logstore::{self, IORuntime}; | ||
| use deltalake::table::builder::ensure_table_uri; | ||
|
|
@@ -28,7 +32,8 @@ use feldera_adapterlib::metrics::{ConnectorMetrics, ValueType}; | |
| use feldera_adapterlib::transport::{InputQueueEntry, Resume, Watermark, parse_resume_info}; | ||
| use feldera_adapterlib::utils::datafusion::{ | ||
| array_to_string, create_session_context_with, execute_query_collect, execute_singleton_query, | ||
| timestamp_to_sql_expression, validate_sql_expression, validate_timestamp_column, | ||
| timestamp_to_sql_expression, validate_sql_expression, validate_sql_order_by, | ||
| validate_timestamp_column, | ||
| }; | ||
| use feldera_storage::tokio::TOKIO_DEDICATED_IO; | ||
| use feldera_types::adapter_stats::ConnectorHealth; | ||
|
|
@@ -199,14 +204,49 @@ pub(super) fn build_cdc_dataframe( | |
| } | ||
| }; | ||
|
|
||
| let order_by_expr = result_df | ||
| .parse_sql_expr(order_by) | ||
| .map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?; | ||
| result_df.sort_by(vec![order_by_expr]).map_err(|e| { | ||
| let sort_exprs = parse_cdc_order_by(&result_df, order_by)?; | ||
| result_df.sort(sort_exprs).map_err(|e| { | ||
| anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'cdc_order_by': {e}") | ||
| }) | ||
| } | ||
|
|
||
| /// Parse the `cdc_order_by` clause into DataFusion sort expressions resolved | ||
| /// against df's schema. | ||
| /// | ||
| /// `cdc_order_by` is the body of an ORDER BY clause: a comma-separated list | ||
| /// of keys, each optionally annotated with ASC / DESC and NULLS | ||
| /// FIRST / NULLS LAST. | ||
| pub(super) fn parse_cdc_order_by(df: &DataFrame, order_by: &str) -> AnyResult<Vec<SortExpr>> { | ||
| let mut parser = Parser::new(&GenericDialect) | ||
| .try_with_sql(order_by) | ||
| .map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?; | ||
| let order_exprs = parser | ||
| .parse_comma_separated(Parser::parse_order_by_expr) | ||
| .map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?; | ||
| if parser.peek_token().token != Token::EOF { | ||
| bail!( | ||
| "invalid 'cdc_order_by' expression '{order_by}': unexpected trailing input near '{}'", | ||
| parser.peek_token() | ||
| ); | ||
| } | ||
|
|
||
| order_exprs | ||
| .into_iter() | ||
| .map(|order_expr| { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| let key = df | ||
| .parse_sql_expr(&order_expr.expr.to_string()) | ||
| .map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?; | ||
| // Match DataFusion's SQL planner defaults: a missing direction is | ||
| // ASC, and missing NULLS follows nulls_max (last for ASC). | ||
| // - https://datafusion.apache.org/user-guide/sql/select.html#order-by-clause | ||
| // - https://datafusion.apache.org/user-guide/configs.html | ||
| let asc = order_expr.options.asc.unwrap_or(true); | ||
| let nulls_first = order_expr.options.nulls_first.unwrap_or(!asc); | ||
| Ok(key.sort(asc, nulls_first)) | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Integrated input connector that reads from a delta table. | ||
| pub struct DeltaTableInputEndpoint { | ||
| endpoint_name: String, | ||
|
|
@@ -1985,7 +2025,7 @@ impl DeltaTableInputEndpointInner { | |
| /// Validate the cdc_order_by expression. | ||
| fn validate_cdc_order_by(&self) -> Result<(), ControllerError> { | ||
| if let Some(order_by) = &self.config.cdc_order_by { | ||
| validate_sql_expression(order_by).map_err(|e| { | ||
| validate_sql_order_by(order_by).map_err(|e| { | ||
| ControllerError::invalid_transport_configuration( | ||
| &self.endpoint_name, | ||
| &format!("error parsing 'cdc_order_by' expression '{order_by}': {e}"), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate_sql_order_bylives inadapterlib::utils::datafusionandparse_cdc_order_byre-implements the same prefix here (Parser::new(GenericDialect).try_with_sql + parse_comma_separated + EOF check), just with deltalake'ssqlparserre-export. They have to stay in lock-step forvalidate_cdc_order_byto actually pre-validate whatbuild_cdc_dataframewill accept. Worth extracting the&str -> Vec<OrderByExpr>step into a shared helper inadapterlibso the two sites can never drift.