Skip to content

[core][spark][flink] Support sub-field-level data evolution for nested columns#8334

Open
zhuxiangyi wants to merge 5 commits into
apache:masterfrom
zhuxiangyi:feature/nested-subfield-data-evolution
Open

[core][spark][flink] Support sub-field-level data evolution for nested columns#8334
zhuxiangyi wants to merge 5 commits into
apache:masterfrom
zhuxiangyi:feature/nested-subfield-data-evolution

Conversation

@zhuxiangyi

Copy link
Copy Markdown

Motivation

Local, high-frequency updates on a wide nested struct are expensive under today's data evolution: because the smallest evolvable unit is a top-level column, changing one sub-field (nest.a) rewrites the entire nest column — including all the unchanged sub-fields — into a new column-group file. This causes significant write amplification and storage waste exactly in the workloads that update structs most often. This PR lowers the column-group granularity to the leaf field, so "update one sub-field" only incrementally writes that sub-field, eliminating this class of write amplification at the root.

Purpose

This PR pushes column-group granularity down to the leaf field: updating a single sub-field writes an incremental file containing only that leaf (a dotted write column like nest.a), aligned by row id; on read the sub-fields scattered across files are reassembled into the full struct.

Use cases — "wide nested struct + frequent local updates":

  1. Local update of a user/entity profile. A row holds a wide profile STRUCT<age, city, tags, last_login, score, ...>, but each operation only updates one or two sub-fields (login updates last_login, risk-control updates score).

    • Without this: every update rewrites the whole profile (dozens of unchanged sub-fields).
    • With this: only a profile.last_login incremental file is written, aligned by row id; the full profile is reassembled on read.
    • Benefit: write amplification drops sharply, especially for wide structs.
  2. Different pipelines/teams own different sub-fields of one struct. Pipeline A owns nest.a, pipeline B owns nest.b.

    • Each only incrementally writes its own part without rewriting the other's, and the full struct is merged back by row id on read.
    • Fits wide tables where a row is assembled by multiple owners.

Gated by a new table option data-evolution.nested-field.enabled (default false); when disabled the behavior is identical to before (whole-column rewrite). Engine entries: Spark MERGE INTO and Flink data_evolution_merge_into action.

Design (high level)

  • Encode writeCols as dotted paths (nest.a) instead of only top-level names — no DataFileMeta serialization change. New RowType.projectByPaths / leafPaths convert between a (partial) nested type and its dotted paths, preserving field ids.
  • Write: a partial-struct write records its real sub-field content as dotted writeCols.
  • Read (DataEvolutionSplitRead): match files at leaf field-id granularity and assemble a struct split across files sub-field by sub-field (latest-wins per leaf). DataEvolutionRow composes the struct from several source files.
  • Spark (MergeIntoPaimonDataEvolutionTable): prune the aligned update to only the changed leaves; fall back to whole-column write when not safely determinable.
  • Flink (DataEvolutionMergeIntoAction): parse dotted SET targets, rebuild a partial struct as CAST(ROW(...) AS ROW<...>), and write via projectByPaths. Reuses the existing top-level pipeline (row-id assign / shuffle / partial-write operator / commit).
  • Compaction works through the merged read unchanged.

Tests

  • core: NestedDataEvolutionTableTest (5), NestedSubfieldDataEvolutionTableTest (3) — sub-field groups assembled, late overwrite, projection, compaction merges sub-fields.
  • spark: NestedSubfieldMergeIntoTest — single sub-field incremental write, whole-struct write, flag-off fallback.
  • flink: NestedSubfieldMergeIntoActionITCase (5) — single/multiple sub-fields (asserting dotted writeCols), whole-struct, flag-off rejection, deeper-than-one-level rejection.

API and Format

  • New table option: data-evolution.nested-field.enabled (Boolean, default false).
  • No change to DataFileMeta / manifest format — writeCols semantics extended (a dotted entry means a written sub-field; a plain entry still means the whole column). Backward compatible with existing files.

Documentation

  • Regenerated docs/generated/core_configuration.html for the new option.

Limitations (follow-ups)

  • Cross-file struct assembly supports one level of ROW only; deeper splits are rejected (or fall back to whole-column write).
  • Global index on nested sub-fields is out of scope.
  • Predicate stats are skipped for partially-written nested struct files (correctness-safe; loses file skipping).
  • Columnar fast-path and escaping for column names containing . are follow-ups.

Append tables with row-tracking + data-evolution previously could only
write/merge whole top-level columns. This extends column groups down to
nested sub-field granularity, so a single sub-field of a ROW column (e.g.
nest.a) can be written into its own row-id-aligned file and merged back
into the full struct at read time.

Key changes:
- RowType.projectByPaths / leafPaths: project and describe a (possibly
  partial) nested row type via dotted paths, preserving field ids. This
  lets writeCols carry nested paths ("nest.a") with no DataFileMeta
  serialization change. TableSchema.project now uses projectByPaths.
- BaseAppendFileStoreWrite.withWriteType: derive writeCols as leaf paths
  so a partial-struct write records its real sub-field content.
- DataEvolutionSplitRead: match files at leaf-field-id granularity and
  build a tree-shaped assembly plan; a struct split across files is
  composed sub-field by sub-field. Format-reader cache key uses absolute
  paths to avoid collisions between files reading different sub-fields.
- DataEvolutionRow: assemble a nested struct from several source files
  (NestedField plan), with sub-field-level latest-wins.

Compaction works through the merged read unchanged. Global index on
nested sub-fields and columnar fast-path are left as follow-ups (see
nested-subfield-data-evolution-design.md).

Tests: NestedSubfieldDataEvolutionTableTest (sub-field groups assembled,
sub-field late overwrite, compaction merges sub-fields) and the existing
NestedDataEvolutionTableTest both pass.
When a MERGE INTO on a row-tracking + data-evolution table updates only a
sub-field of a nested struct column (e.g. SET t.nest.a = s.x), write an
incremental file containing only that leaf (dotted write column nest.a)
aligned by row-id, instead of rewriting the whole top-level column.

- MergeIntoPaimonDataEvolutionTable: prune the update output to only the
  changed leaves, build the dotted write paths, and project the write type
  via RowType.projectByPaths.
- DataEvolutionPaimonWriter: add a sub-field-aware writePartialFields
  overload that takes an already-pruned RowType.
- Add NestedSubfieldMergeIntoTest covering single sub-field and whole-struct
  updates.
…review findings

Add the data-evolution.nested-field.enabled table option (default off) to
gate sub-field-level data evolution, and fix issues found in review:

- RowType.leafPaths: skip field ids absent from the reference type (e.g. the
  _ROW_ID / _SEQUENCE_NUMBER special fields), so withWriteType no longer
  throws for row-tracking tables during append compaction.
- RowType.projectByPaths: prefer an exact field-name match before splitting on
  '.', preserving columns whose names contain a dot; reject ambiguous nested
  paths when a field name contains '.'.
- DataEvolutionSplitRead: only read a struct whole from a single file when that
  file covers all its leaves, otherwise compose from the provided sub-fields;
  restore the not-null check at sub-field level; reject deeper-than-one-level
  partial sub-structs explicitly.
- DataEvolutionRow: give a composed struct a defined RowKind when every source
  partial is null.
- BaseAppendFileStoreWrite.compactRewrite: encode writeCols via leafPaths to
  match the main write path.
- MergeIntoPaimonDataEvolutionTable: gate sub-field pruning on the new option
  and only prune one level deep (the depth the reader can compose).
- DataEvolutionFileStoreScan: document the intentional stats skip for
  partially-written nested struct files.
- Regenerate core_configuration.html for the new option.
Extend DataEvolutionMergeIntoAction so a SET that targets a nested
sub-field (e.g. T.nest.a = S.x) writes an incremental file containing only
that leaf (dotted write column nest.a) aligned by row id, instead of
rewriting the whole top-level column. Reuses the existing top-level
data-evolution pipeline; only the column granularity is generalized to
dotted paths.

- DataEvolutionMergeIntoAction.buildSource: parse dotted SET targets
  (stripping the table qualifier), group by top-level column, and rebuild a
  partially-updated struct as CAST(ROW(...) AS ROW<...>) with sub-fields in
  schema order; derive dotted writePaths and a pruned sourceType. Gate on
  data-evolution.nested-field.enabled and reject deeper-than-one-level paths.
  checkSchema now accepts a partial (subset) struct.
- DataEvolutionPartialWriteOperator: take writePaths and use
  projectByPaths for the write type; use the pruned source type directly.
- Add NestedSubfieldMergeIntoActionITCase (single/multiple sub-fields,
  whole-struct, disabled-throws, deeper-nesting-throws).
public class NestedSubfieldMergeIntoActionITCase extends ActionITCaseBase {

@Override
public void before() throws IOException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This override drops the @BeforeEach annotation from ActionITCaseBase.before(), so JUnit never runs the setup for this class. As a result warehouse/catalog are not initialized and ReadWriteTableTestUtil.init(warehouse) is not called; the new test class currently fails all five tests with NPE at the first sEnv.executeSql(...). Please add @BeforeEach here (as the other action ITs do) so both the base setup and init(warehouse) run before each test.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks! You're right — overriding before() without re-adding @BeforeEach means JUnit never runs the base setup, so warehouse/init(warehouse) were uninitialized. Fixed in 23766fd by adding @BeforeEach to the override.

sEnv.executeSql(
buildDdl(
"T",
Arrays.asList("id INT", "nest ROW<a INT, inner ROW<x INT, y INT>>"),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After adding the missing @BeforeEach locally to let this test class initialize, this DDL still fails before reaching the assertion: Flink's parser treats inner as a keyword (SQL parse failed. Encountered "inner" at line 1, column 41). Please quote the nested field name (and the matching CAST(ROW(... ) AS ROW<...>) below) or use a non-keyword name, otherwise testUpdateDeeplyNestedSubFieldThrows cannot exercise the intended deeper-than-one-level validation.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! inner collides with the Flink SQL reserved word and breaks DDL parsing. Renamed the nested sub-field innersub (in the DDL, the CAST(ROW(...)) and the SET target) in 23766fd, so testUpdateDeeplyNestedSubFieldThrows now reaches and exercises the deeper-than-one-level validation.

- spark NestedSubfieldMergeIntoTest: apply scalafmt (single-line test name)
  to satisfy spotless-check.
- flink NestedSubfieldMergeIntoActionITCase: add @beforeeach to the before()
  override so JUnit runs base setup + init(warehouse) (was NPE-ing all tests);
  rename the nested sub-field 'inner' to 'sub' to avoid the Flink SQL reserved
  word that broke DDL parsing.
@zhuxiangyi

Copy link
Copy Markdown
Author

Thanks for the review @JingsongLi! Addressed both points in 23766fd:

  • Added @BeforeEach to the before() override (tests were NPE-ing without base setup).
  • Renamed the nested field innersub to avoid the Flink SQL reserved word.

Also fixed the spotless-check failure (the spark-ut test wasn't formatted). CI is re-running.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants