Skip to content

Commit b84da38

Browse files
committed
address review comments, squash this commit before merging
1 parent 9f975ab commit b84da38

3 files changed

Lines changed: 70 additions & 33 deletions

File tree

python/tests/platform/fixtures/column_mapping.py

Lines changed: 25 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,24 @@
11
"""Build a column-mapped, schema-evolved Delta table fixture with PySpark.
22
33
``tests.utils.ensure_delta_spark_fixture`` runs this as a subprocess
4-
(``uv run --with "delta-spark>=4.2,<5" python column_mapping.py <output_dir>``).
5-
PySpark is the only writer that can produce a Delta table with column mapping
6-
enabled *and* perform the rename / drop schema-evolution operations that make
7-
logical column names diverge from the physical Parquet column names
8-
(``col-<uuid>``); neither ``delta-rs`` nor the ``deltalake`` Python wheel can.
9-
PySpark imports stay function-local so importing this module (for
4+
(``uv run --with "delta-spark>=4.2,<5" python column_mapping.py <output_dir>
5+
<mode>``), where ``<mode>`` is a ``delta.columnMapping.mode`` value: ``name``
6+
(physical Parquet columns are renamed to ``col-<uuid>``) or ``id`` (columns are
7+
matched by Parquet field ID). PySpark is currently the only writer that can
8+
produce a Delta table with column mapping enabled *and* perform the rename /
9+
drop schema-evolution operations that make logical column names diverge from
10+
the physical ones; neither ``delta-rs`` nor the ``deltalake`` Python wheel can.
11+
DROP COLUMN under column mapping requires delta-spark 4.x (Delta writer v5,
12+
reader v2). PySpark imports stay function-local so importing this module (for
1013
``EXPECTED_ROWS``) never pulls in the JVM/Spark stack.
1114
15+
Changing this builder changes the fixture it produces, but a cached fixture is
16+
reused based on its path alone — bump ``FIXTURE_VERSION`` in
17+
``test_delta_input_column_mapping.py`` on any builder change.
18+
1219
The resulting table's history (one commit per step):
1320
14-
* ``v0`` CREATE TABLE with ``delta.columnMapping.mode = 'name'``
21+
* ``v0`` CREATE TABLE with ``delta.columnMapping.mode = '<mode>'``
1522
* ``v1`` INSERT two rows under the original ``(id, name, amount)`` schema
1623
* ``v2`` RENAME COLUMN ``name`` -> ``full_name``
1724
* ``v3`` ADD COLUMN ``country``
@@ -40,8 +47,13 @@
4047
]
4148

4249

43-
def build(table_path: str) -> None:
44-
"""Create the column-mapped, schema-evolved table at ``table_path``."""
50+
def build(table_path: str, mode: str = "name") -> None:
51+
"""Create the column-mapped, schema-evolved table at ``table_path``.
52+
53+
:param mode: ``delta.columnMapping.mode`` for the table: ``name`` or ``id``.
54+
"""
55+
if mode not in ("name", "id"):
56+
raise ValueError(f"unsupported column-mapping mode: {mode!r}")
4557
from delta import configure_spark_with_delta_pip
4658
from pyspark.sql import SparkSession
4759

@@ -64,7 +76,7 @@ def build(table_path: str) -> None:
6476
f"""
6577
CREATE TABLE {t} (id BIGINT, name STRING, amount DOUBLE)
6678
USING delta
67-
TBLPROPERTIES ('delta.columnMapping.mode' = 'name',
79+
TBLPROPERTIES ('delta.columnMapping.mode' = '{mode}',
6880
'delta.minReaderVersion' = '2',
6981
'delta.minWriterVersion' = '5')
7082
"""
@@ -80,6 +92,6 @@ def build(table_path: str) -> None:
8092

8193

8294
if __name__ == "__main__":
83-
if len(sys.argv) != 2:
84-
raise SystemExit("usage: column_mapping.py <output_dir>")
85-
build(sys.argv[1])
95+
if len(sys.argv) not in (2, 3):
96+
raise SystemExit("usage: column_mapping.py <output_dir> [name|id]")
97+
build(*sys.argv[1:])

python/tests/platform/test_delta_input_column_mapping.py

Lines changed: 25 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,11 @@
11
"""Snapshot read of a column-mapped Delta table after schema evolution.
22
33
A table with ``delta.columnMapping.mode = 'name'`` stores opaque physical
4-
Parquet column names (``col-<uuid>``) that differ from the logical names, so a
4+
Parquet column names (``col-<uuid>``) that differ from the logical names; with
5+
``mode = 'id'`` columns are matched by Parquet field ID instead. Either way, a
56
correct snapshot read must resolve every value through the column-mapping
6-
metadata in the Delta log. PySpark seeds the fixture (it is the only writer
7-
that supports column-mapping rename/drop); the builder lives in
7+
metadata in the Delta log. PySpark seeds the fixture (it is currently the only
8+
writer that supports column-mapping rename/drop); the builder lives in
89
``fixtures/column_mapping.py`` and runs via ``uv run --with delta-spark`` so
910
the Spark/JVM wheels are only fetched on cache miss.
1011
"""
@@ -25,7 +26,7 @@
2526
TABLE = "t"
2627
CONNECTOR = "delta_in"
2728
# Bump to invalidate cached MinIO copies when the fixture definition changes.
28-
FIXTURE_VERSION = "column_mapping_v1"
29+
FIXTURE_VERSION = "v1"
2930

3031
# Spark builder that writes the column-mapped, schema-evolved table. It runs in
3132
# a subprocess (see ensure_delta_spark_fixture) rather than being imported here.
@@ -65,16 +66,22 @@ def _snapshot_rows(pipeline) -> list[dict]:
6566
)
6667

6768

68-
def test_delta_input_column_mapping_snapshot(pipeline_name):
69+
def _run_column_mapping_snapshot_test(pipeline_name: str, mapping_mode: str) -> None:
6970
"""A snapshot read of a column-mapped, schema-evolved table returns the
70-
final logical schema and correctly resolves physical column names."""
71+
final logical schema and correctly resolves physical column names.
72+
73+
Driven by one wrapper test per ``delta.columnMapping.mode`` (rather than
74+
``pytest.mark.parametrize``) so each case gets a distinct pipeline name —
75+
the ``pipeline_name`` fixture derives the name from the test function, and
76+
parametrized cases sharing one name could collide under ``pytest -n``.
77+
"""
7178
loc = DeltaTestLocation.create(
7279
pipeline_name,
7380
mode="snapshot",
74-
stable_subpath=FIXTURE_VERSION,
81+
stable_subpath=f"column_mapping_{mapping_mode}_{FIXTURE_VERSION}",
7582
)
7683
try:
77-
ensure_delta_spark_fixture(loc, _FIXTURE_BUILDER)
84+
ensure_delta_spark_fixture(loc, _FIXTURE_BUILDER, builder_args=(mapping_mode,))
7885

7986
pipeline = PipelineBuilder(
8087
TEST_CLIENT,
@@ -99,3 +106,13 @@ def test_delta_input_column_mapping_snapshot(pipeline_name):
99106
pipeline.stop(force=True)
100107
finally:
101108
loc.cleanup()
109+
110+
111+
def test_delta_input_column_mapping_name_snapshot(pipeline_name):
112+
"""Snapshot read with ``delta.columnMapping.mode = 'name'``."""
113+
_run_column_mapping_snapshot_test(pipeline_name, "name")
114+
115+
116+
def test_delta_input_column_mapping_id_snapshot(pipeline_name):
117+
"""Snapshot read with ``delta.columnMapping.mode = 'id'``."""
118+
_run_column_mapping_snapshot_test(pipeline_name, "id")

python/tests/utils.py

Lines changed: 20 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -250,7 +250,7 @@ def row_count(self, missing_ok: bool = False) -> int:
250250
raise
251251
return dt.count()
252252

253-
def table_exists(self) -> bool:
253+
def delta_log_exists(self) -> bool:
254254
"""Return True when a Delta log is present at this location.
255255
256256
Used to decide whether a cached fixture (see ``stable_subpath``) can
@@ -262,11 +262,13 @@ def table_exists(self) -> bool:
262262
return False
263263

264264
def _place_tree(self, staging: pathlib.Path) -> None:
265-
"""Place a locally-built Delta table tree onto this location's backend.
265+
"""Copy a locally-built Delta table tree to where this location stores
266+
its data: the local directory, or the S3/MinIO bucket, depending on
267+
how this location was created.
266268
267269
Some fixtures can only be produced on the local filesystem (e.g. a
268-
PySpark-written table) yet must be read back from the configured
269-
backend. Local targets are replaced wholesale; S3/MinIO targets get
270+
PySpark-written table). For a local target, any existing content at
271+
``self.local_dir`` is deleted before the copy. S3/MinIO targets get
270272
the data files first and ``_delta_log`` last, so a reader observing
271273
the upload mid-flight never sees a log referencing a missing parquet.
272274
"""
@@ -316,23 +318,28 @@ def ensure_delta_spark_fixture(
316318
wheel. This builds such a fixture once and reuses it:
317319
318320
* If the fixture is already present (``is_present``, defaulting to
319-
:meth:`DeltaTestLocation.table_exists`), do nothing — so a
321+
:meth:`DeltaTestLocation.delta_log_exists`), do nothing — so a
320322
``stable_subpath`` cache is reused across runs.
321323
* Otherwise run ``builder_script`` in an isolated environment
322-
(``uv run --with <delta_spark_spec> python <builder_script> <staging>
323-
*builder_args``), staging into a temp dir so a half-finished build can
324-
never leak into the upload, then place the tree onto ``loc``'s backend.
324+
(``uv run --no-project --with <delta_spark_spec> python <builder_script>
325+
<staging> *builder_args``), staging into a temp dir so a half-finished
326+
build can never leak into the upload, then place the tree onto ``loc``'s
327+
backend. ``--no-project`` keeps the builder hermetic: it depends only on
328+
``delta_spark_spec``, never on building the enclosing project.
325329
326330
The heavy PySpark + JVM stack is pulled only on this rare rebuild path.
327331
328332
:param builder_script: Path to a standalone script that writes a Delta
329333
table to the directory given as its first argument.
330334
:param builder_args: Extra positional arguments passed after the staging
331-
directory (stringified).
335+
directory. Each is stringified verbatim with ``str()`` — pass
336+
primitives; ``None`` would become the literal string ``"None"``.
332337
:param is_present: Predicate deciding whether the fixture already exists;
333338
also re-checked after upload to catch partial uploads.
334339
"""
335-
present = is_present if is_present is not None else DeltaTestLocation.table_exists
340+
present = (
341+
is_present if is_present is not None else DeltaTestLocation.delta_log_exists
342+
)
336343
if present(loc):
337344
return
338345

@@ -348,6 +355,7 @@ def ensure_delta_spark_fixture(
348355
[
349356
"uv",
350357
"run",
358+
"--no-project",
351359
"--with",
352360
delta_spark_spec,
353361
"python",
@@ -363,8 +371,8 @@ def ensure_delta_spark_fixture(
363371

364372
if not present(loc):
365373
raise RuntimeError(
366-
f"Delta fixture at {loc.uri} is still absent after upload — "
367-
"partial upload, or content-stripping middleware?"
374+
f"Delta fixture at {loc.uri} is still absent after the builder "
375+
"ran and the tree was uploaded."
368376
)
369377

370378

0 commit comments

Comments
 (0)