Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions crates/adapterlib/src/utils/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion::logical_expr::sqlparser::parser::ParserError;
use datafusion::prelude::{SQLOptions, SessionConfig, SessionContext};
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::sqlparser::tokenizer::Token;
use feldera_types::config::PipelineConfig;
use feldera_types::constants::DATAFUSION_TEMP_DIR;
use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
Expand Down Expand Up @@ -294,6 +295,20 @@ pub fn validate_sql_expression(expr: &str) -> Result<(), ParserError> {
Ok(())
}

/// Validate the body of an ORDER BY clause (e.g. "ts asc, lsn desc").
///
/// Unlike [`validate_sql_expression`], this accepts the comma-separated,
/// ASC/DESC/NULLS annotated key list a real ORDER BY allows, and
/// requires the whole string to parse so a malformed clause fails here rather
/// than silently dropping every key after the first.
pub fn validate_sql_order_by(order_by: &str) -> Result<(), ParserError> {
let mut parser = Parser::new(&GenericDialect).try_with_sql(order_by)?;
parser.parse_comma_separated(Parser::parse_order_by_expr)?;
parser.expect_token(&Token::EOF)?;

Ok(())
}

/// Convert a value of the timestamp column returned by a SQL query into a valid
/// SQL expression.
pub fn timestamp_to_sql_expression(column_type: &ColumnType, expr: &str) -> String {
Expand Down Expand Up @@ -664,4 +679,106 @@ mod tests {
assert_eq!(min_pool_mb_for_adhoc_sort(2), 135);
assert_eq!(min_pool_mb_for_adhoc_sort(8), 537);
}

/// Make sure random shapes for `filter`, `cdc_delete_filter`, and `cdc_order_by`
/// are parsed correctly by our connector.
#[test]
fn cdc_connector_expr_shapes_validate() {
use super::{validate_sql_expression, validate_sql_order_by};

// Set as `filter` or `cdc_delete_filter`; validated as a scalar predicate.
const FILTER_SHAPES: &[&str] = &[
"0=0",
"0=0 AND (a = 's0' AND b = 's1')",
"0=0 AND (a = 's0')",
"0=0 AND (a IN ('s0'))",
"0=0 AND (a IN ('s0','s1'))",
"0=0 AND (a IN (1,2) OR a IS NULL)",
"0=0 AND (a IN (1,2) OR a IS NULL) AND (b = false)",
"0=0 AND (a IN (1,2) OR a IS NULL) AND (b IN ('s0'))",
"0=0 AND (a IN (1,2) OR a IS NULL) AND (b IN ('s0','s1'))",
"0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NOT NULL)",
"0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NULL AND c IS NULL)",
"0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NULL)",
"0=0 AND (a IN (1,2) OR a IS NULL) AND (b NOT IN ('s0','s1') AND c IS NOT NULL)",
"0=0 AND (a IN('s0','s1'))",
"0=0 AND (a IS NOT NULL AND b IS NOT NULL)",
"0=0 AND a = false",
"0=0 AND a = false AND (b = 's0' AND c = 's1')",
"0=0 AND a = false AND (b = 's0')",
"0=0 AND a = false AND (b IN ('s0'))",
"0=0 AND a = false AND (b IN ('s0','s1'))",
"0=0 AND a = false AND (b IN (1,2) OR b IS NULL)",
"0=0 AND a = false AND (b IN (1,2) OR b IS NULL) AND (c = false)",
"0=0 AND a = false AND (b IN (1,2) OR b IS NULL) AND (c IS NOT NULL)",
"0=0 AND a = false AND (b IS NOT NULL AND c IS NOT NULL)",
"0=0 AND a = false AND b is null",
"0=0 AND a = false AND b is null AND (c = 's0')",
"0=0 AND a = false AND b is null AND (c IN ('s0','s1'))",
"0=0 AND a = false AND b is null AND (c IN (1,2) OR c IS NULL)",
"0=0 AND a = false AND b is null AND (c IN (1,2) OR c IS NULL) AND (d NOT IN ('s0','s1') AND e IS NOT NULL)",
"a > 0",
"a >= 0 AND a <= 9",
"a <> 's0'",
"a != 's0'",
"a BETWEEN 0 AND 9",
"a LIKE 's0'",
"a IS NULL OR b IS NOT NULL",
"NOT (a = false)",
"lower(a) = 's0'",
"cast(a AS bigint) = 0",
"a + b > 0",
"coalesce(a, b) = 's0'",
"a > timestamp '2020-01-02 03:04:05'",
"a = 's0''s1'",
];
const CDC_DELETE_FILTER_SHAPES: &[&str] = &[
"a = true",
"a = true OR b is not null",
"a = true AND b = false",
"a IN ('s0','s1')",
"a IS NOT NULL",
"NOT a",
];
const CDC_ORDER_BY_SHAPES: &[&str] = &[
"a",
"a, b",
"a asc, b asc",
"a ASC",
"a desc",
"a ASC, b DESC",
"a NULLS FIRST",
"a ASC NULLS LAST",
"a DESC NULLS FIRST",
"a asc nulls last, b desc nulls first",
"a asc, b desc, c asc nulls last",
"a + b asc",
"a % 2 asc, b desc",
"lower(a) asc",
"abs(a) desc, b asc",
"cast(a AS bigint) asc",
"coalesce(a, b) asc, c desc",
"case when a then 0 else 1 end desc",
// Quoted identifier containing a space.
"\"a b\" asc",
];

let mut failures = Vec::new();
for expr in FILTER_SHAPES.iter().chain(CDC_DELETE_FILTER_SHAPES) {
if let Err(e) = validate_sql_expression(expr) {
failures.push(format!("predicate '{expr}' failed: {e}"));
}
}
for order_by in CDC_ORDER_BY_SHAPES {
if let Err(e) = validate_sql_order_by(order_by) {
failures.push(format!("cdc_order_by '{order_by}' failed: {e}"));
}
}

assert!(
failures.is_empty(),
"validation failures:\n{}",
failures.join("\n")
);
}
}
52 changes: 46 additions & 6 deletions crates/adapters/src/integrated/delta_table/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use dbsp::circuit::tokio::TOKIO;
use deltalake::datafusion::dataframe::DataFrame;
use deltalake::datafusion::execution::context::SQLOptions;
use deltalake::datafusion::logical_expr::SortExpr;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::datafusion::sql::sqlparser::dialect::GenericDialect;
use deltalake::datafusion::sql::sqlparser::parser::Parser;
use deltalake::datafusion::sql::sqlparser::tokenizer::Token;
use deltalake::kernel::Action;
use deltalake::logstore::{self, IORuntime};
use deltalake::table::builder::ensure_table_uri;
Expand All @@ -28,7 +32,8 @@ use feldera_adapterlib::metrics::{ConnectorMetrics, ValueType};
use feldera_adapterlib::transport::{InputQueueEntry, Resume, Watermark, parse_resume_info};
use feldera_adapterlib::utils::datafusion::{
array_to_string, create_session_context_with, execute_query_collect, execute_singleton_query,
timestamp_to_sql_expression, validate_sql_expression, validate_timestamp_column,
timestamp_to_sql_expression, validate_sql_expression, validate_sql_order_by,
validate_timestamp_column,
};
use feldera_storage::tokio::TOKIO_DEDICATED_IO;
use feldera_types::adapter_stats::ConnectorHealth;
Expand Down Expand Up @@ -199,14 +204,49 @@ pub(super) fn build_cdc_dataframe(
}
};

let order_by_expr = result_df
.parse_sql_expr(order_by)
.map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?;
result_df.sort_by(vec![order_by_expr]).map_err(|e| {
let sort_exprs = parse_cdc_order_by(&result_df, order_by)?;
result_df.sort(sort_exprs).map_err(|e| {
anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'cdc_order_by': {e}")
})
}

/// Parse the `cdc_order_by` clause into DataFusion sort expressions resolved
/// against df's schema.
///
/// `cdc_order_by` is the body of an ORDER BY clause: a comma-separated list
/// of keys, each optionally annotated with ASC / DESC and NULLS

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate_sql_order_by lives in adapterlib::utils::datafusion and parse_cdc_order_by re-implements the same prefix here (Parser::new(GenericDialect).try_with_sql + parse_comma_separated + EOF check), just with deltalake's sqlparser re-export. They have to stay in lock-step for validate_cdc_order_by to actually pre-validate what build_cdc_dataframe will accept. Worth extracting the &str -> Vec<OrderByExpr> step into a shared helper in adapterlib so the two sites can never drift.

/// FIRST / NULLS LAST.
pub(super) fn parse_cdc_order_by(df: &DataFrame, order_by: &str) -> AnyResult<Vec<SortExpr>> {
let mut parser = Parser::new(&GenericDialect)
.try_with_sql(order_by)
.map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?;
let order_exprs = parser
.parse_comma_separated(Parser::parse_order_by_expr)
.map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?;
if parser.peek_token().token != Token::EOF {
bail!(
"invalid 'cdc_order_by' expression '{order_by}': unexpected trailing input near '{}'",
parser.peek_token()
);
}

order_exprs
.into_iter()
.map(|order_expr| {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

df.parse_sql_expr(&order_expr.expr.to_string()) round-trips the key expression through a string render of sqlparser's AST and reparses it via DataFusion. That works for everything in the test ladder, but it is a fragile contract — anything that formats ambiguously (quoting rules across dialects, function-call serialization, etc.) can silently change semantics between the two parses. Since you already have an order_expr.expr: sqlparser::ast::Expr, the more direct path is to use df.parse_sql_expr only once on the whole clause (e.g. via a tiny helper that hands the sub-Expr to DataFusion's SQL-to-logical-expr translator), or to push the per-key reparse far enough to fail loudly if the round trip drops anything. Not a blocker, but worth a TODO so future-you doesn't have to discover this the hard way.

let key = df
.parse_sql_expr(&order_expr.expr.to_string())
.map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?;
// Match DataFusion's SQL planner defaults: a missing direction is
// ASC, and missing NULLS follows nulls_max (last for ASC).
// - https://datafusion.apache.org/user-guide/sql/select.html#order-by-clause
// - https://datafusion.apache.org/user-guide/configs.html
let asc = order_expr.options.asc.unwrap_or(true);
let nulls_first = order_expr.options.nulls_first.unwrap_or(!asc);
Ok(key.sort(asc, nulls_first))
})
.collect()
}

/// Integrated input connector that reads from a delta table.
pub struct DeltaTableInputEndpoint {
endpoint_name: String,
Expand Down Expand Up @@ -1985,7 +2025,7 @@ impl DeltaTableInputEndpointInner {
/// Validate the cdc_order_by expression.
fn validate_cdc_order_by(&self) -> Result<(), ControllerError> {
if let Some(order_by) = &self.config.cdc_order_by {
validate_sql_expression(order_by).map_err(|e| {
validate_sql_order_by(order_by).map_err(|e| {
ControllerError::invalid_transport_configuration(
&self.endpoint_name,
&format!("error parsing 'cdc_order_by' expression '{order_by}': {e}"),
Expand Down
Loading
Loading