From d8a7957707126d51eb5dd0943179bc006959b7a7 Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Fri, 19 Jun 2026 01:35:24 +0800 Subject: [PATCH 1/9] [core] Support sub-field-level data evolution for nested columns Append tables with row-tracking + data-evolution previously could only write/merge whole top-level columns. This extends column groups down to nested sub-field granularity, so a single sub-field of a ROW column (e.g. nest.a) can be written into its own row-id-aligned file and merged back into the full struct at read time. Key changes: - RowType.projectByPaths / leafPaths: project and describe a (possibly partial) nested row type via dotted paths, preserving field ids. This lets writeCols carry nested paths ("nest.a") with no DataFileMeta serialization change. TableSchema.project now uses projectByPaths. - BaseAppendFileStoreWrite.withWriteType: derive writeCols as leaf paths so a partial-struct write records its real sub-field content. - DataEvolutionSplitRead: match files at leaf-field-id granularity and build a tree-shaped assembly plan; a struct split across files is composed sub-field by sub-field. Format-reader cache key uses absolute paths to avoid collisions between files reading different sub-fields. - DataEvolutionRow: assemble a nested struct from several source files (NestedField plan), with sub-field-level latest-wins. Compaction works through the merged read unchanged. Global index on nested sub-fields and columnar fast-path are left as follow-ups (see nested-subfield-data-evolution-design.md). Tests: NestedSubfieldDataEvolutionTableTest (sub-field groups assembled, sub-field late overwrite, compaction merges sub-fields) and the existing NestedDataEvolutionTableTest both pass. --- .../org/apache/paimon/schema/TableSchema.java | 4 +- .../java/org/apache/paimon/types/RowType.java | 108 +++++ .../reader/DataEvolutionFileReader.java | 11 + .../paimon/reader/DataEvolutionRow.java | 83 +++- .../operation/BaseAppendFileStoreWrite.java | 5 +- .../operation/DataEvolutionSplitRead.java | 311 ++++++++++++--- .../table/NestedDataEvolutionTableTest.java | 374 ++++++++++++++++++ .../NestedSubfieldDataEvolutionTableTest.java | 300 ++++++++++++++ 8 files changed, 1130 insertions(+), 66 deletions(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/NestedDataEvolutionTableTest.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/NestedSubfieldDataEvolutionTableTest.java diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java index 8fee740fd1cd..9a429f54da28 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -288,7 +288,9 @@ public TableSchema project(@Nullable List writeCols) { return new TableSchema( version, id, - new RowType(fields).project(writeCols).getFields(), + // writeCols may contain nested dotted paths (e.g. "nest.a") for sub-field-level + // data evolution; projectByPaths handles both plain top-level names and paths + new RowType(fields).projectByPaths(writeCols).getFields(), highestFieldId, partitionKeys, primaryKeys, diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java index e592ff71619b..edf705602fa8 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java @@ -312,6 +312,114 @@ public RowType project(String... names) { return project(Arrays.asList(names)); } + /** + * Project this row type by a list of (possibly nested) dotted paths, e.g. {@code ["f0", + * "nest.a"]}. A path without a dot selects the whole top-level field (same as {@link + * #project(List)}); a dotted path selects only the addressed sub-field of a nested {@link + * RowType}, preserving field ids and nullability of every level. Schema field order is + * preserved. This is used by data evolution to reconstruct the partial nested schema of a + * column-group file from its {@code writeCols}. + */ + public RowType projectByPaths(List paths) { + return projectTypeByPaths(this, paths); + } + + private static RowType projectTypeByPaths(RowType type, List paths) { + // group paths by their immediate child name; a child appearing without a tail (or also with + // a tail) is selected as a whole field + Map> childToSubPaths = new HashMap<>(); + Set wholeChildren = new HashSet<>(); + for (String path : paths) { + int dot = path.indexOf('.'); + if (dot < 0) { + childToSubPaths.computeIfAbsent(path, k -> new ArrayList<>()); + wholeChildren.add(path); + } else { + String head = path.substring(0, dot); + String tail = path.substring(dot + 1); + childToSubPaths.computeIfAbsent(head, k -> new ArrayList<>()).add(tail); + } + } + + Set matched = new HashSet<>(); + List result = new ArrayList<>(); + for (DataField field : type.getFields()) { + List subPaths = childToSubPaths.get(field.name()); + if (subPaths == null) { + continue; + } + matched.add(field.name()); + if (wholeChildren.contains(field.name()) + || subPaths.isEmpty() + || !(field.type() instanceof RowType)) { + result.add(field); + } else { + RowType prunedChild = + projectTypeByPaths((RowType) field.type(), subPaths) + .copy(field.type().isNullable()); + result.add(field.newType(prunedChild)); + } + } + if (!matched.containsAll(childToSubPaths.keySet())) { + Set unknown = new HashSet<>(childToSubPaths.keySet()); + unknown.removeAll(matched); + throw new IllegalArgumentException( + "Cannot project by paths, unknown field(s) " + unknown + " in " + type); + } + return new RowType(type.isNullable(), result); + } + + /** + * Compute the dotted paths describing this (possibly partially nested) write type relative to a + * full row type. A top-level field, or a nested field whose structure fully covers the + * corresponding field in {@code fullType}, is emitted by its name; a nested field that only + * covers some sub-fields is expanded into dotted leaf paths. This is the inverse of {@link + * #projectByPaths(List)} and is used to derive {@code writeCols}. + */ + public List leafPaths(RowType fullType) { + List result = new ArrayList<>(); + collectLeafPaths(getFields(), fullType, "", result); + return result; + } + + private static void collectLeafPaths( + List writeFields, RowType fullType, String prefix, List out) { + for (DataField writeField : writeFields) { + DataField fullField = fullType.getField(writeField.id()); + String path = prefix.isEmpty() ? writeField.name() : prefix + "." + writeField.name(); + if (writeField.type() instanceof RowType + && fullField.type() instanceof RowType + && !coversFully((RowType) writeField.type(), (RowType) fullField.type())) { + collectLeafPaths( + ((RowType) writeField.type()).getFields(), + (RowType) fullField.type(), + path, + out); + } else { + out.add(path); + } + } + } + + /** Whether {@code part} contains every (recursively nested) field of {@code full}. */ + private static boolean coversFully(RowType part, RowType full) { + if (part.getFieldCount() != full.getFieldCount()) { + return false; + } + for (DataField fullField : full.getFields()) { + if (!part.containsField(fullField.id())) { + return false; + } + DataField partField = part.getField(fullField.id()); + if (partField.type() instanceof RowType && fullField.type() instanceof RowType) { + if (!coversFully((RowType) partField.type(), (RowType) fullField.type())) { + return false; + } + } + } + return true; + } + private Map nameToField() { Map nameToField = this.laziedNameToField; if (nameToField == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java index 53766440ff01..76e00e049eb5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java @@ -57,9 +57,18 @@ public class DataEvolutionFileReader implements RecordReader { private final int[] rowOffsets; private final int[] fieldOffsets; private final RecordReader[] readers; + @Nullable private final DataEvolutionRow.NestedField[] nested; public DataEvolutionFileReader( int[] rowOffsets, int[] fieldOffsets, RecordReader[] readers) { + this(rowOffsets, fieldOffsets, readers, null); + } + + public DataEvolutionFileReader( + int[] rowOffsets, + int[] fieldOffsets, + RecordReader[] readers, + @Nullable DataEvolutionRow.NestedField[] nested) { checkArgument(rowOffsets != null, "Row offsets must not be null"); checkArgument(fieldOffsets != null, "Field offsets must not be null"); checkArgument( @@ -70,12 +79,14 @@ public DataEvolutionFileReader( this.rowOffsets = rowOffsets; this.fieldOffsets = fieldOffsets; this.readers = readers; + this.nested = nested; } @Override @Nullable public RecordIterator readBatch() throws IOException { DataEvolutionRow row = new DataEvolutionRow(readers.length, rowOffsets, fieldOffsets); + row.setNested(nested); RecordIterator[] iterators = new RecordIterator[readers.length]; for (int i = 0; i < readers.length; i++) { RecordReader reader = readers[i]; diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java index 08c6d24d2b79..cd02972b569e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java @@ -35,6 +35,14 @@ public class DataEvolutionRow implements InternalRow { private final InternalRow[] rows; private final int[] rowOffsets; private final int[] fieldOffsets; + + /** + * Optional per-top-level-field plan to assemble a nested struct whose sub-fields are spread + * across several source files (sub-field-level data evolution). {@code null} (or a {@code null} + * entry) means the field is taken whole from a single source row (the common case). + */ + private NestedField[] nested; + private RowKind rowKind; public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) { @@ -43,6 +51,10 @@ public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) { this.fieldOffsets = fieldOffsets; } + public void setNested(NestedField[] nested) { + this.nested = nested; + } + public int rowNumber() { return rows.length; } @@ -59,6 +71,15 @@ public void setRow(int pos, InternalRow row) { } } + private void setRowsAllowNull(InternalRow[] newRows) { + for (int i = 0; i < newRows.length; i++) { + this.rows[i] = newRows[i]; + if (rowKind == null && newRows[i] != null) { + this.rowKind = newRows[i].getRowKind(); + } + } + } + public void setRows(InternalRow[] rows) { if (rows.length != this.rows.length) { throw new IllegalArgumentException( @@ -97,10 +118,22 @@ public void setRowKind(RowKind kind) { @Override public boolean isNullAt(int pos) { + if (nested != null && nested[pos] != null) { + // a composed struct is null only when none of its source files provide it + NestedField nf = nested[pos]; + for (int k = 0; k < nf.numPartials; k++) { + InternalRow src = rows[nf.partialReader[k]]; + if (src != null && !src.isNullAt(nf.partialOffset[k])) { + return false; + } + } + return true; + } if (rowOffsets[pos] < 0) { return true; } - return chooseRow(pos).isNullAt(offsetInRow(pos)); + InternalRow row = chooseRow(pos); + return row == null || row.isNullAt(offsetInRow(pos)); } @Override @@ -185,6 +218,54 @@ public InternalMap getMap(int pos) { @Override public InternalRow getRow(int pos, int numFields) { + if (nested != null && nested[pos] != null) { + NestedField nf = nested[pos]; + InternalRow[] partials = new InternalRow[nf.numPartials]; + for (int k = 0; k < nf.numPartials; k++) { + InternalRow src = rows[nf.partialReader[k]]; + partials[k] = + (src == null || src.isNullAt(nf.partialOffset[k])) + ? null + : src.getRow(nf.partialOffset[k], nf.partialSize[k]); + } + DataEvolutionRow composed = + new DataEvolutionRow(nf.numPartials, nf.subRowOffsets, nf.subFieldOffsets); + composed.setRowsAllowNull(partials); + return composed; + } return chooseRow(pos).getRow(offsetInRow(pos), numFields); } + + /** + * Plan to assemble one nested struct from sub-fields spread across several source files. A + * "partial" is the projection of the struct read from a single source file; each output + * sub-field is sourced from one partial via {@code subRowOffsets}/{@code subFieldOffsets}. + */ + public static class NestedField { + + final int numPartials; + // per partial struct: the source reader, the struct's position in that reader's row, and + // the + // number of fields of the struct as read from that file + final int[] partialReader; + final int[] partialOffset; + final int[] partialSize; + // per output sub-field: which partial it comes from, and its offset within that partial + final int[] subRowOffsets; + final int[] subFieldOffsets; + + public NestedField( + int[] partialReader, + int[] partialOffset, + int[] partialSize, + int[] subRowOffsets, + int[] subFieldOffsets) { + this.numPartials = partialReader.length; + this.partialReader = partialReader; + this.partialOffset = partialOffset; + this.partialSize = partialSize; + this.subRowOffsets = subRowOffsets; + this.subFieldOffsets = subFieldOffsets; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index 94620205a232..63ae5de32a79 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -163,9 +163,10 @@ public void withWriteType(RowType writeType) { if (blobContext != null) { blobContext = blobContext.withWriteType(writeType); } - int fullCount = rowType.getFieldCount(); List fullNames = rowType.getFieldNames(); - this.writeCols = writeType.getFieldNames(); + // writeCols carries (possibly nested) dotted paths, e.g. ["f0", "nest.a"]; a plain + // top-level name means the whole column, a dotted path means only that sub-field is written + this.writeCols = writeType.leafPaths(rowType); // optimize writeCols to null in following cases: // writeType contains all columns (without _ROW_ID and _SEQUENCE_NUMBER) if (writeCols.equals(fullNames)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 145fdc9ad4af..ca312a370cef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -37,12 +37,12 @@ import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.DataEvolutionFileReader; +import org.apache.paimon.reader.DataEvolutionRow; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.types.DataField; @@ -63,13 +63,16 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.TreeMap; import java.util.function.Function; import java.util.function.ToLongFunction; -import java.util.stream.Collectors; import static java.lang.String.format; import static java.util.Collections.reverseOrder; @@ -249,86 +252,270 @@ private DataEvolutionFileReader createUnionReader( // Init all we need to create a compound reader List allReadFields = readRowType.getFields(); - RecordReader[] fileRecordReaders = new RecordReader[fieldsFiles.size()]; - int[] readFieldIndex = allReadFields.stream().mapToInt(DataField::id).toArray(); - // which row the read field index belongs to - int[] rowOffsets = new int[allReadFields.size()]; - // which field index in the reading row - int[] fieldOffsets = new int[allReadFields.size()]; + int numFields = allReadFields.size(); + int numBunches = fieldsFiles.size(); + RecordReader[] fileRecordReaders = new RecordReader[numBunches]; + + // Step 1: for each bunch (file), gather its (possibly nested) data row type and the set of + // leaf field ids it physically provides. writeCols may carry nested dotted paths. + long[] bunchSchemaId = new long[numBunches]; + List> bunchLeaves = new ArrayList<>(); + for (int i = 0; i < numBunches; i++) { + DataFileMeta firstFile = fieldsFiles.get(i).files().get(0); + long schemaId = firstFile.schemaId(); + bunchSchemaId[i] = schemaId; + TableSchema dataSchema = schemaFetcher.apply(schemaId).project(firstFile.writeCols()); + RowType avail = rowTypeWithRowTracking(dataSchema.logicalRowType()); + Set leaves = new HashSet<>(); + collectLeafIds(avail.getFields(), leaves); + bunchLeaves.add(leaves); + } + + // Step 2: decide, per read field, whether it is taken whole from one file or composed from + // several files at sub-field granularity. Files are already sorted latest-first, so the + // first bunch providing a leaf wins (latest-wins semantics, now at sub-field level). + // selection per bunch: topFieldId -> null (whole) or set of selected sub-field ids + List>> bunchSelection = new ArrayList<>(); + for (int i = 0; i < numBunches; i++) { + bunchSelection.add(new LinkedHashMap<>()); + } + + int[] rowOffsets = new int[numFields]; + int[] fieldOffsets = new int[numFields]; Arrays.fill(rowOffsets, -1); Arrays.fill(fieldOffsets, -1); + DataEvolutionRow.NestedField[] nested = new DataEvolutionRow.NestedField[numFields]; + boolean[] composite = new boolean[numFields]; + int[] wholeBunch = new int[numFields]; + Arrays.fill(wholeBunch, -1); + + for (int j = 0; j < numFields; j++) { + DataField rf = allReadFields.get(j); + List leaves = leafIdsOf(rf); + Map leafProvider = new HashMap<>(); + Set providers = new HashSet<>(); + for (int leaf : leaves) { + int p = providerOf(leaf, bunchLeaves); + if (p >= 0) { + leafProvider.put(leaf, p); + providers.add(p); + } + } + if (providers.isEmpty()) { + // no file provides this field; it stays null (nullability checked below) + continue; + } + if (providers.size() == 1) { + int b = providers.iterator().next(); + bunchSelection.get(b).put(rf.id(), null); + wholeBunch[j] = b; + } else { + checkArgument( + rf.type() instanceof RowType, + "Field %s is split across files but is not a struct.", + rf.name()); + composite[j] = true; + for (DataField sub : ((RowType) rf.type()).getFields()) { + Set subProviders = new HashSet<>(); + for (int leaf : leafIdsOf(sub)) { + int p = leafProvider.getOrDefault(leaf, -1); + if (p >= 0) { + subProviders.add(p); + } + } + if (subProviders.size() > 1) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution does not yet support splitting a " + + "nested sub-field (" + + rf.name() + + "." + + sub.name() + + ") across multiple files."); + } + if (subProviders.size() == 1) { + int b = subProviders.iterator().next(); + bunchSelection + .get(b) + .computeIfAbsent(rf.id(), k -> new LinkedHashSet<>()) + .add(sub.id()); + } + // else: sub-field absent everywhere -> stays null + } + } + } - for (int i = 0; i < fieldsFiles.size(); i++) { - FieldBunch bunch = fieldsFiles.get(i); - DataFileMeta firstFile = bunch.files().get(0); - String formatIdentifier = DataFilePathFactory.formatIdentifier(firstFile.fileName()); - long schemaId = firstFile.schemaId(); - TableSchema dataSchema = schemaFetcher.apply(schemaId).project(firstFile.writeCols()); - int[] fieldIds = - SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields() - .stream() - .mapToInt(DataField::id) - .toArray(); + // Step 3: materialize each bunch's partial read row type and the offset maps. + List> bunchReadFields = new ArrayList<>(); + List> bunchTopOffset = new ArrayList<>(); + List>> bunchSubOffset = new ArrayList<>(); + for (int i = 0; i < numBunches; i++) { + Map> sel = bunchSelection.get(i); List readFields = new ArrayList<>(); - for (int j = 0; j < readFieldIndex.length; j++) { - for (int fieldId : fieldIds) { - // Check if the read field index matches the file field - // index - if (readFieldIndex[j] == fieldId) { - // If the row offset is not set, set it to the current - // file reader - if (rowOffsets[j] == -1) { - // "i" is the reader index, and "readFields.size()" - // is the offset the that row - rowOffsets[j] = i; - fieldOffsets[j] = readFields.size(); - readFields.add(allReadFields.get(j)); + Map topOffset = new HashMap<>(); + Map> subOffset = new HashMap<>(); + for (Map.Entry> e : sel.entrySet()) { + int topId = e.getKey(); + Set subs = e.getValue(); + DataField readTop = readRowType.getField(topId); + if (subs == null) { + readFields.add(readTop); + topOffset.put(topId, readFields.size() - 1); + } else { + RowType readStruct = (RowType) readTop.type(); + List chosen = new ArrayList<>(); + Map subToIdx = new HashMap<>(); + for (DataField s : readStruct.getFields()) { + if (subs.contains(s.id())) { + subToIdx.put(s.id(), chosen.size()); + chosen.add(s); } - break; } + RowType partial = new RowType(readStruct.isNullable(), chosen); + readFields.add(readTop.newType(partial)); + topOffset.put(topId, readFields.size() - 1); + subOffset.put(topId, subToIdx); } } + bunchReadFields.add(readFields); + bunchTopOffset.add(topOffset); + bunchSubOffset.add(subOffset); + } + + // Step 4: wire output offsets (whole fields) and nested composition plans (split structs). + for (int j = 0; j < numFields; j++) { + DataField rf = allReadFields.get(j); + if (composite[j]) { + List subFields = ((RowType) rf.type()).getFields(); + int subCount = subFields.size(); + int[] subRowOffsets = new int[subCount]; + int[] subFieldOffsets = new int[subCount]; + Arrays.fill(subRowOffsets, -1); + Arrays.fill(subFieldOffsets, -1); + Map bunchToPartial = new LinkedHashMap<>(); + List partials = new ArrayList<>(); + for (int s = 0; s < subCount; s++) { + int subId = subFields.get(s).id(); + int b = findSubProvider(rf.id(), subId, bunchSubOffset); + if (b < 0) { + continue; + } + Integer pIdx = bunchToPartial.get(b); + if (pIdx == null) { + int topOff = bunchTopOffset.get(b).get(rf.id()); + int size = bunchSubOffset.get(b).get(rf.id()).size(); + pIdx = partials.size(); + bunchToPartial.put(b, pIdx); + partials.add(new int[] {b, topOff, size}); + } + subRowOffsets[s] = pIdx; + subFieldOffsets[s] = bunchSubOffset.get(b).get(rf.id()).get(subId); + } + int p = partials.size(); + int[] pr = new int[p]; + int[] po = new int[p]; + int[] ps = new int[p]; + for (int k = 0; k < p; k++) { + pr[k] = partials.get(k)[0]; + po[k] = partials.get(k)[1]; + ps[k] = partials.get(k)[2]; + } + nested[j] = + new DataEvolutionRow.NestedField( + pr, po, ps, subRowOffsets, subFieldOffsets); + } else if (wholeBunch[j] >= 0) { + int b = wholeBunch[j]; + rowOffsets[j] = b; + fieldOffsets[j] = bunchTopOffset.get(b).get(rf.id()); + } + } + // Step 5: build the per-bunch readers from the materialized partial read row types. + for (int i = 0; i < numBunches; i++) { + List readFields = bunchReadFields.get(i); if (readFields.isEmpty()) { fileRecordReaders[i] = null; - } else { - // create new FormatReaderMapping for read partial fields - List readFieldNames = - readFields.stream().map(DataField::name).collect(Collectors.toList()); - FormatReaderMapping formatReaderMapping = - formatReaderMappings.computeIfAbsent( - new FormatKey(schemaId, formatIdentifier, readFieldNames), - key -> - formatBuilder.build( - formatIdentifier, - schema, - dataSchema, - readFields, - false)); - RowType partialReadRowType = new RowType(readFields); - fileRecordReaders[i] = - new ForceSingleBatchReader( - createFieldBunchReader( - partition, - bunch, - dataFilePathFactory, - formatReaderMapping, - rowRanges, - partialReadRowType)); + continue; } + FieldBunch bunch = fieldsFiles.get(i); + DataFileMeta firstFile = bunch.files().get(0); + String formatIdentifier = DataFilePathFactory.formatIdentifier(firstFile.fileName()); + long schemaId = bunchSchemaId[i]; + TableSchema dataSchema = schemaFetcher.apply(schemaId).project(firstFile.writeCols()); + RowType partialReadRowType = new RowType(readFields); + // cache key must use paths relative to the full schema so that two files reading + // different sub-fields of the same struct (e.g. nest.a vs nest.b) do not collide + RowType fullRef = + rowTypeWithRowTracking(schemaFetcher.apply(schemaId).logicalRowType()); + List readFieldNames = partialReadRowType.leafPaths(fullRef); + FormatReaderMapping formatReaderMapping = + formatReaderMappings.computeIfAbsent( + new FormatKey(schemaId, formatIdentifier, readFieldNames), + key -> + formatBuilder.build( + formatIdentifier, + schema, + dataSchema, + readFields, + false)); + fileRecordReaders[i] = + new ForceSingleBatchReader( + createFieldBunchReader( + partition, + bunch, + dataFilePathFactory, + formatReaderMapping, + rowRanges, + partialReadRowType)); } - for (int i = 0; i < rowOffsets.length; i++) { - if (rowOffsets[i] == -1) { + for (int j = 0; j < numFields; j++) { + if (rowOffsets[j] == -1 && nested[j] == null) { checkArgument( - allReadFields.get(i).type().isNullable(), + allReadFields.get(j).type().isNullable(), format( "Field %s is not null but can't find any file contains it.", - allReadFields.get(i))); + allReadFields.get(j))); } } - return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders); + return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders, nested); + } + + /** Collect (recursively) the leaf field ids of {@code fields}; only ROW types recurse. */ + private static void collectLeafIds(List fields, java.util.Collection out) { + for (DataField f : fields) { + if (f.type() instanceof RowType) { + collectLeafIds(((RowType) f.type()).getFields(), out); + } else { + out.add(f.id()); + } + } + } + + private static List leafIdsOf(DataField field) { + List result = new ArrayList<>(); + collectLeafIds(Collections.singletonList(field), result); + return result; + } + + private static int providerOf(int leafId, List> bunchLeaves) { + for (int i = 0; i < bunchLeaves.size(); i++) { + if (bunchLeaves.get(i).contains(leafId)) { + return i; + } + } + return -1; + } + + private static int findSubProvider( + int topId, int subId, List>> bunchSubOffset) { + for (int b = 0; b < bunchSubOffset.size(); b++) { + Map sm = bunchSubOffset.get(b).get(topId); + if (sm != null && sm.containsKey(subId)) { + return b; + } + } + return -1; } private RecordReader createFieldBunchReader( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/NestedDataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/NestedDataEvolutionTableTest.java new file mode 100644 index 000000000000..e507343f38b6 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/NestedDataEvolutionTableTest.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.DataEvolutionFileReader; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for data-evolution + row-tracking with nested columns (ROW / ARRAY / MAP). + * + *

These tests verify that a nested column is handled as a single, indivisible top-level field by + * data evolution: it can live in its own column-group file and be merged back by field id, while + * {@code _ROW_ID} / {@code _SEQUENCE_NUMBER} stay aligned by {@code firstRowId + position}. They + * also pin down the limitation that sub-fields of a nested ROW cannot be split into a separate + * column group (the minimum granularity is a top-level column). + */ +public class NestedDataEvolutionTableTest extends DataEvolutionTestBase { + + // f0(0) INT, f1(1) STRING, nest(2) ROW, arr(3) ARRAY, mp(4) + // MAP + @Override + protected Schema schemaDefault() { + Schema.Builder b = Schema.newBuilder(); + b.column("f0", DataTypes.INT()); + b.column("f1", DataTypes.STRING()); + b.column("nest", DataTypes.ROW(DataTypes.INT(), DataTypes.STRING())); + b.column("arr", DataTypes.ARRAY(DataTypes.INT())); + b.column("mp", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())); + b.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + b.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return b.build(); + } + + private static GenericRow nestOf(int a, String bStr) { + return GenericRow.of(a, BinaryString.fromString(bStr)); + } + + private static GenericArray arrOf(int... values) { + return new GenericArray(values); + } + + private static GenericMap mapOf(String k, int v) { + Map m = new HashMap<>(); + m.put(BinaryString.fromString(k), v); + return new GenericMap(m); + } + + /** G1 + G2: nested column as its own column group, merged by field id, row-id aligned. */ + @Test + public void testNestedColumnGroupMerge() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 50; + + RowType cgA = schema.rowType().project(Arrays.asList("f0", "f1")); + RowType cgB = schema.rowType().project(Collections.singletonList("nest")); + RowType cgC = schema.rowType().project(Arrays.asList("arr", "mp")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + // column group A: {f0, f1} + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(i, BinaryString.fromString("a" + i))); + } + BatchTableCommit commit = builder.newCommit(); + commit.commit(w.prepareCommit()); + } + + // column group B: {nest} + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(nestOf(i, "n" + i))); + } + BatchTableCommit commit = builder.newCommit(); + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + } + + // column group C: {arr, mp} + try (BatchTableWrite w = builder.newWrite().withWriteType(cgC)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(arrOf(i, i + 1), mapOf("k" + i, i))); + } + BatchTableCommit commit = builder.newCommit(); + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); + + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i); + assertThat(nest.getString(1).toString()).isEqualTo("n" + i); + assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + assertThat(r.getArray(3).getInt(1)).isEqualTo(i + 1); + assertThat(r.getMap(4).keyArray().getString(0).toString()).isEqualTo("k" + i); + assertThat(r.getMap(4).valueArray().getInt(0)).isEqualTo(i); + }); + assertThat(idx.get()).isEqualTo(n); + + // _ROW_ID is contiguous 0..n-1 across the merged column groups. + List rowIds = readRowIds(); + assertThat(rowIds).hasSize(n); + for (int i = 0; i < n; i++) { + assertThat(rowIds.get(i)).isEqualTo((long) i); + } + } + + /** + * G3: a later snapshot rewrites only the nested column group; other columns keep old values. + */ + @Test + public void testNestedColumnLateOverwrite() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 20; + + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + // full write + try (BatchTableWrite w = builder.newWrite().withWriteType(schema.rowType())) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, + BinaryString.fromString("a" + i), + nestOf(i, "old" + i), + arrOf(i), + mapOf("k" + i, i))); + } + BatchTableCommit commit = builder.newCommit(); + commit.commit(w.prepareCommit()); + } + + // later snapshot: overwrite only the nest column group, aligned to the same row-id range + RowType cgNest = schema.rowType().project(Collections.singletonList("nest")); + try (BatchTableWrite w = builder.newWrite().withWriteType(cgNest)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(nestOf(i * 10, "new" + i))); + } + BatchTableCommit commit = builder.newCommit(); + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + // untouched columns keep old values + assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + // nest column reflects the new values + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i * 10); + assertThat(nest.getString(1).toString()).isEqualTo("new" + i); + }); + assertThat(idx.get()).isEqualTo(n); + + // row ids unchanged + List rowIds = readRowIds(); + for (int i = 0; i < n; i++) { + assertThat(rowIds.get(i)).isEqualTo((long) i); + } + } + + /** G4: projection covering special fields and nested columns. */ + @Test + public void testProjectionWithNested() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 10; + + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + try (BatchTableWrite w = builder.newWrite().withWriteType(schema.rowType())) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, + BinaryString.fromString("a" + i), + nestOf(i, "n" + i), + arrOf(i), + mapOf("k" + i, i))); + } + BatchTableCommit commit = builder.newCommit(); + commit.commit(w.prepareCommit()); + } + + // project only the nested column (keep the real "nest" field id via project) + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder + .withReadType( + getTableDefault() + .rowType() + .project(Collections.singletonList("nest"))) + .newRead() + .createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + InternalRow nest = r.getRow(0, 2); + assertThat(nest.getInt(0)).isEqualTo(i); + assertThat(nest.getString(1).toString()).isEqualTo("n" + i); + }); + assertThat(idx.get()).isEqualTo(n); + + // project only _ROW_ID + assertThat(readRowIds()).hasSize(n); + } + + /** + * G5 (limitation): a sub-field of a nested ROW cannot be addressed as a top-level write column, + * because data evolution splits at top-level column granularity. {@code RowType.project} + * matches only top-level field names, so a nested sub-field name has no match. + */ + @Test + public void testNestedSubFieldCannotBeSplit() { + RowType rowType = schemaDefault().rowType(); + // "a" / "b" are sub-fields inside "nest"; they are not top-level columns. + assertThatThrownBy(() -> rowType.project(Collections.singletonList("a"))) + .isInstanceOf(IndexOutOfBoundsException.class); + // a nested ROW can only be projected as a whole top-level column. + assertThat(rowType.project(Collections.singletonList("nest")).getFieldNames()) + .containsExactly("nest"); + } + + /** + * G6: compacting column-group files (one of which is the nested column) merges them into a + * single file; data values and row ids survive the {@code DataEvolutionRowIdReassigner}. + */ + @Test + public void testCompactionWithNested() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 30; + + RowType cgA = schema.rowType().project(Arrays.asList("f0", "f1", "arr", "mp")); + RowType cgB = schema.rowType().project(Collections.singletonList("nest")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, BinaryString.fromString("a" + i), arrOf(i), mapOf("k" + i, i))); + } + builder.newCommit().commit(w.prepareCommit()); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(nestOf(i, "n" + i))); + } + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + builder.newCommit().commit(commitables); + } + + // run data-evolution compaction via the coordinator (merges the two column groups) + FileStoreTable table = getTableDefault(); + DataEvolutionCompactCoordinator coordinator = + new DataEvolutionCompactCoordinator(table, false, false); + List commitMessages = new ArrayList<>(); + List tasks; + try { + while (!(tasks = coordinator.plan()).isEmpty()) { + for (DataEvolutionCompactTask task : tasks) { + commitMessages.add(task.doCompact(table, "nested-compact")); + } + } + } catch (EndOfScanException ignore) { + } + if (!commitMessages.isEmpty()) { + table.newBatchWriteBuilder().newCommit().commit(commitMessages); + } + + // data and row ids must be unchanged after compaction + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i); + assertThat(nest.getString(1).toString()).isEqualTo("n" + i); + assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + }); + assertThat(idx.get()).isEqualTo(n); + + List rowIds = readRowIds(); + assertThat(rowIds).hasSize(n); + for (int i = 0; i < n; i++) { + assertThat(rowIds.get(i)).isEqualTo((long) i); + } + } + + private List readRowIds() throws Exception { + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder + .withReadType(RowType.of(SpecialFields.ROW_ID)) + .newRead() + .createReader(readBuilder.newScan().plan()); + List rowIds = new ArrayList<>(); + reader.forEachRemaining(r -> rowIds.add(r.getLong(0))); + return rowIds; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/NestedSubfieldDataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/NestedSubfieldDataEvolutionTableTest.java new file mode 100644 index 000000000000..0e33b1c46e41 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/NestedSubfieldDataEvolutionTableTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for sub-field-level data evolution + row-tracking: updating a single sub-field of a + * nested ROW column by writing an incremental file that only contains that sub-field, aligned by + * row-id, and reading the struct back by assembling sub-fields from several files. + */ +public class NestedSubfieldDataEvolutionTableTest extends DataEvolutionTestBase { + + // f0(0) INT, f1(1) STRING, nest(2) ROW, arr(3) ARRAY, mp(4) + // MAP + @Override + protected Schema schemaDefault() { + Schema.Builder b = Schema.newBuilder(); + b.column("f0", DataTypes.INT()); + b.column("f1", DataTypes.STRING()); + b.column( + "nest", + DataTypes.ROW( + DataTypes.FIELD(0, "a", DataTypes.INT()), + DataTypes.FIELD(1, "b", DataTypes.STRING()))); + b.column("arr", DataTypes.ARRAY(DataTypes.INT())); + b.column("mp", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())); + b.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + b.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return b.build(); + } + + private static GenericArray arrOf(int v) { + return new GenericArray(new int[] {v}); + } + + private static GenericMap mapOf(String k, int v) { + Map m = new HashMap<>(); + m.put(BinaryString.fromString(k), v); + return new GenericMap(m); + } + + private void commit(BatchWriteBuilder builder, List messages) throws Exception { + try (BatchTableCommit commit = builder.newCommit()) { + commit.commit(messages); + } + } + + /** + * Core: write sub-field {@code nest.a} and {@code nest.b} into two separate files (plus the + * rest of the columns in a third), then read the full struct assembled from all three. + */ + @Test + public void testSubFieldGroupsAssembled() throws Exception { + createTableDefault(); + RowType full = getTableDefault().rowType(); + int n = 40; + + RowType cgRest = full.projectByPaths(Arrays.asList("f0", "f1", "arr", "mp")); + RowType cgA = full.projectByPaths(Collections.singletonList("nest.a")); + RowType cgB = full.projectByPaths(Collections.singletonList("nest.b")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + try (BatchTableWrite w = builder.newWrite().withWriteType(cgRest)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, BinaryString.fromString("a" + i), arrOf(i), mapOf("k" + i, i))); + } + commit(builder, w.prepareCommit()); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(i * 2))); + } + List messages = w.prepareCommit(); + // a file that only writes nest.a must record a nested dotted path + assertThat(((CommitMessageImpl) messages.get(0)).newFilesIncrement().newFiles()) + .allSatisfy(f -> assertThat(f.writeCols()).containsExactly("nest.a")); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(BinaryString.fromString("b" + i)))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + // nest assembled: a from file A, b from file B + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i * 2); + assertThat(nest.getString(1).toString()).isEqualTo("b" + i); + assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + assertThat(r.getMap(4).valueArray().getInt(0)).isEqualTo(i); + }); + assertThat(idx.get()).isEqualTo(n); + + assertThat(readRowIds()).containsExactlyElementsOf(rangeLongs(n)); + } + + /** + * A later snapshot updates only {@code nest.a} via an incremental sub-field file; {@code + * nest.b} and all other columns keep their original values, row-ids unchanged. + */ + @Test + public void testSubFieldLateOverwrite() throws Exception { + createTableDefault(); + RowType full = getTableDefault().rowType(); + int n = 25; + + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + // full write: nest = (i, "old"+i) + try (BatchTableWrite w = builder.newWrite().withWriteType(full)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, + BinaryString.fromString("a" + i), + GenericRow.of(i, BinaryString.fromString("old" + i)), + arrOf(i), + mapOf("k" + i, i))); + } + commit(builder, w.prepareCommit()); + } + + // incremental: overwrite only nest.a + RowType cgA = full.projectByPaths(Collections.singletonList("nest.a")); + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(i + 1000))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i + 1000); // updated sub-field + assertThat(nest.getString(1).toString()).isEqualTo("old" + i); // kept + }); + assertThat(idx.get()).isEqualTo(n); + assertThat(readRowIds()).containsExactlyElementsOf(rangeLongs(n)); + } + + /** Compaction merges sub-field files into one full file; values and row-ids are preserved. */ + @Test + public void testCompactionMergesSubFields() throws Exception { + createTableDefault(); + RowType full = getTableDefault().rowType(); + int n = 20; + + RowType cgRest = full.projectByPaths(Arrays.asList("f0", "f1", "arr", "mp")); + RowType cgA = full.projectByPaths(Collections.singletonList("nest.a")); + RowType cgB = full.projectByPaths(Collections.singletonList("nest.b")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + try (BatchTableWrite w = builder.newWrite().withWriteType(cgRest)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, BinaryString.fromString("a" + i), arrOf(i), mapOf("k" + i, i))); + } + commit(builder, w.prepareCommit()); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(i * 2))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(BinaryString.fromString("b" + i)))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + + FileStoreTable table = getTableDefault(); + DataEvolutionCompactCoordinator coordinator = + new DataEvolutionCompactCoordinator(table, false, false); + List compactMessages = new ArrayList<>(); + List tasks; + try { + while (!(tasks = coordinator.plan()).isEmpty()) { + for (DataEvolutionCompactTask task : tasks) { + compactMessages.add(task.doCompact(table, "subfield-compact")); + } + } + } catch (EndOfScanException ignore) { + } + if (!compactMessages.isEmpty()) { + commit(table.newBatchWriteBuilder(), compactMessages); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i * 2); + assertThat(nest.getString(1).toString()).isEqualTo("b" + i); + }); + assertThat(idx.get()).isEqualTo(n); + assertThat(readRowIds()).containsExactlyElementsOf(rangeLongs(n)); + } + + private List rangeLongs(int n) { + List out = new ArrayList<>(); + for (long i = 0; i < n; i++) { + out.add(i); + } + return out; + } + + private List readRowIds() throws Exception { + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder + .withReadType(RowType.of(SpecialFields.ROW_ID)) + .newRead() + .createReader(readBuilder.newScan().plan()); + List rowIds = new ArrayList<>(); + reader.forEachRemaining(r -> rowIds.add(r.getLong(0))); + return rowIds; + } +} From 1d8ffac501148600b5e8340c40d386a3009a793f Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Sun, 21 Jun 2026 22:25:14 +0800 Subject: [PATCH 2/9] [spark] Support sub-field-level data evolution via MERGE INTO When a MERGE INTO on a row-tracking + data-evolution table updates only a sub-field of a nested struct column (e.g. SET t.nest.a = s.x), write an incremental file containing only that leaf (dotted write column nest.a) aligned by row-id, instead of rewriting the whole top-level column. - MergeIntoPaimonDataEvolutionTable: prune the update output to only the changed leaves, build the dotted write paths, and project the write type via RowType.projectByPaths. - DataEvolutionPaimonWriter: add a sub-field-aware writePartialFields overload that takes an already-pruned RowType. - Add NestedSubfieldMergeIntoTest covering single sub-field and whole-struct updates. --- .../commands/DataEvolutionPaimonWriter.scala | 17 +- .../MergeIntoPaimonDataEvolutionTable.scala | 221 +++++++++++++++++- .../sql/NestedSubfieldMergeIntoTest.scala | 101 ++++++++ 3 files changed, 331 insertions(+), 8 deletions(-) create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index 1c93bdcb754d..3b80bd96bdb1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -26,6 +26,7 @@ import org.apache.paimon.table.sink._ import org.apache.paimon.table.source.DataSplit import org.apache.paimon.types.DataType import org.apache.paimon.types.DataTypeRoot.BLOB +import org.apache.paimon.types.RowType import org.apache.paimon.types.VectorType.isVectorStoreFile import org.apache.paimon.utils.SerializationUtils @@ -44,14 +45,26 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se override val table: FileStoreTable = paimonTable.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")) + // Whole top-level column write (kept for callers that only update full columns). def writePartialFields( data: DataFrame, columnNames: Seq[String], rawBlobPlaceholderMarkerColumns: Map[String, String] = Map.empty): Seq[CommitMessage] = { + writePartialFields( + data, + table.rowType().projectByPaths(columnNames.asJava), + rawBlobPlaceholderMarkerColumns) + } + + // Sub-field-aware write: writeType is already pruned to the written top-level columns and + // (possibly) nested sub-fields via dotted paths. + def writePartialFields( + data: DataFrame, + writeType: RowType, + rawBlobPlaceholderMarkerColumns: Map[String, String]): Seq[CommitMessage] = { val sparkSession = data.sparkSession import sparkSession.implicits._ - assert(data.columns.length == columnNames.size + 2 + rawBlobPlaceholderMarkerColumns.size) - val writeType = table.rowType().project(columnNames.asJava) + assert(data.columns.length == writeType.getFieldCount + 2 + rawBlobPlaceholderMarkerColumns.size) val options = new CoreOptions(table.schema().options()) val blobInlineFields = options.blobInlineField().asScala.toSeq diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 321eb6f1478f..1d5d3888e9a1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -43,14 +43,14 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.PaimonUtils._ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver -import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualTo, Expression, ExprId, Literal, Or, PythonUDF, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, CreateNamedStruct, EqualTo, Expression, ExprId, GetStructField, Literal, Or, PythonUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.paimon.shims.SparkShimLoader -import org.apache.spark.sql.types.{BooleanType, StructType} +import org.apache.spark.sql.types.{BooleanType, DataType, StructType} import scala.collection.{immutable, mutable} import scala.collection.JavaConverters._ @@ -384,7 +384,52 @@ case class MergeIntoPaimonDataEvolutionTable( val rawBlobMarkerAttributes = rawBlobUpdateColumns.map( attr => AttributeReference(rawBlobMarkerNamesByColumn(attr.name), BooleanType, nullable = false)()) - val mergeOutput = updateColumnsSorted ++ metadataColumns ++ rawBlobMarkerAttributes + + // Sub-field-level pruning: for a struct column whose SET only touches some sub-fields, only the + // changed leaves are written (an incremental column-group file containing the partial struct); + // the rest are copied from the target. Falls back to whole-column write when the changed leaves + // cannot be safely determined, so behaviour never regresses. + val matchedUpdateActions = matchedActions.collect { case ua: UpdateAction => ua } + val prunedByExprId: Map[ExprId, (Seq[Seq[String]], StructType)] = + updateColumnsSorted.flatMap { + attr => + if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { + None + } else { + attr.dataType match { + case st: StructType => + val perAction = matchedUpdateActions.flatMap { + ua => + ua.assignments + .find( + a => + isModifiedAssignment(a) && assignmentKeyAttribute(a).sameRef(attr)) + .map(a => changedLeaves(a.value, st, attr)) + } + if (perAction.isEmpty || perAction.exists(_.isEmpty)) { + None + } else { + val union = perAction.flatten.flatten.map(_._1).distinct + // prune only when it is a strict subset of all leaves of the struct + if (union.isEmpty || union.map(_.size).sum >= leafCount(st)) { + None + } else { + Some(attr.exprId -> (union, prunedStructType(st, union))) + } + } + case _ => None + } + } + }.toMap + + val mergeOutput = updateColumnsSorted.map { + attr => + prunedByExprId.get(attr.exprId) match { + case Some((_, prunedType)) => + AttributeReference(attr.name, prunedType, attr.nullable)() + case None => attr + } + } ++ metadataColumns ++ rawBlobMarkerAttributes val realUpdateActions = matchedActions .map(s => s.asInstanceOf[UpdateAction]) @@ -437,7 +482,20 @@ case class MergeIntoPaimonDataEvolutionTable( ) { Literal(null, attr.dataType) } else { - assignmentValue(action, attr) + prunedByExprId.get(attr.exprId) match { + case Some((paths, _)) => + val st = attr.dataType.asInstanceOf[StructType] + val actionMap = + changedLeaves(assignmentValue(action, attr), st, attr) + .getOrElse(Seq.empty) + .toMap + buildPrunedStruct( + st, + Nil, + paths, + p => actionMap.getOrElse(p, passthroughExpr(attr, st, p))) + case None => assignmentValue(action, attr) + } } } val metadata = metadataColumns.map(attr => assignmentValue(action, attr)) @@ -459,7 +517,12 @@ case class MergeIntoPaimonDataEvolutionTable( if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { Literal(null, attr.dataType) } else { - attr + prunedByExprId.get(attr.exprId) match { + case Some((paths, _)) => + val st = attr.dataType.asInstanceOf[StructType] + buildPrunedStruct(st, Nil, paths, p => passthroughExpr(attr, st, p)) + case None => attr + } } } copiedColumns ++ metadataColumns ++ rawBlobUpdateColumns.map(_ => TrueLiteral) @@ -610,10 +673,20 @@ case class MergeIntoPaimonDataEvolutionTable( .sortWithinPartitions(FIRST_ROW_ID_NAME, ROW_ID_NAME) } + // dotted write paths: a whole column -> its name; a pruned struct -> "col.subfield..." leaves + val writePaths = updateColumnsSorted.flatMap { + attr => + prunedByExprId.get(attr.exprId) match { + case Some((paths, _)) => paths.map(p => (attr.name +: p).mkString(".")) + case None => Seq(attr.name) + } + } + val writeType = table.rowType().projectByPaths(writePaths.asJava) + val writer = DataEvolutionPaimonWriter(table, dataSplits) writer.writePartialFields( toWrite, - updateColumnsSorted.map(_.name), + writeType, rawBlobUpdateColumns.map(attr => attr.name -> rawBlobMarkerNamesByColumn(attr.name)).toMap) } @@ -856,6 +929,142 @@ object MergeIntoPaimonDataEvolutionTable { col("`" + name.replace("`", "``") + "`") } + /** Number of leaf fields (recursing into structs) of a data type. */ + private def leafCount(dt: DataType): Int = dt match { + case st: StructType => st.fields.map(f => leafCount(f.dataType)).sum + case _ => 1 + } + + /** + * For an aligned UPDATE value of a struct column, return the changed leaf paths (relative to the + * column) paired with their new value expression, or `None` if the value is not a recognizable + * named-struct rebuild (in which case the whole column must be written). + */ + private[commands] def changedLeaves( + value: Expression, + structType: StructType, + base: Expression): Option[Seq[(Seq[String], Expression)]] = { + val out = mutable.LinkedHashMap.empty[Seq[String], Expression] + if (collectChanges(value, structType, base, Nil, out)) Some(out.toSeq) else None + } + + private def collectChanges( + value: Expression, + structType: StructType, + base: Expression, + prefix: Seq[String], + out: mutable.LinkedHashMap[Seq[String], Expression]): Boolean = { + value match { + case cns: CreateNamedStruct => + val pairs = cns.children.grouped(2).toSeq + if (pairs.exists(_.size != 2)) { + return false + } + var ok = true + pairs.foreach { + pair => + val nameExpr = pair.head + val v = pair(1) + nameExpr match { + case Literal(nameVal, _) if nameVal != null => + val name = nameVal.toString + val ordinalOpt = { + val i = structType.fieldNames.indexOf(name) + if (i >= 0) Some(i) else None + } + ordinalOpt match { + case Some(ordinal) => + val fieldType = structType(ordinal).dataType + val passthrough = GetStructField(base, ordinal, Some(name)) + if (!v.semanticEquals(passthrough)) { + (fieldType, v) match { + case (st: StructType, inner: CreateNamedStruct) => + ok = collectChanges( + inner, + st, + GetStructField(base, ordinal, Some(name)), + prefix :+ name, + out) && ok + case _ => + out.put(prefix :+ name, v) + } + } + case None => ok = false + } + case _ => ok = false + } + } + ok + case _ => false + } + } + + /** Build a Spark StructType containing only the given (possibly nested) leaf paths. */ + private[commands] def prunedStructType( + structType: StructType, + paths: Seq[Seq[String]]): StructType = { + val byHead = paths.filter(_.nonEmpty).groupBy(_.head) + val fields = structType.fields.filter(f => byHead.contains(f.name)).map { + f => + val sub = byHead(f.name) + if (sub.exists(_.size == 1)) { + f + } else { + f.copy(dataType = + prunedStructType(f.dataType.asInstanceOf[StructType], sub.map(_.tail))) + } + } + StructType(fields) + } + + /** Build a named_struct over the given leaf paths; each terminal leaf value comes from valueFn. */ + private[commands] def buildPrunedStruct( + structType: StructType, + prefix: Seq[String], + paths: Seq[Seq[String]], + valueFn: Seq[String] => Expression): CreateNamedStruct = { + val byHead = paths.filter(_.nonEmpty).groupBy(_.head) + val args = structType.fields.flatMap { + f => + byHead.get(f.name) match { + case None => Seq.empty[Expression] + case Some(sub) => + val fieldPath = prefix :+ f.name + val expr = + if (sub.exists(_.size == 1)) { + valueFn(fieldPath) + } else { + buildPrunedStruct( + f.dataType.asInstanceOf[StructType], + fieldPath, + sub.map(_.tail), + valueFn) + } + Seq(Literal(f.name): Expression, expr) + } + } + CreateNamedStruct(args.toSeq) + } + + /** Read a (possibly nested) leaf from a base expression via GetStructField chain. */ + private[commands] def passthroughExpr( + base: Expression, + structType: StructType, + path: Seq[String]): Expression = { + if (path.isEmpty) { + base + } else { + val head = path.head + val ordinal = structType.fieldIndex(head) + val child = GetStructField(base, ordinal, Some(head)) + if (path.tail.isEmpty) { + child + } else { + passthroughExpr(child, structType(ordinal).dataType.asInstanceOf[StructType], path.tail) + } + } + } + private def sameAttributeReference(left: Expression, right: Expression): Boolean = { (left, right) match { case (leftAttr: AttributeReference, rightAttr: AttributeReference) => diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala new file mode 100644 index 000000000000..d22483cd4999 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.source.DataSplit + +import org.apache.spark.sql.Row + +import scala.collection.JavaConverters._ + +/** + * End-to-end tests for sub-field-level data evolution via Spark `MERGE INTO`: updating a single + * sub-field of a nested struct column should write an incremental file containing only that + * sub-field (a dotted write column like `nest.a`), aligned by row-id, while the rest of the struct + * is read back from the original file. + */ +class NestedSubfieldMergeIntoTest extends PaimonSparkTestBase { + + import testImplicits._ + + private def latestDeltaWriteCols(tableName: String): Seq[Seq[String]] = { + val t = loadTable(tableName) + val splits = t.newSnapshotReader().read().splits().asScala + splits + .flatMap(_.asInstanceOf[DataSplit].dataFiles().asScala) + .map(f => Option(f.writeCols()).map(_.asScala.toSeq).getOrElse(Seq.empty)) + .toSeq + } + + test("Sub-field data evolution: MERGE INTO updating one struct sub-field writes only that leaf") { + withTable("s", "t") { + sql(s""" + |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + sql( + "INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x')), " + + "(2, named_struct('a', 20, 'b', 'y'))") + + Seq((1, 100)).toDF("id", "newa").createOrReplaceTempView("s") + + sql(s""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.nest.a = s.newa + |""".stripMargin).collect() + + // correctness: nest.a updated for id=1, nest.b preserved, other row untouched + checkAnswer( + sql("SELECT id, nest.a, nest.b FROM t ORDER BY id"), + Seq(Row(1, 100, "x"), Row(2, 20, "y"))) + + // feature engaged: the incremental file written by the merge only contains nest.a + val deltaCols = latestDeltaWriteCols("t") + assert( + deltaCols.exists(cols => cols == Seq("nest.a")), + s"expected an incremental file with writeCols == [nest.a], got: $deltaCols") + } + } + + test("Sub-field data evolution: updating whole struct still writes the whole column") { + withTable("s", "t") { + sql(s""" + |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x'))") + + Seq((1, 100, "z")).toDF("id", "newa", "newb").createOrReplaceTempView("s") + + sql(s""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.nest = named_struct('a', s.newa, 'b', s.newb) + |""".stripMargin).collect() + + checkAnswer(sql("SELECT id, nest.a, nest.b FROM t"), Seq(Row(1, 100, "z"))) + } + } +} From 276247cd4382a78cd00b65542fddd8cb74890a4b Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Tue, 23 Jun 2026 15:46:32 +0800 Subject: [PATCH 3/9] [core][spark] Add nested-field flag and fix sub-field data-evolution review findings Add the data-evolution.nested-field.enabled table option (default off) to gate sub-field-level data evolution, and fix issues found in review: - RowType.leafPaths: skip field ids absent from the reference type (e.g. the _ROW_ID / _SEQUENCE_NUMBER special fields), so withWriteType no longer throws for row-tracking tables during append compaction. - RowType.projectByPaths: prefer an exact field-name match before splitting on '.', preserving columns whose names contain a dot; reject ambiguous nested paths when a field name contains '.'. - DataEvolutionSplitRead: only read a struct whole from a single file when that file covers all its leaves, otherwise compose from the provided sub-fields; restore the not-null check at sub-field level; reject deeper-than-one-level partial sub-structs explicitly. - DataEvolutionRow: give a composed struct a defined RowKind when every source partial is null. - BaseAppendFileStoreWrite.compactRewrite: encode writeCols via leafPaths to match the main write path. - MergeIntoPaimonDataEvolutionTable: gate sub-field pruning on the new option and only prune one level deep (the depth the reader can compose). - DataEvolutionFileStoreScan: document the intentional stats skip for partially-written nested struct files. - Regenerate core_configuration.html for the new option. --- docs/generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 16 ++++ .../java/org/apache/paimon/types/RowType.java | 40 ++++++++-- .../paimon/reader/DataEvolutionRow.java | 6 ++ .../operation/BaseAppendFileStoreWrite.java | 4 +- .../operation/DataEvolutionFileStoreScan.java | 9 +++ .../operation/DataEvolutionSplitRead.java | 32 +++++++- .../commands/DataEvolutionPaimonWriter.scala | 3 +- .../MergeIntoPaimonDataEvolutionTable.scala | 74 ++++++++++--------- .../sql/NestedSubfieldMergeIntoTest.scala | 37 +++++++++- 10 files changed, 181 insertions(+), 46 deletions(-) diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index c17b6a500787..4c94a297322c 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -482,6 +482,12 @@ Boolean Whether to persist source when process merge into action on data evolution table. + +

data-evolution.nested-field.enabled
+ false + Boolean + Whether to enable sub-field-level data evolution for nested (struct) columns. When enabled, an update that only touches some sub-fields of a nested column writes an incremental file containing just those sub-fields (aligned by row id); when disabled, the whole top-level column is rewritten. Requires data-evolution.enabled=true. +
data-file.external-paths
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 81bf1fe0e0d0..49b644ac53d9 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2310,6 +2310,18 @@ public InlineElement getDescription() { .defaultValue(false) .withDescription("Whether enable data evolution for row tracking table."); + public static final ConfigOption DATA_EVOLUTION_NESTED_FIELD_ENABLED = + key("data-evolution.nested-field.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable sub-field-level data evolution for nested (struct) " + + "columns. When enabled, an update that only touches some " + + "sub-fields of a nested column writes an incremental file " + + "containing just those sub-fields (aligned by row id); when " + + "disabled, the whole top-level column is rewritten. Requires " + + "data-evolution.enabled=true."); + public static final ConfigOption DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING = key("data-evolution.merge-into.file-pruning") .booleanType() @@ -3860,6 +3872,10 @@ public boolean dataEvolutionEnabled() { return options.get(DATA_EVOLUTION_ENABLED); } + public boolean dataEvolutionNestedFieldEnabled() { + return options.get(DATA_EVOLUTION_NESTED_FIELD_ENABLED); + } + public boolean dataEvolutionMergeIntoFilePruning() { return options.get(DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING); } diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java index edf705602fa8..8b94ccb18322 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java @@ -329,9 +329,18 @@ private static RowType projectTypeByPaths(RowType type, List paths) { // a tail) is selected as a whole field Map> childToSubPaths = new HashMap<>(); Set wholeChildren = new HashSet<>(); + Set fieldNames = new HashSet<>(); + for (DataField field : type.getFields()) { + fieldNames.add(field.name()); + } for (String path : paths) { int dot = path.indexOf('.'); - if (dot < 0) { + // Prefer an exact field-name match so a column whose name itself contains a dot (and + // any + // plain top-level name) is selected whole; only split into head.tail for genuine nested + // sub-field paths that do not name a field directly. This keeps backward compatibility + // with the legacy exact-name project(List). + if (dot < 0 || fieldNames.contains(path)) { childToSubPaths.computeIfAbsent(path, k -> new ArrayList<>()); wholeChildren.add(path); } else { @@ -385,11 +394,32 @@ public List leafPaths(RowType fullType) { private static void collectLeafPaths( List writeFields, RowType fullType, String prefix, List out) { for (DataField writeField : writeFields) { - DataField fullField = fullType.getField(writeField.id()); String path = prefix.isEmpty() ? writeField.name() : prefix + "." + writeField.name(); - if (writeField.type() instanceof RowType - && fullField.type() instanceof RowType - && !coversFully((RowType) writeField.type(), (RowType) fullField.type())) { + // A field absent from the reference type (e.g. the _ROW_ID / _SEQUENCE_NUMBER special + // fields added by row tracking, which are not part of the table's logical row type) has + // no sub-field split: emit it whole by name, matching the legacy getFieldNames() + // output. + if (!fullType.containsField(writeField.id())) { + out.add(path); + continue; + } + DataField fullField = fullType.getField(writeField.id()); + boolean willExpand = + writeField.type() instanceof RowType + && fullField.type() instanceof RowType + && !coversFully( + (RowType) writeField.type(), (RowType) fullField.type()); + // A dotted path is only unambiguous if no name segment contains a literal '.'. A name + // with a dot is fine when emitted whole at top level (projectByPaths matches it + // exactly), + // but not when it participates in a multi-segment nested path. + if (writeField.name().indexOf('.') >= 0 && (!prefix.isEmpty() || willExpand)) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution does not support a nested field whose name " + + "contains '.': " + + path); + } + if (willExpand) { collectLeafPaths( ((RowType) writeField.type()).getFields(), (RowType) fullField.type(), diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java index cd02972b569e..da36b9acd0bf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java @@ -78,6 +78,12 @@ private void setRowsAllowNull(InternalRow[] newRows) { this.rowKind = newRows[i].getRowKind(); } } + if (rowKind == null) { + // a composed struct whose every source partial is null still needs a defined kind so + // getRowKind() never returns null; the kind of an assembled struct value is not + // meaningful, so default to INSERT + this.rowKind = RowKind.INSERT; + } } public void setRows(InternalRow[] rows) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index 63ae5de32a79..ecc3b5e28f95 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -278,7 +278,9 @@ private RowDataRollingFileWriter createRollingFileWriter( FileSource.COMPACT, options.asyncFileWrite(), options.statsDenseStore(), - rowType.equals(writeType) ? null : writeType.getFieldNames()); + // use the same dotted-leaf-path encoding as withWriteType so a partial nested + // writeType records its real sub-field content consistently across write paths + rowType.equals(writeType) ? null : writeType.leafPaths(rowType)); } private RecordReaderIterator createFilesIterator( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java index b5ea80b0606e..a16c19cda5ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java @@ -333,6 +333,15 @@ static EvolutionStats evolutionStats( for (int k = 0; k < fieldIdsWithStats.length; k++) { if (fieldId == fieldIdsWithStats[k]) { DataType fileType = dataFileSchemaWithStats.fields().get(k).type(); + // A sub-field-level data evolution file may store only part of a + // nested struct (e.g. nest
of nest); its file type then + // does + // not equal the full target struct. We intentionally skip stats in + // that case (leaving the field as "no stats" so no file is wrongly + // pruned) rather than composing partial-struct stats across files; + // struct columns rarely carry useful min/max and data evolution + // does + // not push predicates down, so the lost benefit is negligible. if (!fileType.equalsIgnoreFieldId(targetType)) { continue loop1; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index ca312a370cef..dc02679cb40d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -305,7 +305,13 @@ private DataEvolutionFileReader createUnionReader( // no file provides this field; it stays null (nullability checked below) continue; } - if (providers.size() == 1) { + // Only read a field whole from a single file when that file covers ALL of its leaves. + // If a single file provides only some leaves of a struct (the rest absent everywhere), + // we must prune to the provided sub-fields so the reader is not asked for sub-fields + // the + // file does not physically contain; the missing ones stay null via the composite plan. + boolean allLeavesCovered = leafProvider.size() == leaves.size(); + if (providers.size() == 1 && allLeavesCovered) { int b = providers.iterator().next(); bunchSelection.get(b).put(rf.id(), null); wholeBunch[j] = b; @@ -316,11 +322,14 @@ private DataEvolutionFileReader createUnionReader( rf.name()); composite[j] = true; for (DataField sub : ((RowType) rf.type()).getFields()) { + List subLeaves = leafIdsOf(sub); Set subProviders = new HashSet<>(); - for (int leaf : leafIdsOf(sub)) { + int coveredSubLeaves = 0; + for (int leaf : subLeaves) { int p = leafProvider.getOrDefault(leaf, -1); if (p >= 0) { subProviders.add(p); + coveredSubLeaves++; } } if (subProviders.size() > 1) { @@ -333,6 +342,19 @@ private DataEvolutionFileReader createUnionReader( + ") across multiple files."); } if (subProviders.size() == 1) { + if (sub.type() instanceof RowType && coveredSubLeaves < subLeaves.size()) { + // the single provider holds only part of this nested sub-struct; + // reading + // it whole would request leaves it lacks, and one-level composition + // cannot prune deeper than this level yet + throw new UnsupportedOperationException( + "Sub-field-level data evolution does not yet support reading a " + + "partially-written nested sub-field (" + + rf.name() + + "." + + sub.name() + + ") deeper than one level."); + } int b = subProviders.iterator().next(); bunchSelection .get(b) @@ -397,6 +419,12 @@ private DataEvolutionFileReader createUnionReader( int subId = subFields.get(s).id(); int b = findSubProvider(rf.id(), subId, bunchSubOffset); if (b < 0) { + // no file provides this sub-field; it stays null, so it must be nullable + checkArgument( + subFields.get(s).type().isNullable(), + "Sub-field %s.%s is not null but can't find any file contains it.", + rf.name(), + subFields.get(s).name()); continue; } Integer pIdx = bunchToPartial.get(b); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index 3b80bd96bdb1..84054b554db8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -64,7 +64,8 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se rawBlobPlaceholderMarkerColumns: Map[String, String]): Seq[CommitMessage] = { val sparkSession = data.sparkSession import sparkSession.implicits._ - assert(data.columns.length == writeType.getFieldCount + 2 + rawBlobPlaceholderMarkerColumns.size) + assert( + data.columns.length == writeType.getFieldCount + 2 + rawBlobPlaceholderMarkerColumns.size) val options = new CoreOptions(table.schema().options()) val blobInlineFields = options.blobInlineField().asScala.toSeq diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 1d5d3888e9a1..2124e81afb33 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.paimon.shims.SparkShimLoader -import org.apache.spark.sql.types.{BooleanType, DataType, StructType} +import org.apache.spark.sql.types.{BooleanType, StructType} import scala.collection.{immutable, mutable} import scala.collection.JavaConverters._ @@ -390,37 +390,48 @@ case class MergeIntoPaimonDataEvolutionTable( // the rest are copied from the target. Falls back to whole-column write when the changed leaves // cannot be safely determined, so behaviour never regresses. val matchedUpdateActions = matchedActions.collect { case ua: UpdateAction => ua } + // Gated by data-evolution.nested-field.enabled (default off): when disabled, no column is + // pruned, so every struct column is rewritten whole (behaviour identical to before this + // feature). When enabled, struct columns whose SET only touches some sub-fields are pruned. + val nestedFieldEnabled = table.coreOptions().dataEvolutionNestedFieldEnabled() val prunedByExprId: Map[ExprId, (Seq[Seq[String]], StructType)] = - updateColumnsSorted.flatMap { - attr => - if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { - None - } else { - attr.dataType match { - case st: StructType => - val perAction = matchedUpdateActions.flatMap { - ua => - ua.assignments - .find( - a => - isModifiedAssignment(a) && assignmentKeyAttribute(a).sameRef(attr)) - .map(a => changedLeaves(a.value, st, attr)) - } - if (perAction.isEmpty || perAction.exists(_.isEmpty)) { - None - } else { - val union = perAction.flatten.flatten.map(_._1).distinct - // prune only when it is a strict subset of all leaves of the struct - if (union.isEmpty || union.map(_.size).sum >= leafCount(st)) { + if (!nestedFieldEnabled) Map.empty + else + updateColumnsSorted.flatMap { + attr => + if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { + None + } else { + attr.dataType match { + case st: StructType => + val perAction = matchedUpdateActions.flatMap { + ua => + ua.assignments + .find( + a => isModifiedAssignment(a) && assignmentKeyAttribute(a).sameRef(attr)) + .map(a => changedLeaves(a.value, st, attr)) + } + if (perAction.isEmpty || perAction.exists(_.isEmpty)) { None } else { - Some(attr.exprId -> (union, prunedStructType(st, union))) + val union = perAction.flatten.flatten.map(_._1).distinct + // The reader composes a split struct only one level deep, so only prune when + // every change addresses a direct sub-field of the top-level struct (depth 1). + // A deeper change (e.g. nest.inner.x) would split an inner struct across files, + // which the read path does not support, so fall back to a whole-column write. + // Prune only when the changed direct sub-fields are a strict subset of all of + // them (otherwise the whole column is rewritten anyway). + val allDepthOne = union.forall(_.size == 1) + if (union.isEmpty || !allDepthOne || union.size >= st.fields.length) { + None + } else { + Some(attr.exprId -> (union, prunedStructType(st, union))) + } } - } - case _ => None + case _ => None + } } - } - }.toMap + }.toMap val mergeOutput = updateColumnsSorted.map { attr => @@ -929,12 +940,6 @@ object MergeIntoPaimonDataEvolutionTable { col("`" + name.replace("`", "``") + "`") } - /** Number of leaf fields (recursing into structs) of a data type. */ - private def leafCount(dt: DataType): Int = dt match { - case st: StructType => st.fields.map(f => leafCount(f.dataType)).sum - case _ => 1 - } - /** * For an aligned UPDATE value of a struct column, return the changed leaf paths (relative to the * column) paired with their new value expression, or `None` if the value is not a recognizable @@ -1010,8 +1015,7 @@ object MergeIntoPaimonDataEvolutionTable { if (sub.exists(_.size == 1)) { f } else { - f.copy(dataType = - prunedStructType(f.dataType.asInstanceOf[StructType], sub.map(_.tail))) + f.copy(dataType = prunedStructType(f.dataType.asInstanceOf[StructType], sub.map(_.tail))) } } StructType(fields) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala index d22483cd4999..48418bb49a2f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala @@ -49,7 +49,8 @@ class NestedSubfieldMergeIntoTest extends PaimonSparkTestBase { sql(s""" |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( | 'row-tracking.enabled' = 'true', - | 'data-evolution.enabled' = 'true') + | 'data-evolution.enabled' = 'true', + | 'data-evolution.nested-field.enabled' = 'true') |""".stripMargin) sql( "INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x')), " + @@ -82,7 +83,8 @@ class NestedSubfieldMergeIntoTest extends PaimonSparkTestBase { sql(s""" |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( | 'row-tracking.enabled' = 'true', - | 'data-evolution.enabled' = 'true') + | 'data-evolution.enabled' = 'true', + | 'data-evolution.nested-field.enabled' = 'true') |""".stripMargin) sql("INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x'))") @@ -98,4 +100,35 @@ class NestedSubfieldMergeIntoTest extends PaimonSparkTestBase { checkAnswer(sql("SELECT id, nest.a, nest.b FROM t"), Seq(Row(1, 100, "z"))) } } + + test( + "Sub-field data evolution: disabled by default, sub-field update rewrites the whole column") { + withTable("s", "t") { + // data-evolution.nested-field.enabled is left at its default (false) + sql(s""" + |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x'))") + + Seq((1, 100)).toDF("id", "newa").createOrReplaceTempView("s") + + sql(s""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.nest.a = s.newa + |""".stripMargin).collect() + + // correctness still holds: nest.a updated, nest.b preserved + checkAnswer(sql("SELECT id, nest.a, nest.b FROM t"), Seq(Row(1, 100, "x"))) + + // but no sub-field incremental file is produced: the whole nest column is rewritten + val deltaCols = latestDeltaWriteCols("t") + assert( + !deltaCols.exists(cols => cols.contains("nest.a")), + s"expected no dotted (sub-field) writeCols when feature is disabled, got: $deltaCols") + } + } } From d7c857f0e4a973baf6f5237d72682f1c22536036 Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Tue, 23 Jun 2026 18:54:21 +0800 Subject: [PATCH 4/9] [flink] Support sub-field-level data evolution via MERGE INTO action Extend DataEvolutionMergeIntoAction so a SET that targets a nested sub-field (e.g. T.nest.a = S.x) writes an incremental file containing only that leaf (dotted write column nest.a) aligned by row id, instead of rewriting the whole top-level column. Reuses the existing top-level data-evolution pipeline; only the column granularity is generalized to dotted paths. - DataEvolutionMergeIntoAction.buildSource: parse dotted SET targets (stripping the table qualifier), group by top-level column, and rebuild a partially-updated struct as CAST(ROW(...) AS ROW<...>) with sub-fields in schema order; derive dotted writePaths and a pruned sourceType. Gate on data-evolution.nested-field.enabled and reject deeper-than-one-level paths. checkSchema now accepts a partial (subset) struct. - DataEvolutionPartialWriteOperator: take writePaths and use projectByPaths for the write type; use the pruned source type directly. - Add NestedSubfieldMergeIntoActionITCase (single/multiple sub-fields, whole-struct, disabled-throws, deeper-nesting-throws). --- .../action/DataEvolutionMergeIntoAction.java | 233 +++++++++++-- .../DataEvolutionPartialWriteOperator.java | 19 +- .../NestedSubfieldMergeIntoActionITCase.java | 330 ++++++++++++++++++ 3 files changed, 542 insertions(+), 40 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index 40c89113803c..2e7fd58adce7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -66,8 +66,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -129,6 +131,10 @@ public class DataEvolutionMergeIntoAction extends TableActionBase { // the snapshot id this action based on private long baseSnapshotId; + // columns written by this merge, as (possibly nested) dotted paths, e.g. ["id", "nest.a"]; + // derived in buildSource() and consumed by writePartialColumns(). + private List writePaths; + public DataEvolutionMergeIntoAction( String databaseName, String tableName, Map catalogConfig) { super(databaseName, tableName, catalogConfig); @@ -250,32 +256,15 @@ public Tuple2, RowType> buildSource() { handleSqls(); // assign row id for each source row + boolean updateAll = matchedUpdateSet.equals("*"); List project; - if (matchedUpdateSet.equals("*")) { + if (updateAll) { // if sourceName is qualified like 'default.S', we should build a project like S.* project = Collections.singletonList(sourceTableName() + ".*"); } else { - // validate upsert changes - Map changes = parseCommaSeparatedKeyValues(matchedUpdateSet); - for (String targetField : changes.keySet()) { - if (!targetFieldNames.contains(extractFieldName(targetField))) { - throw new RuntimeException( - String.format( - "Invalid column reference '%s' of table '%s' at matched-upsert action.", - targetField, identifier.getFullName())); - } - } - - // rename source table's selected columns according to SET statement - project = - changes.entrySet().stream() - .map( - entry -> - String.format( - "%s AS `%s`", - entry.getValue(), - extractFieldName(entry.getKey()))) - .collect(Collectors.toList()); + // validate upsert changes and build the projection (top-level columns and, for + // sub-field-level data evolution, partial nested structs via dotted paths) + project = buildExplicitProject(); } String query; @@ -316,13 +305,169 @@ public Tuple2, RowType> buildSource() { Table source = batchTEnv.sqlQuery(query); checkSchema(source); - RowType sourceType = - SpecialFields.rowTypeWithRowId(table.rowType()) - .project(source.getResolvedSchema().getColumnNames()); + + RowType sourceType; + if (updateAll) { + List columnNames = source.getResolvedSchema().getColumnNames(); + sourceType = SpecialFields.rowTypeWithRowId(table.rowType()).project(columnNames); + writePaths = + columnNames.stream() + .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) + .collect(Collectors.toList()); + } else { + // build the source type manually so _ROW_ID is first and the column order matches the + // SQL projection order; for nested columns the field is the partial (pruned) struct. + RowType pruned = table.rowType().projectByPaths(writePaths); + List srcFields = new ArrayList<>(); + srcFields.add(SpecialFields.ROW_ID); + for (String topCol : explicitTopColumnOrder(writePaths)) { + srcFields.add(pruned.getField(table.rowType().getField(topCol).id())); + } + sourceType = new RowType(srcFields); + } return Tuple2.of(toDataStream(source), sourceType); } + /** + * Validate the SET targets and build the SQL projection list. A target may address a top-level + * column ({@code col} / {@code T.col}) or, for sub-field-level data evolution, a nested + * sub-field ({@code nest.a} / {@code T.nest.a}). A partially-updated struct column is rebuilt + * as a partial {@code CAST(ROW(...) AS ROW<...>)} so only the touched sub-fields are written. + * Also sets {@link #writePaths}. + */ + private List buildExplicitProject() { + Map changes = parseCommaSeparatedKeyValues(matchedUpdateSet); + + // group by top-level column, preserving first-seen order + Map wholeCols = new LinkedHashMap<>(); + Map> nestedCols = new LinkedHashMap<>(); + List order = new ArrayList<>(); + + for (Map.Entry entry : changes.entrySet()) { + List path = parseTargetPath(entry.getKey()); + String topCol = path.get(0); + if (!targetFieldNames.contains(topCol)) { + throw new RuntimeException( + String.format( + "Invalid column reference '%s' of table '%s' at matched-upsert action.", + entry.getKey(), identifier.getFullName())); + } + if (!order.contains(topCol)) { + order.add(topCol); + } + if (path.size() == 1) { + // whole top-level column + if (nestedCols.containsKey(topCol) || wholeCols.containsKey(topCol)) { + throw new RuntimeException( + "Conflicting updates for column '" + topCol + "' in SET clause."); + } + wholeCols.put(topCol, entry.getValue()); + } else { + // nested sub-field update + if (!coreOptions.dataEvolutionNestedFieldEnabled()) { + throw new UnsupportedOperationException( + "Updating a nested sub-field ('" + + entry.getKey() + + "') requires '" + + CoreOptions.DATA_EVOLUTION_NESTED_FIELD_ENABLED.key() + + "=true'."); + } + if (path.size() > 2) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution only supports one level of nesting, " + + "but got '" + + entry.getKey() + + "'."); + } + if (wholeCols.containsKey(topCol)) { + throw new RuntimeException( + "Conflicting updates for column '" + topCol + "' in SET clause."); + } + String subName = path.get(1); + LinkedHashMap subs = + nestedCols.computeIfAbsent(topCol, k -> new LinkedHashMap<>()); + if (subs.containsKey(subName)) { + throw new RuntimeException( + "Duplicated update for sub-field '" + + topCol + + "." + + subName + + "' in SET clause."); + } + subs.put(subName, entry.getValue()); + } + } + + // first pass: writePaths (so projectByPaths can build the pruned nested types) + writePaths = new ArrayList<>(); + for (String topCol : order) { + if (wholeCols.containsKey(topCol)) { + writePaths.add(topCol); + } else { + for (String subName : nestedCols.get(topCol).keySet()) { + writePaths.add(topCol + "." + subName); + } + } + } + + // second pass: build projection expressions + RowType pruned = table.rowType().projectByPaths(writePaths); + List project = new ArrayList<>(); + for (String topCol : order) { + if (wholeCols.containsKey(topCol)) { + project.add(String.format("%s AS `%s`", wholeCols.get(topCol), topCol)); + } else { + LinkedHashMap subs = nestedCols.get(topCol); + DataType prunedColType = + pruned.getField(table.rowType().getField(topCol).id()).type(); + // value order must match the pruned struct's schema field order + List values = new ArrayList<>(); + for (DataField subField : ((RowType) prunedColType).getFields()) { + String value = subs.get(subField.name()); + Preconditions.checkState( + value != null, + "Missing value for sub-field '%s.%s', it's a bug.", + topCol, + subField.name()); + values.add(value); + } + String typeStr = + LogicalTypeConversion.toLogicalType(prunedColType).asSerializableString(); + project.add( + String.format( + "CAST(ROW(%s) AS %s) AS `%s`", + String.join(", ", values), typeStr, topCol)); + } + } + return project; + } + + /** The first-seen order of top-level columns present in the (dotted) write paths. */ + private List explicitTopColumnOrder(List paths) { + List order = new ArrayList<>(); + for (String path : paths) { + int dot = path.indexOf('.'); + String topCol = dot < 0 ? path : path.substring(0, dot); + if (!order.contains(topCol)) { + order.add(topCol); + } + } + return order; + } + + /** + * Parse a SET target into a path relative to the target table: strip an optional leading + * table-qualifier segment (the target table name/alias), leaving {@code [topColumn, sub...]}. + */ + private List parseTargetPath(String target) { + List segs = new ArrayList<>(Arrays.asList(target.split("\\."))); + if (segs.size() > 1 && segs.get(0).equals(targetTableName())) { + segs.remove(0); + } + return segs; + } + public DataStream> shuffleByFirstRowId( DataStream source, RowType sourceType) { Transformation sourceTransformation = source.getTransformation(); @@ -389,7 +534,7 @@ public DataStream writePartialColumns( "PARTIAL WRITE COLUMNS", new CommittableTypeInfo(), new DataEvolutionPartialWriteOperator( - (FileStoreTable) table, rowType, baseSnapshotId)) + (FileStoreTable) table, rowType, writePaths, baseSnapshotId)) .setParallelism(sinkParallelism); } @@ -526,7 +671,17 @@ private void checkSchema(Table source) { .getTypeRoot() .getFamilies() .contains(DataTypeFamily.BINARY_STRING); + // For sub-field-level data evolution, a struct column's source value is a partial + // (subset) ROW carrying only the updated sub-fields, which is not directly + // cast-compatible with the full target struct. Accept it when every source + // sub-field exists in the target struct with a compatible cast. + boolean partialStructCompatible = + paimonType instanceof RowType + && targetField.type() instanceof RowType + && isCompatiblePartialStruct( + (RowType) paimonType, (RowType) targetField.type()); if (!blobCompatible + && !partialStructCompatible && !DataTypeCasts.supportsCompatibleCast(paimonType, targetField.type())) { throw new IllegalStateException( String.format( @@ -540,6 +695,27 @@ private void checkSchema(Table source) { } } + /** + * Whether {@code part} is a valid partial (subset) of the full struct {@code full}: every + * sub-field of {@code part} must exist in {@code full} (by name) with a compatible cast. + */ + private boolean isCompatiblePartialStruct(RowType part, RowType full) { + for (DataField partField : part.getFields()) { + if (!full.containsField(partField.name())) { + return false; + } + DataType fullSubType = full.getField(partField.name()).type(); + if (partField.type() instanceof RowType && fullSubType instanceof RowType) { + if (!isCompatiblePartialStruct((RowType) partField.type(), (RowType) fullSubType)) { + return false; + } + } else if (!DataTypeCasts.supportsCompatibleCast(partField.type(), fullSubType)) { + return false; + } + } + return true; + } + private void handleSqls() { // NOTE: sql may change current catalog and database if (sourceSqls != null) { @@ -570,11 +746,6 @@ private String escapedSourceName() { .collect(Collectors.joining(".")); } - private String extractFieldName(String sourceField) { - String[] fieldPath = sourceField.split("\\."); - return fieldPath[fieldPath.length - 1]; - } - private String escapedRowTrackingTargetName() { return String.format( "`%s`.`%s`.`%s$row_tracking`", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java index f57393b7ca27..5561c9581b7a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java @@ -54,7 +54,6 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; -import java.util.stream.Collectors; import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.types.VectorType.isVectorStoreFile; @@ -93,17 +92,19 @@ public class DataEvolutionPartialWriteOperator private transient Writer writer; public DataEvolutionPartialWriteOperator( - FileStoreTable table, RowType dataType, Long baseSnapshotId) { + FileStoreTable table, + RowType sourceType, + List writePaths, + Long baseSnapshotId) { this.table = table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")); this.baseSnapshotId = baseSnapshotId; - List fieldNames = - dataType.getFieldNames().stream() - .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) - .collect(Collectors.toList()); - this.writeType = table.rowType().project(fieldNames); - this.dataType = - SpecialFields.rowTypeWithRowId(table.rowType()).project(dataType.getFieldNames()); + // writePaths may carry nested dotted paths (e.g. "nest.a") for sub-field-level data + // evolution; projectByPaths handles both plain top-level names and nested paths. + this.writeType = table.rowType().projectByPaths(writePaths); + // sourceType is already pruned to the written columns (with partial nested structs) and + // carries the table's field ids, so it is used directly as the read/data type. + this.dataType = sourceType; this.rowIdIndex = this.dataType.getFieldIndex(SpecialFields.ROW_ID.name()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java new file mode 100644 index 000000000000..966a96ea8306 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.table.FileStoreTable; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; +import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED; +import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_NESTED_FIELD_ENABLED; +import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * ITCase for sub-field-level data evolution via {@link DataEvolutionMergeIntoAction}: updating a + * single sub-field of a nested struct column should write an incremental file containing only that + * sub-field (a dotted write column like {@code nest.a}) aligned by row id, while the rest of the + * struct is read back from the original file. + */ +public class NestedSubfieldMergeIntoActionITCase extends ActionITCaseBase { + + @Override + public void before() throws IOException { + super.before(); + init(warehouse); + } + + private void prepareNestedTarget(boolean nestedFieldEnabled) throws Exception { + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + if (nestedFieldEnabled) { + put(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key(), "true"); + } + } + })); + insertInto( + "T", + "(1, CAST(ROW(10, 'x') AS ROW))", + "(2, CAST(ROW(20, 'y') AS ROW))"); + } + + private void prepareSubFieldSource() throws Exception { + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "newa INT"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, 100)"); + } + + @Test + public void testUpdateSingleSubFieldWritesOnlyThatLeaf() throws Exception { + prepareNestedTarget(true); + prepareSubFieldSource(); + + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.a=S.newa") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run(); + + // correctness: nest.a updated for id=1, nest.b preserved, other row untouched + testBatchRead( + "SELECT id, nest.a, nest.b FROM T", + Arrays.asList(changelogRow("+I", 1, 100, "x"), changelogRow("+I", 2, 20, "y"))); + + // feature engaged: an incremental file written by the merge only contains nest.a + assertThat(deltaWriteCols("T")).contains(Collections.singletonList("nest.a")); + } + + @Test + public void testUpdateSubFieldDisabledThrows() throws Exception { + // data-evolution.nested-field.enabled left at its default (false) + prepareNestedTarget(false); + prepareSubFieldSource(); + + assertThatThrownBy( + () -> + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.a=S.newa") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run()) + .hasMessageContaining(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key()); + } + + @Test + public void testUpdateWholeStructStillWorks() throws Exception { + prepareNestedTarget(true); + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, CAST(ROW(100, 'z') AS ROW))"); + + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest=S.nest") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run(); + + testBatchRead( + "SELECT id, nest.a, nest.b FROM T", + Arrays.asList(changelogRow("+I", 1, 100, "z"), changelogRow("+I", 2, 20, "y"))); + } + + @Test + public void testUpdateMultipleSubFieldsWritesOnlyThoseLeaves() throws Exception { + // a 3-field struct so that updating two sub-fields is a strict subset (stays + // sub-field-level + // rather than collapsing to a whole-column write) + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + put(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key(), "true"); + } + })); + insertInto( + "T", + "(1, CAST(ROW(10, 'x', 30) AS ROW))", + "(2, CAST(ROW(20, 'y', 40) AS ROW))"); + + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "newa INT", "newc INT"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, 100, 300)"); + + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.a=S.newa,T.nest.c=S.newc") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run(); + + // correctness: a and c updated for id=1, b preserved, other row untouched + testBatchRead( + "SELECT id, nest.a, nest.b, nest.c FROM T", + Arrays.asList( + changelogRow("+I", 1, 100, "x", 300), changelogRow("+I", 2, 20, "y", 40))); + + // feature engaged: the incremental file contains exactly the two touched leaves, in schema + // order (a before c) + assertThat(deltaWriteCols("T")).contains(Arrays.asList("nest.a", "nest.c")); + } + + @Test + public void testUpdateDeeplyNestedSubFieldThrows() throws Exception { + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("id INT", "nest ROW>"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + put(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key(), "true"); + } + })); + insertInto("T", "(1, CAST(ROW(10, ROW(1, 2)) AS ROW>))"); + + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "newx INT"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, 100)"); + + // deeper-than-one-level sub-field updates are rejected (the reader composes only one level) + assertThatThrownBy( + () -> + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.inner.x=S.newx") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run()) + .hasMessageContaining("one level"); + } + + /** The write columns of every data file in the latest snapshot of {@code tableName}. */ + private List> deltaWriteCols(String tableName) throws Exception { + FileStoreTable table = getFileStoreTable(tableName); + List> result = new ArrayList<>(); + for (ManifestEntry entry : table.store().newScan().plan().files()) { + DataFileMeta file = entry.file(); + result.add(file.writeCols()); + } + return result; + } + + private DataEvolutionMergeIntoActionBuilder builder( + String warehouse, String database, String table) { + return new DataEvolutionMergeIntoActionBuilder(warehouse, database, table); + } + + private static class DataEvolutionMergeIntoActionBuilder { + private final List args; + + DataEvolutionMergeIntoActionBuilder(String warehouse, String database, String table) { + this.args = + new ArrayList<>( + Arrays.asList( + "data_evolution_merge_into", + "--warehouse", + warehouse, + "--database", + database, + "--table", + table)); + } + + DataEvolutionMergeIntoActionBuilder withSourceTable(String sourceTable) { + args.add("--source_table"); + args.add(sourceTable); + return this; + } + + DataEvolutionMergeIntoActionBuilder withMergeCondition(String mergeCondition) { + args.add("--on"); + args.add(mergeCondition); + return this; + } + + DataEvolutionMergeIntoActionBuilder withMatchedUpdateSet(String matchedUpdateSet) { + args.add("--matched_update_set"); + args.add(matchedUpdateSet); + return this; + } + + DataEvolutionMergeIntoActionBuilder withSinkParallelism(int sinkParallelism) { + args.add("--sink_parallelism"); + args.add(String.valueOf(sinkParallelism)); + return this; + } + + DataEvolutionMergeIntoAction build() { + return (DataEvolutionMergeIntoAction) + ActionFactory.createAction(args.toArray(new String[0])) + .orElseThrow(RuntimeException::new); + } + } +} From 23766fd716ebd2e76f3d21be65c95d12307de135 Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:04:57 +0800 Subject: [PATCH 5/9] [test] Fix CI and review findings on nested sub-field tests - spark NestedSubfieldMergeIntoTest: apply scalafmt (single-line test name) to satisfy spotless-check. - flink NestedSubfieldMergeIntoActionITCase: add @BeforeEach to the before() override so JUnit runs base setup + init(warehouse) (was NPE-ing all tests); rename the nested sub-field 'inner' to 'sub' to avoid the Flink SQL reserved word that broke DDL parsing. --- .../flink/action/NestedSubfieldMergeIntoActionITCase.java | 8 +++++--- .../paimon/spark/sql/NestedSubfieldMergeIntoTest.scala | 3 +-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java index 966a96ea8306..a8ca3ef0fae4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java @@ -22,6 +22,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.table.FileStoreTable; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -51,6 +52,7 @@ */ public class NestedSubfieldMergeIntoActionITCase extends ActionITCaseBase { + @BeforeEach @Override public void before() throws IOException { super.before(); @@ -226,7 +228,7 @@ public void testUpdateDeeplyNestedSubFieldThrows() throws Exception { sEnv.executeSql( buildDdl( "T", - Arrays.asList("id INT", "nest ROW>"), + Arrays.asList("id INT", "nest ROW>"), Collections.emptyList(), Collections.emptyList(), new HashMap() { @@ -236,7 +238,7 @@ public void testUpdateDeeplyNestedSubFieldThrows() throws Exception { put(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key(), "true"); } })); - insertInto("T", "(1, CAST(ROW(10, ROW(1, 2)) AS ROW>))"); + insertInto("T", "(1, CAST(ROW(10, ROW(1, 2)) AS ROW>))"); sEnv.executeSql( buildDdl( @@ -257,7 +259,7 @@ public void testUpdateDeeplyNestedSubFieldThrows() throws Exception { () -> builder(warehouse, database, "T") .withMergeCondition("T.id=S.id") - .withMatchedUpdateSet("T.nest.inner.x=S.newx") + .withMatchedUpdateSet("T.nest.sub.x=S.newx") .withSourceTable("S") .withSinkParallelism(2) .build() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala index 48418bb49a2f..d1cbec5d1315 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala @@ -101,8 +101,7 @@ class NestedSubfieldMergeIntoTest extends PaimonSparkTestBase { } } - test( - "Sub-field data evolution: disabled by default, sub-field update rewrites the whole column") { + test("Sub-field data evolution: disabled by default, sub-field update rewrites the whole column") { withTable("s", "t") { // data-evolution.nested-field.enabled is left at its default (false) sql(s""" From 075ac36e75fec223a2111054016ca1f70aea7a31 Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:14:50 +0800 Subject: [PATCH 6/9] [core] Resolve nested dotted writeCols in row-id conflict checker RowIdColumnConflictChecker mapped each writeCol to a single top-level field id, so a dotted sub-field write column (e.g. nest.a) produced by sub-field data evolution threw 'Cannot find write column'. Resolve a write column to the set of leaf field ids it covers (a whole struct expands to all its leaves), so commit-time row-id conflict detection works at sub-field granularity: writes to different sub-fields no longer falsely conflict, while a whole-struct or full-schema write still conflicts with a sub-field write of the same struct. Add nested sub-field cases to RowIdColumnConflictCheckerTest. --- .../commit/RowIdColumnConflictChecker.java | 71 +++++++++++-------- .../RowIdColumnConflictCheckerTest.java | 54 ++++++++++++++ 2 files changed, 96 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java index b2f8740f5269..30a3999b3e6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -22,6 +22,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RangeHelper; @@ -50,7 +51,7 @@ public class RowIdColumnConflictChecker { private final SchemaManager schemaManager; private final List writeRanges; - private final Map> fieldIdByNameCache = new HashMap<>(); + private final Map rowTypeCache = new HashMap<>(); private RowIdColumnConflictChecker(SchemaManager schemaManager, List deltaFiles) { this.schemaManager = schemaManager; @@ -96,18 +97,13 @@ private List buildWriteRanges(List deltaFiles) { private void addWriteFieldIds(Set fieldIds, DataFileMeta file) { List writeCols = file.writeCols(); if (writeCols == null) { - fieldIds.addAll( - fieldIdByNameCache - .computeIfAbsent(file.schemaId(), this::fieldIdByName) - .values()); + // full-schema write touches every leaf field + collectLeafIds(rowType(file.schemaId()).getFields(), fieldIds); return; } for (String writeCol : writeCols) { - Integer fieldId = fieldId(file, writeCol); - if (fieldId != null) { - fieldIds.add(fieldId); - } + fieldIds.addAll(leafFieldIds(file.schemaId(), writeCol)); } } @@ -185,37 +181,54 @@ private boolean containsAnyWriteField(Set fieldIds, DataFileMeta file) } for (String writeCol : writeCols) { - Integer fieldId = fieldId(file, writeCol); - if (fieldId != null && fieldIds.contains(fieldId)) { - return true; + for (Integer fieldId : leafFieldIds(file.schemaId(), writeCol)) { + if (fieldIds.contains(fieldId)) { + return true; + } } } return false; } - private Integer fieldId(DataFileMeta file, String writeCol) { - Integer fieldId = - fieldIdByNameCache - .computeIfAbsent(file.schemaId(), this::fieldIdByName) - .get(writeCol); - if (fieldId == null) { - if (SpecialFields.isSystemField(writeCol)) { - return null; - } + /** + * Resolve a (possibly nested, dotted) write column such as {@code "nest.a"} to the set of leaf + * field ids it covers. A whole top-level struct column (e.g. {@code "nest"}) expands to all of + * its leaf ids, so a whole-struct write and a sub-field write of the same struct still + * conflict. + */ + private List leafFieldIds(long schemaId, String writeCol) { + if (SpecialFields.isSystemField(writeCol)) { + return Collections.emptyList(); + } + // projectByPaths handles both plain top-level names and dotted nested paths, and throws if + // the path does not exist in the schema + RowType projected; + try { + projected = rowType(schemaId).projectByPaths(Collections.singletonList(writeCol)); + } catch (IllegalArgumentException e) { throw new RuntimeException( String.format( - "Cannot find write column '%s' in schema %s.", - writeCol, file.schemaId())); + "Cannot find write column '%s' in schema %s.", writeCol, schemaId), + e); } - return fieldId; + List ids = new ArrayList<>(); + collectLeafIds(projected.getFields(), ids); + return ids; } - private Map fieldIdByName(long schemaId) { - Map fieldIdByName = new HashMap<>(); - for (DataField field : schemaManager.schema(schemaId).logicalRowType().getFields()) { - fieldIdByName.put(field.name(), field.id()); + private static void collectLeafIds(List fields, java.util.Collection out) { + for (DataField field : fields) { + if (field.type() instanceof RowType) { + collectLeafIds(((RowType) field.type()).getFields(), out); + } else { + out.add(field.id()); + } } - return fieldIdByName; + } + + private RowType rowType(long schemaId) { + return rowTypeCache.computeIfAbsent( + schemaId, id -> schemaManager.schema(id).logicalRowType()); } /** Range and field id Set. */ diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java index 8c45bfbb33ad..76ef065b6268 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java @@ -114,6 +114,43 @@ void testFailsOnUnknownNonSystemWriteColumn() { .hasMessageContaining("Cannot find write column 'missing'"); } + @Test + void testSubFieldDisjointLeavesDoNotConflict() { + // schema 2: id INT, nest ROW + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 2L, Arrays.asList("nest.a"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.b")))) + .isFalse(); + } + + @Test + void testSubFieldSameLeafConflicts() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 2L, Arrays.asList("nest.a"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.a")))) + .isTrue(); + } + + @Test + void testWholeStructConflictsWithSubField() { + // a whole-struct write expands to all of its leaves, so it conflicts with a sub-field write + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 2L, Arrays.asList("nest"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.a")))) + .isTrue(); + } + + @Test + void testFullSchemaWriteConflictsWithSubField() { + RowIdColumnConflictChecker checker = checker(file("current", 0L, 10L, 2L, null)); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.a")))) + .isTrue(); + } + private RowIdColumnConflictChecker checker(DataFileMeta... files) { return RowIdColumnConflictChecker.fromDataFiles( createSchemaManager(), Arrays.asList(files)); @@ -170,6 +207,23 @@ private SchemaManager createSchemaManager() { Collections.singletonList("id"), Collections.emptyMap(), ""))); + schemas.put( + 2L, + org.apache.paimon.schema.TableSchema.create( + 2L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()), + new DataField(3, "b", DataTypes.INT())))), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); return new TestingSchemaManager( new Path("/tmp/row-id-column-conflict-checker-test"), schemas); } From ef144accb1d24693869a77b55c1c74107ede6c85 Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Wed, 24 Jun 2026 22:43:26 +0800 Subject: [PATCH 7/9] [core][flink] Address review: stricter dotted-path projection and scoped partial-struct check - RowType.projectByPaths: reject a dotted path whose head field is not a ROW (e.g. 'id.a' on a scalar) instead of silently selecting the whole field, so invalid nested writeCols/SET targets fail early. Add DataTypesTest coverage. - DataEvolutionMergeIntoAction.checkSchema: only accept a partial (subset) struct source for columns actually written through dotted sub-field paths; whole-column assignments (e.g. T.nest=S.nest) stay on the full-type compatibility check so a narrower source struct is rejected rather than written as an incomplete whole-struct file. --- .../java/org/apache/paimon/types/RowType.java | 16 ++++-- .../apache/paimon/types/DataTypesTest.java | 49 +++++++++++++++++++ .../action/DataEvolutionMergeIntoAction.java | 25 ++++++++-- 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java index 8b94ccb18322..850d61867032 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java @@ -358,15 +358,23 @@ private static RowType projectTypeByPaths(RowType type, List paths) { continue; } matched.add(field.name()); - if (wholeChildren.contains(field.name()) - || subPaths.isEmpty() - || !(field.type() instanceof RowType)) { + if (wholeChildren.contains(field.name()) || subPaths.isEmpty()) { result.add(field); - } else { + } else if (field.type() instanceof RowType) { RowType prunedChild = projectTypeByPaths((RowType) field.type(), subPaths) .copy(field.type().isNullable()); result.add(field.newType(prunedChild)); + } else { + // a dotted path addresses a sub-field, but this field is not a ROW; reject rather + // than silently selecting the whole field, so invalid dotted paths surface early + throw new IllegalArgumentException( + "Cannot project sub-field(s) " + + subPaths + + " of non-ROW field '" + + field.name() + + "' in " + + type); } } if (!matched.containsAll(childToSubPaths.keySet())) { diff --git a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java index df4cf0679be0..d914de709ae6 100644 --- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java @@ -206,6 +206,55 @@ void testRowType() { .isInstanceOf(IllegalArgumentException.class); } + @Test + void testProjectByPaths() { + RowType type = + new RowType( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()), + new DataField(3, "b", DataTypes.STRING()))))); + + // dotted path selects only the addressed sub-field, preserving ids + RowType onlyNestA = type.projectByPaths(Collections.singletonList("nest.a")); + Assertions.assertThat(onlyNestA.getFieldNames()).containsExactly("nest"); + RowType nestSub = (RowType) onlyNestA.getField("nest").type(); + Assertions.assertThat(nestSub.getFieldNames()).containsExactly("a"); + Assertions.assertThat(nestSub.getField("a").id()).isEqualTo(2); + + // a plain top-level name selects the whole field + RowType wholeNest = + (RowType) + type.projectByPaths(Collections.singletonList("nest")) + .getField("nest") + .type(); + Assertions.assertThat(wholeNest.getFieldNames()).containsExactly("a", "b"); + + // mixing a whole column and a sub-field + Assertions.assertThat(type.projectByPaths(Arrays.asList("id", "nest.b")).getFieldNames()) + .containsExactly("id", "nest"); + + // a dotted path whose head is not a ROW is rejected (not silently widened to the scalar) + assertThatThrownBy(() -> type.projectByPaths(Collections.singletonList("id.a"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("non-ROW field 'id'"); + + // an unknown path is rejected + assertThatThrownBy(() -> type.projectByPaths(Collections.singletonList("missing"))) + .isInstanceOf(IllegalArgumentException.class); + + // a column whose name itself contains a dot is matched exactly (not split) + RowType dotted = + new RowType(Collections.singletonList(new DataField(0, "a.b", DataTypes.INT()))); + Assertions.assertThat( + dotted.projectByPaths(Collections.singletonList("a.b")).getFieldNames()) + .containsExactly("a.b"); + } + // -------------------------------------------------------------------------------------------- private static ThrowingConsumer baseAssertions(String sqlString, DataType otherType) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index 2e7fd58adce7..e352495c0a96 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -671,12 +671,16 @@ private void checkSchema(Table source) { .getTypeRoot() .getFamilies() .contains(DataTypeFamily.BINARY_STRING); - // For sub-field-level data evolution, a struct column's source value is a partial - // (subset) ROW carrying only the updated sub-fields, which is not directly - // cast-compatible with the full target struct. Accept it when every source - // sub-field exists in the target struct with a compatible cast. + // For sub-field-level data evolution, a struct column written through dotted paths + // (e.g. nest.a) has a partial (subset) ROW source carrying only the updated + // sub-fields, which is not directly cast-compatible with the full target struct. + // Accept it only for such sub-field writes (not whole-column assignments, which + // must + // stay on the full-type check), when every source sub-field exists in the target + // struct with a compatible cast. boolean partialStructCompatible = - paimonType instanceof RowType + isSubFieldWrite(flinkColumn.getName()) + && paimonType instanceof RowType && targetField.type() instanceof RowType && isCompatiblePartialStruct( (RowType) paimonType, (RowType) targetField.type()); @@ -716,6 +720,17 @@ private boolean isCompatiblePartialStruct(RowType part, RowType full) { return true; } + /** + * Whether the given top-level column is written through dotted sub-field paths (e.g. nest.a). + */ + private boolean isSubFieldWrite(String topColumn) { + if (writePaths == null) { + return false; + } + String prefix = topColumn + "."; + return writePaths.stream().anyMatch(p -> p.startsWith(prefix)); + } + private void handleSqls() { // NOTE: sql may change current catalog and database if (sourceSqls != null) { From 13bc9d824b6b158618b5f6c903bc1e32bdd67459 Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Wed, 24 Jun 2026 23:49:38 +0800 Subject: [PATCH 8/9] [flink] Fix whole-struct compatibility check for nested column assignment The previous review fix made whole-column struct assignments fall back to DataTypeCasts.supportsCompatibleCast, which does not support ROW-to-ROW casts and so rejected even an exact-matching whole-struct assignment (testUpdateWholeStructStillWorks). Use a structural check instead: a sub-field (dotted) write accepts a subset source struct, while a whole-column assignment requires the source to fully cover the target struct (a narrower source is rejected). Add a test for the narrower-source rejection. --- .../action/DataEvolutionMergeIntoAction.java | 53 ++++++++++++++----- .../NestedSubfieldMergeIntoActionITCase.java | 31 +++++++++++ 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index e352495c0a96..dca1cea0eaed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -671,21 +671,23 @@ private void checkSchema(Table source) { .getTypeRoot() .getFamilies() .contains(DataTypeFamily.BINARY_STRING); - // For sub-field-level data evolution, a struct column written through dotted paths - // (e.g. nest.a) has a partial (subset) ROW source carrying only the updated - // sub-fields, which is not directly cast-compatible with the full target struct. - // Accept it only for such sub-field writes (not whole-column assignments, which - // must - // stay on the full-type check), when every source sub-field exists in the target - // struct with a compatible cast. - boolean partialStructCompatible = - isSubFieldWrite(flinkColumn.getName()) - && paimonType instanceof RowType - && targetField.type() instanceof RowType - && isCompatiblePartialStruct( - (RowType) paimonType, (RowType) targetField.type()); + // Struct columns need a structural compatibility check: DataTypeCasts does not + // support ROW-to-ROW casts. For a sub-field write (dotted paths like nest.a) the + // source is a partial (subset) struct carrying only the updated sub-fields, so a + // subset check is correct. For a whole-column assignment (e.g. T.nest=S.nest) the + // source must fully cover the target struct, so a narrower source is rejected + // instead of being written as an incomplete whole-struct file. + boolean structCompatible = false; + if (paimonType instanceof RowType && targetField.type() instanceof RowType) { + RowType sourceStruct = (RowType) paimonType; + RowType targetStruct = (RowType) targetField.type(); + structCompatible = + isSubFieldWrite(flinkColumn.getName()) + ? isCompatiblePartialStruct(sourceStruct, targetStruct) + : isFullyCompatibleStruct(sourceStruct, targetStruct); + } if (!blobCompatible - && !partialStructCompatible + && !structCompatible && !DataTypeCasts.supportsCompatibleCast(paimonType, targetField.type())) { throw new IllegalStateException( String.format( @@ -720,6 +722,29 @@ private boolean isCompatiblePartialStruct(RowType part, RowType full) { return true; } + /** + * Whether {@code source} fully covers the target struct {@code target} for a whole-column + * assignment: every target sub-field must exist in {@code source} (by name) with a compatible + * cast. A source missing a target sub-field (a narrower struct) is rejected. + */ + private boolean isFullyCompatibleStruct(RowType source, RowType target) { + for (DataField targetField : target.getFields()) { + if (!source.containsField(targetField.name())) { + return false; + } + DataType sourceSubType = source.getField(targetField.name()).type(); + DataType targetSubType = targetField.type(); + if (sourceSubType instanceof RowType && targetSubType instanceof RowType) { + if (!isFullyCompatibleStruct((RowType) sourceSubType, (RowType) targetSubType)) { + return false; + } + } else if (!DataTypeCasts.supportsCompatibleCast(sourceSubType, targetSubType)) { + return false; + } + } + return true; + } + /** * Whether the given top-level column is written through dotted sub-field paths (e.g. nest.a). */ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java index a8ca3ef0fae4..86f7f36ccdda 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java @@ -167,6 +167,37 @@ public void testUpdateWholeStructStillWorks() throws Exception { Arrays.asList(changelogRow("+I", 1, 100, "z"), changelogRow("+I", 2, 20, "y"))); } + @Test + public void testWholeStructAssignmentWithNarrowerSourceThrows() throws Exception { + // target nest is ROW; a whole-column assignment from a narrower source ROW must be + // rejected (it would otherwise be written as an incomplete whole-struct file) + prepareNestedTarget(true); + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, CAST(ROW(100) AS ROW))"); + + assertThatThrownBy( + () -> + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest=S.nest") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run()) + .hasMessageContaining("incompatible"); + } + @Test public void testUpdateMultipleSubFieldsWritesOnlyThoseLeaves() throws Exception { // a 3-field struct so that updating two sub-fields is a strict subset (stays From 7211c708add4be97a3d196210f8e82ad4f0ce16d Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:45:36 +0800 Subject: [PATCH 9/9] [core] Reject deeper-than-one-level partial nested writeCols at write time The data-evolution read path composes only one nested level, but the write side (BaseAppendFileStoreWrite.withWriteType -> RowType.leafPaths) would happily persist a deeper partial path such as nest.sub.x. A low-level caller using BatchTableWrite.withWriteType(rowType.projectByPaths(["nest.sub.x"])) could therefore commit a file that makes later full-table reads throw. Make leafPaths fail-fast when a partial struct is nested inside another partial struct, so such a file can never be written/committed. Add DataTypesTest coverage. --- .../java/org/apache/paimon/types/RowType.java | 13 ++++++ .../apache/paimon/types/DataTypesTest.java | 42 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java index 850d61867032..87d28db02151 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java @@ -428,6 +428,19 @@ private static void collectLeafPaths( + path); } if (willExpand) { + // A partial struct nested inside another partial struct (a path deeper than one + // level, e.g. nest.sub.x) cannot be composed back on read — the data-evolution read + // path only assembles one nested level. Reject it here so such a file is never + // written/committed and later breaks full-table reads. + if (!prefix.isEmpty()) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution supports only one level of partial " + + "nesting; the nested sub-field '" + + path + + "' cannot be partially written. Write the whole '" + + path + + "' sub-field instead."); + } collectLeafPaths( ((RowType) writeField.type()).getFields(), (RowType) fullField.type(), diff --git a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java index d914de709ae6..5d4c9c0f1964 100644 --- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java @@ -255,6 +255,48 @@ void testProjectByPaths() { .containsExactly("a.b"); } + @Test + void testLeafPaths() { + RowType full = + new RowType( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()), + new DataField( + 3, + "sub", + DataTypes.ROW( + new DataField( + 4, "x", DataTypes.INT()), + new DataField( + 5, + "y", + DataTypes.INT()))))))); + + // a full write collapses to top-level names (no dotted paths) + Assertions.assertThat(full.leafPaths(full)).containsExactly("id", "nest"); + + // one level of partial nesting: a direct sub-field of a top-level struct + Assertions.assertThat( + full.projectByPaths(Collections.singletonList("nest.a")).leafPaths(full)) + .containsExactly("nest.a"); + + // a whole sub-struct under a partial top-level struct is still one level + Assertions.assertThat( + full.projectByPaths(Collections.singletonList("nest.sub")).leafPaths(full)) + .containsExactly("nest.sub"); + + // deeper than one level (a partial sub-struct) is rejected so it can never be committed + RowType deepPartial = full.projectByPaths(Collections.singletonList("nest.sub.x")); + assertThatThrownBy(() -> deepPartial.leafPaths(full)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("one level"); + } + // -------------------------------------------------------------------------------------------- private static ThrowingConsumer baseAssertions(String sqlString, DataType otherType) {