Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,12 @@
<td>Boolean</td>
<td>Whether to persist source when process merge into action on data evolution table.</td>
</tr>
<tr>
<td><h5>data-evolution.nested-field.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable sub-field-level data evolution for nested (struct) columns. When enabled, an update that only touches some sub-fields of a nested column writes an incremental file containing just those sub-fields (aligned by row id); when disabled, the whole top-level column is rewritten. Requires data-evolution.enabled=true.</td>
</tr>
<tr>
<td><h5>data-file.external-paths</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
16 changes: 16 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2310,6 +2310,18 @@ public InlineElement getDescription() {
.defaultValue(false)
.withDescription("Whether enable data evolution for row tracking table.");

/**
 * Switch for sub-field-level data evolution on nested (struct) columns. The option is a
 * boolean that defaults to {@code false}; its current value is exposed through {@code
 * dataEvolutionNestedFieldEnabled()}.
 */
public static final ConfigOption<Boolean> DATA_EVOLUTION_NESTED_FIELD_ENABLED =
key("data-evolution.nested-field.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable sub-field-level data evolution for nested (struct) "
+ "columns. When enabled, an update that only touches some "
+ "sub-fields of a nested column writes an incremental file "
+ "containing just those sub-fields (aligned by row id); when "
+ "disabled, the whole top-level column is rewritten. Requires "
+ "data-evolution.enabled=true.");

public static final ConfigOption<Boolean> DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING =
key("data-evolution.merge-into.file-pruning")
.booleanType()
Expand Down Expand Up @@ -3860,6 +3872,10 @@ public boolean dataEvolutionEnabled() {
return options.get(DATA_EVOLUTION_ENABLED);
}

/**
 * Returns the configured value of {@link #DATA_EVOLUTION_NESTED_FIELD_ENABLED}, i.e. whether
 * sub-field-level data evolution for nested (struct) columns is switched on.
 */
public boolean dataEvolutionNestedFieldEnabled() {
return options.get(DATA_EVOLUTION_NESTED_FIELD_ENABLED);
}

/**
 * Returns the configured value of {@link #DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING} (option key
 * {@code data-evolution.merge-into.file-pruning}).
 */
public boolean dataEvolutionMergeIntoFilePruning() {
return options.get(DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ public TableSchema project(@Nullable List<String> writeCols) {
return new TableSchema(
version,
id,
new RowType(fields).project(writeCols).getFields(),
// writeCols may contain nested dotted paths (e.g. "nest.a") for sub-field-level
// data evolution; projectByPaths handles both plain top-level names and paths
new RowType(fields).projectByPaths(writeCols).getFields(),
highestFieldId,
partitionKeys,
primaryKeys,
Expand Down
138 changes: 138 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/types/RowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,144 @@ public RowType project(String... names) {
return project(Arrays.asList(names));
}

/**
 * Projects this row type by a list of (possibly nested) dotted paths, e.g. {@code ["f0",
 * "nest.a"]}.
 *
 * <p>A path that exactly equals a field name (with or without a dot in it) selects that whole
 * field, same as {@link #project(List)}. Any other dotted path is split at its first dot and
 * selects only the addressed sub-field of a nested {@link RowType}; field ids and the
 * nullability of every level are preserved. The result keeps the field order of this schema,
 * not the order of {@code paths}. This is used by data evolution to reconstruct the partial
 * nested schema of a column-group file from its {@code writeCols}.
 *
 * @param paths plain field names and/or dotted sub-field paths to keep
 * @return the projected row type
 * @throws IllegalArgumentException if the leading segment of a path names no field of the row
 *     type it is resolved against
 */
public RowType projectByPaths(List<String> paths) {
return projectTypeByPaths(this, paths);
}

private static RowType projectTypeByPaths(RowType type, List<String> paths) {
// group paths by their immediate child name; a child appearing without a tail (or also with
// a tail) is selected as a whole field
Map<String, List<String>> childToSubPaths = new HashMap<>();
Set<String> wholeChildren = new HashSet<>();
Set<String> fieldNames = new HashSet<>();
for (DataField field : type.getFields()) {
fieldNames.add(field.name());
}
for (String path : paths) {
int dot = path.indexOf('.');
// Prefer an exact field-name match so a column whose name itself contains a dot (and
// any
// plain top-level name) is selected whole; only split into head.tail for genuine nested
// sub-field paths that do not name a field directly. This keeps backward compatibility
// with the legacy exact-name project(List).
if (dot < 0 || fieldNames.contains(path)) {
childToSubPaths.computeIfAbsent(path, k -> new ArrayList<>());
wholeChildren.add(path);
} else {
String head = path.substring(0, dot);
String tail = path.substring(dot + 1);
childToSubPaths.computeIfAbsent(head, k -> new ArrayList<>()).add(tail);
}
}

Set<String> matched = new HashSet<>();
List<DataField> result = new ArrayList<>();
for (DataField field : type.getFields()) {
List<String> subPaths = childToSubPaths.get(field.name());
if (subPaths == null) {
continue;
}
matched.add(field.name());
if (wholeChildren.contains(field.name())
|| subPaths.isEmpty()
|| !(field.type() instanceof RowType)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reject dotted paths when the selected head is not a ROW? With the current branch, projectByPaths(Collections.singletonList("id.a")) falls into this arm and returns the whole id field. That makes invalid dotted writeCols look valid to callers such as the conflict checker, and in the Flink action an invalid SET target under a scalar can pass path resolution before failing later with a less helpful error. Since dotted paths now encode physical sub-fields, this should throw unless the head field is a ROW, or the whole path matched an exact top-level field name.

result.add(field);
} else {
RowType prunedChild =
projectTypeByPaths((RowType) field.type(), subPaths)
.copy(field.type().isNullable());
result.add(field.newType(prunedChild));
}
}
if (!matched.containsAll(childToSubPaths.keySet())) {
Set<String> unknown = new HashSet<>(childToSubPaths.keySet());
unknown.removeAll(matched);
throw new IllegalArgumentException(
"Cannot project by paths, unknown field(s) " + unknown + " in " + type);
}
return new RowType(type.isNullable(), result);
}

/**
 * Describes this (possibly partially nested) write type as dotted paths relative to the full
 * row type {@code fullType}; the result is what gets recorded as {@code writeCols}.
 *
 * <p>A top-level field, or a nested field whose structure fully covers its counterpart in
 * {@code fullType}, appears under its plain name. A nested field that covers only some
 * sub-fields is expanded into one dotted path per covered leaf. This is the inverse of {@link
 * #projectByPaths(List)}.
 *
 * @param fullType the complete row type this write type was derived from
 * @return the paths, in the field order of this write type
 */
public List<String> leafPaths(RowType fullType) {
    List<String> paths = new ArrayList<>();
    // the walk starts at the top level, so there is no parent prefix yet
    collectLeafPaths(this.getFields(), fullType, "", paths);
    return paths;
}

/**
 * Appends to {@code out} the dotted paths of {@code writeFields}, resolved against {@code
 * fullType} by field id and prefixed with {@code prefix} (empty at the top level).
 *
 * @throws UnsupportedOperationException if a field whose name contains '.' would have to take
 *     part in a multi-segment path
 */
private static void collectLeafPaths(
        List<DataField> writeFields, RowType fullType, String prefix, List<String> out) {
    final boolean insideStruct = !prefix.isEmpty();
    for (DataField candidate : writeFields) {
        final String name = candidate.name();
        final String path = insideStruct ? prefix + "." + name : name;

        // Fields unknown to the reference type (e.g. the _ROW_ID / _SEQUENCE_NUMBER special
        // fields of row tracking, which are not part of the table's logical row type) cannot
        // be split: they are emitted whole by name, like the legacy getFieldNames() output.
        if (!fullType.containsField(candidate.id())) {
            out.add(path);
            continue;
        }

        DataField reference = fullType.getField(candidate.id());
        boolean partialStruct = false;
        if (candidate.type() instanceof RowType && reference.type() instanceof RowType) {
            partialStruct =
                    !coversFully((RowType) candidate.type(), (RowType) reference.type());
        }

        // A dotted path is unambiguous only while no segment contains a literal '.'. Such a
        // name is fine when emitted whole at the top level (projectByPaths matches it
        // exactly), but not as one segment of a multi-segment nested path.
        if (name.contains(".") && (insideStruct || partialStruct)) {
            throw new UnsupportedOperationException(
                    "Sub-field-level data evolution does not support a nested field whose name "
                            + "contains '.': "
                            + path);
        }

        if (!partialStruct) {
            out.add(path);
            continue;
        }
        // only part of the struct is written: descend and emit its covered leaves
        collectLeafPaths(
                ((RowType) candidate.type()).getFields(),
                (RowType) reference.type(),
                path,
                out);
    }
}

/**
 * Checks that {@code part} has exactly as many fields as {@code full} and contains, matched by
 * field id and recursively through nested row types, every field of {@code full}.
 */
private static boolean coversFully(RowType part, RowType full) {
    boolean sameWidth = part.getFieldCount() == full.getFieldCount();
    if (!sameWidth) {
        return false;
    }
    for (DataField expected : full.getFields()) {
        if (!part.containsField(expected.id())) {
            return false;
        }
        DataField actual = part.getField(expected.id());
        boolean bothStructs =
                actual.type() instanceof RowType && expected.type() instanceof RowType;
        // only struct-vs-struct pairs need a deeper look; any other pairing counts as covered
        if (bothStructs && !coversFully((RowType) actual.type(), (RowType) expected.type())) {
            return false;
        }
    }
    return true;
}

private Map<String, DataField> nameToField() {
Map<String, DataField> nameToField = this.laziedNameToField;
if (nameToField == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,18 @@ public class DataEvolutionFileReader implements RecordReader<InternalRow> {
private final int[] rowOffsets;
private final int[] fieldOffsets;
private final RecordReader<InternalRow>[] readers;
@Nullable private final DataEvolutionRow.NestedField[] nested;

/**
 * Creates a reader without a nested assembly plan; equivalent to calling the four-argument
 * constructor with {@code nested} set to {@code null}.
 */
public DataEvolutionFileReader(
int[] rowOffsets, int[] fieldOffsets, RecordReader<InternalRow>[] readers) {
this(rowOffsets, fieldOffsets, readers, null);
}

public DataEvolutionFileReader(
int[] rowOffsets,
int[] fieldOffsets,
RecordReader<InternalRow>[] readers,
@Nullable DataEvolutionRow.NestedField[] nested) {
checkArgument(rowOffsets != null, "Row offsets must not be null");
checkArgument(fieldOffsets != null, "Field offsets must not be null");
checkArgument(
Expand All @@ -70,12 +79,14 @@ public DataEvolutionFileReader(
this.rowOffsets = rowOffsets;
this.fieldOffsets = fieldOffsets;
this.readers = readers;
this.nested = nested;
}

@Override
@Nullable
public RecordIterator<InternalRow> readBatch() throws IOException {
DataEvolutionRow row = new DataEvolutionRow(readers.length, rowOffsets, fieldOffsets);
row.setNested(nested);
RecordIterator<InternalRow>[] iterators = new RecordIterator[readers.length];
for (int i = 0; i < readers.length; i++) {
RecordReader<InternalRow> reader = readers[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ public class DataEvolutionRow implements InternalRow {
private final InternalRow[] rows;
private final int[] rowOffsets;
private final int[] fieldOffsets;

/**
* Optional per-top-level-field plan to assemble a nested struct whose sub-fields are spread
* across several source files (sub-field-level data evolution). {@code null} (or a {@code null}
* entry) means the field is taken whole from a single source row (the common case).
*/
private NestedField[] nested;

private RowKind rowKind;

public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
Expand All @@ -43,6 +51,10 @@ public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
this.fieldOffsets = fieldOffsets;
}

/**
 * Installs the per-top-level-field plan used to assemble nested structs from several source
 * rows. Passing {@code null}, or leaving an entry {@code null}, means the field is read whole
 * from a single source row.
 */
public void setNested(NestedField[] nested) {
this.nested = nested;
}

/** Returns the number of source-row slots held by this row (the length of {@code rows}). */
public int rowNumber() {
return rows.length;
}
Expand All @@ -59,6 +71,21 @@ public void setRow(int pos, InternalRow row) {
}
}

/**
 * Copies {@code newRows} into the source-row slots, tolerating {@code null} entries. If no row
 * kind has been set yet, it is taken from the first non-null entry that reports one.
 */
private void setRowsAllowNull(InternalRow[] newRows) {
    int slot = 0;
    for (InternalRow source : newRows) {
        rows[slot++] = source;
        if (source != null && rowKind == null) {
            rowKind = source.getRowKind();
        }
    }
    // A composed struct whose every source partial is null still needs a defined kind so
    // getRowKind() never returns null. The kind of an assembled struct value carries no
    // meaning, hence the INSERT default.
    if (rowKind == null) {
        rowKind = RowKind.INSERT;
    }
}

public void setRows(InternalRow[] rows) {
if (rows.length != this.rows.length) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -97,10 +124,22 @@ public void setRowKind(RowKind kind) {

@Override
public boolean isNullAt(int pos) {
    NestedField plan = nested == null ? null : nested[pos];
    if (plan != null) {
        // a composed struct is null only when none of its source files provide it
        for (int k = 0; k < plan.numPartials; k++) {
            InternalRow source = rows[plan.partialReader[k]];
            if (source == null || source.isNullAt(plan.partialOffset[k])) {
                continue;
            }
            return false;
        }
        return true;
    }

    // a negative row offset means no source row provides this field
    if (rowOffsets[pos] < 0) {
        return true;
    }
    InternalRow chosen = chooseRow(pos);
    if (chosen == null) {
        return true;
    }
    return chosen.isNullAt(offsetInRow(pos));
}

@Override
Expand Down Expand Up @@ -185,6 +224,54 @@ public InternalMap getMap(int pos) {

@Override
public InternalRow getRow(int pos, int numFields) {
    NestedField plan = nested == null ? null : nested[pos];
    if (plan == null) {
        // common case: the struct is taken whole from a single source row
        return chooseRow(pos).getRow(offsetInRow(pos), numFields);
    }

    // Read each partial struct from its own source row; a missing source row or a null
    // struct value leaves that partial as null.
    InternalRow[] partials = new InternalRow[plan.numPartials];
    for (int k = 0; k < plan.numPartials; k++) {
        InternalRow source = rows[plan.partialReader[k]];
        int offset = plan.partialOffset[k];
        if (source == null || source.isNullAt(offset)) {
            partials[k] = null;
        } else {
            partials[k] = source.getRow(offset, plan.partialSize[k]);
        }
    }

    // Present the partials as one struct whose sub-fields are routed by the plan's offsets.
    DataEvolutionRow assembled =
            new DataEvolutionRow(plan.numPartials, plan.subRowOffsets, plan.subFieldOffsets);
    assembled.setRowsAllowNull(partials);
    return assembled;
}

/**
 * Plan to assemble one nested struct from sub-fields spread across several source files. A
 * "partial" is the projection of the struct read from a single source file; each output
 * sub-field is sourced from one partial via {@code subRowOffsets}/{@code subFieldOffsets}.
 */
public static class NestedField {

    /** Number of partial structs; always equal to {@code partialReader.length}. */
    final int numPartials;

    // Per partial struct (three parallel arrays indexed by partial): the source reader, the
    // struct's position in that reader's row, and the number of fields of the struct as read
    // from that file.
    final int[] partialReader;
    final int[] partialOffset;
    final int[] partialSize;

    // Per output sub-field (two parallel arrays): which partial it comes from, and its offset
    // within that partial.
    final int[] subRowOffsets;
    final int[] subFieldOffsets;

    /**
     * Creates a plan from its parallel arrays.
     *
     * @param partialReader per partial, the index of the source reader providing it
     * @param partialOffset per partial, the struct's position in that reader's row
     * @param partialSize per partial, the number of struct fields read from that file
     * @param subRowOffsets per output sub-field, the partial it is taken from
     * @param subFieldOffsets per output sub-field, its offset within that partial
     * @throws IllegalArgumentException if the per-partial arrays, or the per-sub-field
     *     arrays, differ in length
     */
    public NestedField(
            int[] partialReader,
            int[] partialOffset,
            int[] partialSize,
            int[] subRowOffsets,
            int[] subFieldOffsets) {
        // Fail at construction rather than with an index error deep inside isNullAt/getRow.
        if (partialOffset.length != partialReader.length
                || partialSize.length != partialReader.length) {
            throw new IllegalArgumentException(
                    "Per-partial arrays must have the same length, but got partialReader="
                            + partialReader.length
                            + ", partialOffset="
                            + partialOffset.length
                            + ", partialSize="
                            + partialSize.length);
        }
        if (subRowOffsets.length != subFieldOffsets.length) {
            throw new IllegalArgumentException(
                    "Per-sub-field arrays must have the same length, but got subRowOffsets="
                            + subRowOffsets.length
                            + ", subFieldOffsets="
                            + subFieldOffsets.length);
        }
        this.numPartials = partialReader.length;
        this.partialReader = partialReader;
        this.partialOffset = partialOffset;
        this.partialSize = partialSize;
        this.subRowOffsets = subRowOffsets;
        this.subFieldOffsets = subFieldOffsets;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,10 @@ public void withWriteType(RowType writeType) {
if (blobContext != null) {
blobContext = blobContext.withWriteType(writeType);
}
int fullCount = rowType.getFieldCount();
List<String> fullNames = rowType.getFieldNames();
this.writeCols = writeType.getFieldNames();
// writeCols carries (possibly nested) dotted paths, e.g. ["f0", "nest.a"]; a plain
// top-level name means the whole column, a dotted path means only that sub-field is written
this.writeCols = writeType.leafPaths(rowType);
// optimize writeCols to null in following cases:
// writeType contains all columns (without _ROW_ID and _SEQUENCE_NUMBER)
if (writeCols.equals(fullNames)) {
Expand Down Expand Up @@ -277,7 +278,9 @@ private RowDataRollingFileWriter createRollingFileWriter(
FileSource.COMPACT,
options.asyncFileWrite(),
options.statsDenseStore(),
rowType.equals(writeType) ? null : writeType.getFieldNames());
// use the same dotted-leaf-path encoding as withWriteType so a partial nested
// writeType records its real sub-field content consistently across write paths
rowType.equals(writeType) ? null : writeType.leafPaths(rowType));
}

private RecordReaderIterator<InternalRow> createFilesIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,15 @@ static EvolutionStats evolutionStats(
for (int k = 0; k < fieldIdsWithStats.length; k++) {
if (fieldId == fieldIdsWithStats[k]) {
DataType fileType = dataFileSchemaWithStats.fields().get(k).type();
// A sub-field-level data evolution file may store only part of a nested struct
// (e.g. nest<a> of nest<a,b>); its file type then does not equal the full target
// struct. We intentionally skip stats in that case (leaving the field as "no stats"
// so no file is wrongly pruned) rather than composing partial-struct stats across
// files; struct columns rarely carry useful min/max and data evolution does not push
// predicates down, so the lost benefit is negligible.
if (!fileType.equalsIgnoreFieldId(targetType)) {
continue loop1;
}
Expand Down
Loading
Loading