Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ def __call__(self):
time: ddbtyp.TIME,
}
"""A mapping of Python types to the equivalent DuckDB types."""

# Default parse formats used when a date/time type does not declare its own
# format. The values are interpolated into a DuckDB `try_strptime(...)` call,
# so they must be strptime-style `%` specifiers (the same style as custom
# formats such as "%d-%m-%Y"). Placeholder text like "YYYY-MM-DD" would be
# treated as literal characters and never match real data.
DEFAULT_ISO_FORMATS: dict[type, str] = {
    date: "%Y-%m-%d",
    datetime: "%Y-%m-%dT%H:%M:%S",
    time: "%H:%M:%S",
}
"""Mapping of default ISO 8601 strptime formats to use when a date format is not supplied"""

def table_exists(connection: DuckDBPyConnection, table_name: str) -> bool:
"""check if a table exists in a given DuckDBPyConnection"""
Expand Down Expand Up @@ -378,9 +383,6 @@ def get_duckdb_cast_statement_from_annotation(
element_name: str,
type_annotation: Any,
parent_element: bool = True,
date_regex: str = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
timestamp_regex: str = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}((\+|\-)[0-9]{2}:[0-9]{2})?$", # pylint: disable=C0301
time_regex: str = r"^[0-9]{2}:[0-9]{2}:[0-9]{2}$",
) -> str:
"""Generate casting statements for duckdb relations from type annotations"""
type_origin = get_origin(type_annotation)
Expand All @@ -391,19 +393,19 @@ def get_duckdb_cast_statement_from_annotation(
if type_origin is Union:
python_type = _get_non_heterogenous_type(get_args(type_annotation))
return get_duckdb_cast_statement_from_annotation(
element_name, python_type, parent_element, date_regex, timestamp_regex
element_name, python_type, parent_element,
)

# Type hint is e.g. `List[str]`, check to ensure non-heterogenity.
if type_origin is list or (isinstance(type_origin, type) and issubclass(type_origin, list)):
element_type = _get_non_heterogenous_type(get_args(type_annotation))
stmt = f"list_transform({quoted_name}, x -> {get_duckdb_cast_statement_from_annotation('x',element_type, False, date_regex, timestamp_regex)})" # pylint: disable=C0301
stmt = f"list_transform({quoted_name}, x -> {get_duckdb_cast_statement_from_annotation('x',element_type, False,)})" # pylint: disable=C0301
return stmt if not parent_element else _cast_as_ddb_type(stmt, type_annotation)

if type_origin is Annotated:
python_type, *other_args = get_args(type_annotation) # pylint: disable=unused-variable
return get_duckdb_cast_statement_from_annotation(
element_name, python_type, parent_element, date_regex, timestamp_regex
element_name, python_type, parent_element,
) # add other expected params here
# Ensure that we have a concrete type at this point.
if not isinstance(type_annotation, type):
Expand All @@ -428,7 +430,7 @@ def get_duckdb_cast_statement_from_annotation(
continue

fields[field_name] = get_duckdb_cast_statement_from_annotation(
f"{element_name}.{field_name}", field_annotation, False, date_regex, timestamp_regex
f"{element_name}.{field_name}", field_annotation, False,
)

if not fields:
Expand All @@ -447,15 +449,21 @@ def get_duckdb_cast_statement_from_annotation(
raise ValueError(f"dict must be `typing.TypedDict` subclass, got {type_annotation!r}")

for type_ in type_annotation.mro():
_date_format = getattr(type_, "DATE_FORMAT", None)
if _date_format:
dt_cast_statement = f"try_strptime(TRIM({quoted_name}), '{_date_format}')"
else:
dt_cast_statement = f"try_strptime(TRIM({quoted_name}), '{DEFAULT_ISO_FORMATS.get(type_)}')"

# datetime is subclass of date, so needs to be handled first
if issubclass(type_, datetime):
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{timestamp_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIMESTAMP) ELSE NULL END" # pylint: disable=C0301
stmt = rf"TRY_CAST({dt_cast_statement} as TIMESTAMP)"
return stmt
if issubclass(type_, date):
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{date_regex}') THEN TRY_CAST(TRIM({quoted_name}) as DATE) ELSE NULL END" # pylint: disable=C0301
stmt = rf"TRY_CAST({dt_cast_statement} as DATE)"
return stmt
if issubclass(type_, time):
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{time_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIME) ELSE NULL END" # pylint: disable=C0301
stmt = rf"TRY_CAST({dt_cast_statement} as TIME)"
return stmt
duck_type = get_duckdb_type_from_annotation(type_)
if duck_type:
Expand Down
27 changes: 27 additions & 0 deletions tests/features/books.feature
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,30 @@ Feature: Pipeline tests using the books dataset
Then the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Then An error report is produced

Scenario: Validate complex nested XML data (spark)
Given I submit the books file nested_books.XML for processing
And A spark pipeline is configured with schema file 'nested_books.dischema.json'
And I add initial audit entries for the submission
Then the latest audit record for the submission is marked with processing status file_transformation
When I run the file transformation phase
Then the header entity is stored as a parquet after the file_transformation phase
And the nested_books entity is stored as a parquet after the file_transformation phase
And the latest audit record for the submission is marked with processing status data_contract
When I run the data contract phase
Then there is 1 record rejection from the data_contract phase
And the header entity is stored as a parquet after the data_contract phase
And the nested_books entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "nested_books" to 3 qualifying records
And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books"
And the nested_books entity is stored as a parquet after the business_rules phase
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Then An error report is produced
And The statistics entry for the submission shows the following information
| parameter | value |
| record_count | 4 |
| number_record_rejections | 2 |
| number_warnings | 0 |
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ def test_duckdb_rel_to_dictionaries(temp_ddb_conn: DuckDBPyConnection,
@pytest.mark.parametrize("field_name,field_type,cast_statement",
[("str_test", str, "try_cast(trim(\"str_test\") as VARCHAR)"),
("int_test", int, "try_cast(trim(\"int_test\") as BIGINT)"),
("date_test", datetime.date,"CASE WHEN REGEXP_MATCHES(TRIM(\"date_test\"), '^[0-9]{4}-[0-9]{2}-[0-9]{2}$') THEN TRY_CAST(TRIM(\"date_test\") as DATE) ELSE NULL END"),
("timestamp_test", datetime.datetime,"CASE WHEN REGEXP_MATCHES(TRIM(\"timestamp_test\"), '^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}((\+|\-)[0-9]{2}:[0-9]{2})?$') THEN TRY_CAST(TRIM(\"timestamp_test\") as TIMESTAMP) ELSE NULL END"),
("date_test", datetime.date,"TRY_CAST(try_strptime(TRIM(\"date_test\"), 'YYYY-MM-DD') as DATE)"),
("timestamp_test", datetime.datetime, "TRY_CAST(try_strptime(TRIM(\"timestamp_test\"), 'YYYY-MM-DDTHH:MM:SS') as TIMESTAMP)"),
("list_int_field", list[int], "try_cast(list_transform(\"list_int_field\", x -> trim(\"x\")) as BIGINT[])"),
("basic_model", BasicModel, "try_cast(struct_pack(\"str_field\":= trim(\"basic_model\".str_field),\"date_field\":= CASE WHEN REGEXP_MATCHES(TRIM(\"basic_model\".date_field), '^[0-9]{4}-[0-9]{2}-[0-9]{2}$') THEN TRY_CAST(TRIM(\"basic_model\".date_field) as DATE) ELSE NULL END) as STRUCT(str_field VARCHAR, date_field DATE))"),
("another_model", AnotherModel, "try_cast(struct_pack(\"unique_id\":= trim(\"another_model\".unique_id),\"basic_models\":= list_transform(\"another_model\".basic_models, x -> struct_pack(\"str_field\":= trim(\"x\".str_field),\"date_field\":= CASE WHEN REGEXP_MATCHES(TRIM(\"x\".date_field), '^[0-9]{4}-[0-9]{2}-[0-9]{2}$') THEN TRY_CAST(TRIM(\"x\".date_field) as DATE) ELSE NULL END))) as STRUCT(unique_id BIGINT, basic_models STRUCT(str_field VARCHAR, date_field DATE)[]))")])
("basic_model", BasicModel, "try_cast(struct_pack(\"str_field\":= trim(\"basic_model\".str_field),\"date_field\":= TRY_CAST(try_strptime(TRIM(\"basic_model\".date_field), 'YYYY-MM-DD') as DATE)) as STRUCT(str_field VARCHAR, date_field DATE))"),
("another_model", AnotherModel, "try_cast(struct_pack(\"unique_id\":= trim(\"another_model\".unique_id),\"basic_models\":= list_transform(\"another_model\".basic_models, x -> struct_pack(\"str_field\":= trim(\"x\".str_field),\"date_field\":= TRY_CAST(try_strptime(TRIM(\"x\".date_field), 'YYYY-MM-DD') as DATE)))) as STRUCT(unique_id BIGINT, basic_models STRUCT(str_field VARCHAR, date_field DATE)[]))")])
def test_get_duckdb_cast_statement_from_annotation(field_name, field_type, cast_statement):
assert get_duckdb_cast_statement_from_annotation(field_name, field_type) == cast_statement

Expand Down
10 changes: 5 additions & 5 deletions tests/testdata/books/nested_books.XML
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<title>XML Developer's Guide</title>
<genre>Computer</genre>
<price>44.95</price>
<publish_date>2000-10-01</publish_date>
<publish_date>01-10-2000</publish_date>
<description>An in-depth look at creating applications
with XML.</description>
</book>
Expand All @@ -20,7 +20,7 @@
<title>Midnight Rain</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-12-16</publish_date>
<publish_date>16-12-2000</publish_date>
<description>A former architect battles corporate zombies,
an evil sorceress, and her own childhood to become queen
of the world.</description>
Expand All @@ -35,7 +35,7 @@
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
<publish_date>17-11-2000</publish_date>
<description>After the collapse of a nanotechnology
society in England, the young survivors lay the
foundation for a new society.</description>
Expand All @@ -44,7 +44,7 @@
<title>Oberon's Legacy</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2001-03-10</publish_date>
<publish_date>10-03-2001</publish_date>
<description>In post-apocalypse England, the mysterious
agent known only as Oberon helps to create a new life
for the inhabitants of London. Sequel to Maeve
Expand All @@ -54,7 +54,7 @@
<title>The Sundered Grail</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2001-09-10</publish_date>
<publish_date>10-09-2001</publish_date>
<description>The two daughters of Maeve, half-sisters,
battle one another for control of England. Sequel to
Oberon's Legacy.</description>
Expand Down
10 changes: 9 additions & 1 deletion tests/testdata/books/nested_books.dischema.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
{
"contract": {
"types": {
"non_iso_date": {
"callable": "conformatteddate",
"constraints": {
"date_format": "%d-%m-%Y"
}
}
},
"schemas": {
"book": {
"fields": {
"title": "str",
"genre": "str",
"price": "str",
"publish_date": "date",
"publish_date": "non_iso_date",
"description": "str"
},
"mandatory_fields": [
Expand Down
10 changes: 9 additions & 1 deletion tests/testdata/books/nested_books_ddb.dischema.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
{
"contract": {
"types": {
"non_iso_date": {
"callable": "conformatteddate",
"constraints": {
"date_format": "%d-%m-%Y"
}
}
},
"schemas": {
"book": {
"fields": {
"title": "str",
"genre": "str",
"price": "str",
"publish_date": "date",
"publish_date": "non_iso_date",
"description": "str"
},
"mandatory_fields": [
Expand Down
Loading