diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingColumnAllocator.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingColumnAllocator.java new file mode 100644 index 000000000000..9a3a9c896556 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingColumnAllocator.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Per-row physical column allocator for one shared-shredding MAP column. + * + *

This is a simple temporary implementation which assigns fields to physical columns by row + * order. A later version will use a more sophisticated LRU-style allocator to improve column reuse + * across rows. + */ +public class MapSharedShreddingColumnAllocator { + + private final int numColumns; + private final Map> fieldToColumns = new TreeMap<>(); + private final Set overflowFieldSet = new TreeSet<>(); + private int maxRowWidth = 0; + + public MapSharedShreddingColumnAllocator(int numColumns) { + this.numColumns = numColumns; + } + + public RowAllocation allocateRow(List fieldIds) { + maxRowWidth = Math.max(maxRowWidth, fieldIds.size()); + + int[] colToField = new int[numColumns]; + for (int i = 0; i < numColumns; i++) { + colToField[i] = -1; + } + + int assignLimit = Math.min(fieldIds.size(), numColumns); + for (int i = 0; i < assignLimit; i++) { + int fieldId = fieldIds.get(i); + colToField[i] = fieldId; + fieldToColumns.computeIfAbsent(fieldId, ignored -> new TreeSet<>()).add(i); + } + + List overflowFields = new ArrayList<>(); + for (int i = assignLimit; i < fieldIds.size(); i++) { + int fieldId = fieldIds.get(i); + overflowFields.add(fieldId); + overflowFieldSet.add(fieldId); + } + + return new RowAllocation(colToField, overflowFields); + } + + public Map> fieldToColumns() { + Map> result = new TreeMap<>(); + for (Map.Entry> entry : fieldToColumns.entrySet()) { + result.put( + entry.getKey(), + Collections.unmodifiableList(new ArrayList<>(entry.getValue()))); + } + return Collections.unmodifiableMap(result); + } + + public Set overflowFieldSet() { + return Collections.unmodifiableSet(overflowFieldSet); + } + + public int maxRowWidth() { + return maxRowWidth; + } + + public int numColumns() { + return numColumns; + } + + /** Physical column allocation for one row. */ + public static class RowAllocation { + + private final int[] colToField; + private final List overflowFields; + + private RowAllocation(int[] colToField, List overflowFields) { + this.colToField = colToField; + this.overflowFields = Collections.unmodifiableList(overflowFields); + } + + public int[] colToField() { + return colToField.clone(); + } + + public List overflowFields() { + return overflowFields; + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingContext.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingContext.java new file mode 100644 index 000000000000..9c301e672092 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingContext.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Cross-file state for shared-shredding MAP columns. */ +public class MapSharedShreddingContext { + + private static final int WINDOW_SIZE = 20; + private static final double PERCENTILE_RATIO = 0.90; + private static final int MAX_CLOSE_ABSOLUTE_SLACK = 4; + private static final double MAX_CLOSE_RELATIVE_RATIO = 1.25; + + private final Map columnToMaxColumns; + private final Map> recentMaxRowWidths; + private final List shreddingColumnNames; + + public MapSharedShreddingContext(Map columnToMaxColumns) { + this.columnToMaxColumns = new TreeMap<>(columnToMaxColumns); + this.recentMaxRowWidths = new TreeMap<>(); + this.shreddingColumnNames = + Collections.unmodifiableList(new ArrayList<>(this.columnToMaxColumns.keySet())); + } + + /** Returns the physical column count K to use for each shared-shredding field. */ + public Map computeNextK() { + Map result = new TreeMap<>(); + for (Map.Entry entry : columnToMaxColumns.entrySet()) { + String fieldName = entry.getKey(); + int maxColumns = entry.getValue(); + Deque widths = recentMaxRowWidths.get(fieldName); + if (widths == null || widths.isEmpty()) { + result.put(fieldName, maxColumns); + } else { + int adaptiveWidth = computeAdaptiveWidth(new ArrayList<>(widths)); + result.put(fieldName, Math.max(1, Math.min(adaptiveWidth, maxColumns))); + } + } + return result; + } + + /** Reports one completed file's maximum row width for a shared-shredding field. */ + public void reportFileStats(String fieldName, int maxRowWidth) { + Deque widths = + recentMaxRowWidths.computeIfAbsent(fieldName, ignored -> new ArrayDeque<>()); + widths.addLast(maxRowWidth); + while (widths.size() > WINDOW_SIZE) { + widths.removeFirst(); + } + } + + public List getShreddingColumnNames() { + return shreddingColumnNames; + } + + public boolean isEmpty() { + return columnToMaxColumns.isEmpty(); + } + + private static int computeAdaptiveWidth(List values) { + checkArgument(!values.isEmpty(), "values should not be empty."); + + Collections.sort(values); + int maxWidth = values.get(values.size() - 1); + int percentileRank = (int) Math.ceil(PERCENTILE_RATIO * values.size()); + percentileRank = Math.max(1, Math.min(percentileRank, values.size())); + int percentileWidth = values.get(percentileRank - 1); + + int relativeCloseThreshold = + (int) Math.ceil((double) percentileWidth * MAX_CLOSE_RELATIVE_RATIO); + if (maxWidth - percentileWidth <= MAX_CLOSE_ABSOLUTE_SLACK + || maxWidth <= relativeCloseThreshold) { + return maxWidth; + } + return percentileWidth; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingFieldDict.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingFieldDict.java new file mode 100644 index 000000000000..b315e5af2441 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingFieldDict.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +/** File-local field name to field id dictionary for one shared-shredding MAP column. */ +public class MapSharedShreddingFieldDict { + + private final Map nameToId = new TreeMap<>(); + private int nextId = 0; + + public int getOrAssign(String name) { + Integer id = nameToId.get(name); + if (id != null) { + return id; + } + int newId = nextId++; + nameToId.put(name, newId); + return newId; + } + + public Map nameToId() { + return Collections.unmodifiableMap(nameToId); + } + + public int size() { + return nextId; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingRowConverter.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingRowConverter.java new file mode 100644 index 000000000000..2b0bc532c7eb --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingRowConverter.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** Converts logical rows containing shared-shredding MAP fields into physical rows. */ +public class MapSharedShreddingRowConverter { + + private final RowType logicalType; + private final RowType physicalType; + private final Map contextByFieldName; + private final ColumnContext[] contextByFieldPos; + private final List shreddingFieldNames; + + public MapSharedShreddingRowConverter( + RowType logicalType, Map fieldToNumColumns) { + this.logicalType = logicalType; + this.physicalType = + MapSharedShreddingUtils.logicalToPhysicalSchema(logicalType, fieldToNumColumns); + this.contextByFieldName = new LinkedHashMap<>(); + this.contextByFieldPos = new ColumnContext[logicalType.getFieldCount()]; + this.shreddingFieldNames = new ArrayList<>(); + + for (int i = 0; i < logicalType.getFieldCount(); i++) { + DataField field = logicalType.getFields().get(i); + Integer numColumns = fieldToNumColumns.get(field.name()); + if (numColumns == null) { + continue; + } + + MapType mapType = (MapType) field.type(); + ColumnContext context = new ColumnContext(field.name(), numColumns, mapType); + contextByFieldName.put(field.name(), context); + contextByFieldPos[i] = context; + shreddingFieldNames.add(field.name()); + } + } + + public RowType physicalType() { + return physicalType; + } + + public List shreddingFieldNames() { + return Collections.unmodifiableList(shreddingFieldNames); + } + + public InternalRow convert(InternalRow logicalRow) { + return new SharedShreddingRow(logicalRow); + } + + public MapSharedShreddingFieldMeta buildFieldMeta(String fieldName) { + ColumnContext context = contextByFieldName.get(fieldName); + if (context == null) { + throw new IllegalArgumentException( + "Cannot find shared-shredding field in row converter: " + fieldName); + } + return new MapSharedShreddingFieldMeta( + context.dict.nameToId(), + context.allocator.fieldToColumns(), + context.allocator.overflowFieldSet(), + context.allocator.numColumns(), + context.allocator.maxRowWidth()); + } + + private InternalRow convertMap(InternalMap map, ColumnContext context) { + int size = map.size(); + InternalArray keys = map.keyArray(); + InternalArray values = map.valueArray(); + + List fieldIds = new ArrayList<>(size); + Map fieldIdToValueIndex = new LinkedHashMap<>(); + for (int i = 0; i < size; i++) { + Object key = context.keyGetter.getElementOrNull(keys, i); + if (key == null) { + throw new IllegalArgumentException( + "Shared-shredding MAP keys cannot be null for field: " + context.fieldName); + } + int fieldId = context.dict.getOrAssign(key.toString()); + fieldIds.add(fieldId); + fieldIdToValueIndex.put(fieldId, i); + } + + MapSharedShreddingColumnAllocator.RowAllocation allocation = + context.allocator.allocateRow(fieldIds); + int[] colToField = allocation.colToField(); + GenericRow physicalStruct = new GenericRow(context.numColumns + 2); + physicalStruct.setField(0, new GenericArray(colToField)); + for (int i = 0; i < context.numColumns; i++) { + int fieldId = colToField[i]; + if (fieldId == -1) { + physicalStruct.setField(1 + i, null); + } else { + physicalStruct.setField( + 1 + i, + context.valueGetter.getElementOrNull( + values, fieldIdToValueIndex.get(fieldId))); + } + } + + if (allocation.overflowFields().isEmpty()) { + physicalStruct.setField(1 + context.numColumns, null); + } else { + Map overflow = new LinkedHashMap<>(); + for (Integer fieldId : allocation.overflowFields()) { + overflow.put( + fieldId, + context.valueGetter.getElementOrNull( + values, fieldIdToValueIndex.get(fieldId))); + } + physicalStruct.setField(1 + context.numColumns, new GenericMap(overflow)); + } + return physicalStruct; + } + + private class SharedShreddingRow implements InternalRow { + + private final InternalRow row; + private final InternalRow[] convertedFields; + private final boolean[] converted; + + private SharedShreddingRow(InternalRow row) { + this.row = row; + this.convertedFields = new InternalRow[contextByFieldPos.length]; + this.converted = new boolean[contextByFieldPos.length]; + } + + @Override + public int getFieldCount() { + return row.getFieldCount(); + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + row.setRowKind(kind); + } + + @Override + public boolean isNullAt(int pos) { + return row.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return row.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return row.getByte(pos); + } + + @Override + public short getShort(int pos) { + return row.getShort(pos); + } + + @Override + public int getInt(int pos) { + return row.getInt(pos); + } + + @Override + public long getLong(int pos) { + return row.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return row.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return row.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return row.getString(pos); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + return row.getDecimal(pos, precision, scale); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + return row.getTimestamp(pos, precision); + } + + @Override + public byte[] getBinary(int pos) { + return row.getBinary(pos); + } + + @Override + public Variant getVariant(int pos) { + return row.getVariant(pos); + } + + @Override + public Blob getBlob(int pos) { + return row.getBlob(pos); + } + + @Override + public InternalArray getArray(int pos) { + return row.getArray(pos); + } + + @Override + public InternalVector getVector(int pos) { + return row.getVector(pos); + } + + @Override + public InternalMap getMap(int pos) { + return row.getMap(pos); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + ColumnContext context = contextByFieldPos[pos]; + if (context == null) { + return row.getRow(pos, numFields); + } + if (row.isNullAt(pos)) { + return null; + } + if (!converted[pos]) { + convertedFields[pos] = convertMap(row.getMap(pos), context); + converted[pos] = true; + } + return convertedFields[pos]; + } + } + + private static class ColumnContext { + + private final String fieldName; + private final int numColumns; + private final InternalArray.ElementGetter keyGetter; + private final InternalArray.ElementGetter valueGetter; + private final MapSharedShreddingFieldDict dict; + private final MapSharedShreddingColumnAllocator allocator; + + private ColumnContext(String fieldName, int numColumns, MapType mapType) { + this.fieldName = fieldName; + this.numColumns = numColumns; + this.keyGetter = InternalArray.createElementGetter(mapType.getKeyType()); + DataType valueType = mapType.getValueType(); + this.valueGetter = InternalArray.createElementGetter(valueType); + this.dict = new MapSharedShreddingFieldDict(); + this.allocator = new MapSharedShreddingColumnAllocator(numColumns); + } + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingColumnAllocatorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingColumnAllocatorTest.java new file mode 100644 index 000000000000..933e1faf6180 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingColumnAllocatorTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link MapSharedShreddingColumnAllocator}. */ +class MapSharedShreddingColumnAllocatorTest { + + @Test + void testBasicAllocation() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(3); + + MapSharedShreddingColumnAllocator.RowAllocation allocation = + allocator.allocateRow(Arrays.asList(10, 20)); + + assertThat(allocation.colToField()).containsExactly(10, 20, -1); + assertThat(allocation.overflowFields()).isEmpty(); + } + + @Test + void testExactlyKFields() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(3); + + MapSharedShreddingColumnAllocator.RowAllocation allocation = + allocator.allocateRow(Arrays.asList(0, 1, 2)); + + assertThat(allocation.colToField()).containsExactly(0, 1, 2); + assertThat(allocation.overflowFields()).isEmpty(); + } + + @Test + void testOverflowWhenExceedK() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(2); + + MapSharedShreddingColumnAllocator.RowAllocation allocation = + allocator.allocateRow(Arrays.asList(10, 20, 30, 40)); + + assertThat(allocation.colToField()).containsExactly(10, 20); + assertThat(allocation.overflowFields()).containsExactly(30, 40); + assertThatThrownBy(() -> allocation.overflowFields().add(50)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void testEmptyRow() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(3); + + MapSharedShreddingColumnAllocator.RowAllocation allocation = + allocator.allocateRow(Arrays.asList()); + + assertThat(allocation.colToField()).containsExactly(-1, -1, -1); + assertThat(allocation.overflowFields()).isEmpty(); + } + + @Test + void testMaxRowWidthTracked() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(3); + + allocator.allocateRow(Arrays.asList(1, 2)); + allocator.allocateRow(Arrays.asList(1, 2, 3, 4, 5)); + allocator.allocateRow(Arrays.asList(1)); + + assertThat(allocator.maxRowWidth()).isEqualTo(5); + } + + @Test + void testFieldToColumnsAccumulated() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(3); + + allocator.allocateRow(Arrays.asList(10, 20, 30)); + allocator.allocateRow(Arrays.asList(20, 40)); + + Map> fieldToColumns = allocator.fieldToColumns(); + assertThat(fieldToColumns.get(10)).containsExactly(0); + assertThat(fieldToColumns.get(20)).containsExactly(0, 1); + assertThat(fieldToColumns.get(30)).containsExactly(2); + assertThat(fieldToColumns.get(40)).containsExactly(1); + assertThatThrownBy(() -> fieldToColumns.put(50, Arrays.asList(2))) + .isInstanceOf(UnsupportedOperationException.class); + assertThatThrownBy(() -> fieldToColumns.get(20).add(2)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void testOverflowFieldSetAccumulated() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(2); + + allocator.allocateRow(Arrays.asList(1, 2, 3)); + allocator.allocateRow(Arrays.asList(4, 5, 6, 7)); + + assertThat(allocator.overflowFieldSet()).isEqualTo(new TreeSet<>(Arrays.asList(3, 6, 7))); + assertThatThrownBy(() -> allocator.overflowFieldSet().add(8)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void testGetNumColumns() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(5); + + assertThat(allocator.numColumns()).isEqualTo(5); + } + + @Test + void testSingleColumnAllocator() { + MapSharedShreddingColumnAllocator allocator = new MapSharedShreddingColumnAllocator(1); + + MapSharedShreddingColumnAllocator.RowAllocation allocation = + allocator.allocateRow(Arrays.asList(10, 20, 30)); + + assertThat(allocation.colToField()).containsExactly(10); + assertThat(allocation.overflowFields()).containsExactly(20, 30); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingContextTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingContextTest.java new file mode 100644 index 000000000000..e4ec16bd519c --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingContextTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link MapSharedShreddingContext}. */ +class MapSharedShreddingContextTest { + + @Test + void testFirstFileUsesKMax() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + assertThat(context.computeNextK()).containsEntry("tags", 256); + } + + @Test + void testAdaptKAfterOneFile() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + context.reportFileStats("tags", 7); + + assertThat(context.computeNextK()).containsEntry("tags", 7); + } + + @Test + void testAdaptKCappedByKMax() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 10)); + + context.reportFileStats("tags", 20); + + assertThat(context.computeNextK()).containsEntry("tags", 10); + } + + @Test + void testWindowP90UsesMaxWhenSamplesAreClose() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + context.reportFileStats("tags", 3); + context.reportFileStats("tags", 7); + context.reportFileStats("tags", 5); + + assertThat(context.computeNextK()).containsEntry("tags", 7); + } + + @Test + void testWindowP90IgnoresSingleFarOutlier() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + for (int i = 0; i < 19; i++) { + context.reportFileStats("tags", 3); + } + context.reportFileStats("tags", 1000); + + assertThat(context.computeNextK()).containsEntry("tags", 3); + } + + @Test + void testWindowP90UsesMaxWithinAbsoluteSlack() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + for (int i = 0; i < 19; i++) { + context.reportFileStats("tags", 3); + } + context.reportFileStats("tags", 7); + + assertThat(context.computeNextK()).containsEntry("tags", 7); + } + + @Test + void testWindowP90UsesMaxWithinRelativeSlack() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + for (int i = 0; i < 19; i++) { + context.reportFileStats("tags", 100); + } + context.reportFileStats("tags", 125); + + assertThat(context.computeNextK()).containsEntry("tags", 125); + } + + @Test + void testWindowP90IgnoresMaxBeyondBothSlacks() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + for (int i = 0; i < 19; i++) { + context.reportFileStats("tags", 100); + } + context.reportFileStats("tags", 130); + + assertThat(context.computeNextK()).containsEntry("tags", 100); + } + + @Test + void testMultipleColumnsIndependent() { + Map columns = columns("tags", 256); + columns.put("attrs", 128); + MapSharedShreddingContext context = new MapSharedShreddingContext(columns); + + context.reportFileStats("tags", 10); + context.reportFileStats("attrs", 5); + + assertThat(context.computeNextK()).containsEntry("tags", 10).containsEntry("attrs", 5); + } + + @Test + void testGetShreddingColumnNamesSorted() { + Map columns = columns("tags", 256); + columns.put("metrics", 64); + columns.put("props", 128); + MapSharedShreddingContext context = new MapSharedShreddingContext(columns); + + assertThat(context.getShreddingColumnNames()).containsExactly("metrics", "props", "tags"); + assertThatThrownBy(() -> context.getShreddingColumnNames().add("extra")) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void testSlidingWindowEvictsOldEntries() { + MapSharedShreddingContext context = new MapSharedShreddingContext(columns("tags", 256)); + + context.reportFileStats("tags", 104); + for (int i = 0; i < 19; i++) { + context.reportFileStats("tags", 100); + } + assertThat(context.computeNextK()).containsEntry("tags", 104); + + context.reportFileStats("tags", 100); + + assertThat(context.computeNextK()).containsEntry("tags", 100); + } + + private static Map columns(String name, int maxColumns) { + Map columns = new LinkedHashMap<>(); + columns.put(name, maxColumns); + return columns; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingFieldDictTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingFieldDictTest.java new file mode 100644 index 000000000000..1bfd58440570 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingFieldDictTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link MapSharedShreddingFieldDict}. */ +class MapSharedShreddingFieldDictTest { + + @Test + void testAssignMonotonicallyIncreasingIds() { + MapSharedShreddingFieldDict dict = new MapSharedShreddingFieldDict(); + + assertThat(dict.getOrAssign("cpu_usage")).isEqualTo(0); + assertThat(dict.getOrAssign("mem_load")).isEqualTo(1); + assertThat(dict.getOrAssign("disk_io")).isEqualTo(2); + assertThat(dict.size()).isEqualTo(3); + } + + @Test + void testLookupReturnsExistingId() { + MapSharedShreddingFieldDict dict = new MapSharedShreddingFieldDict(); + + int first = dict.getOrAssign("cpu_usage"); + + assertThat(dict.getOrAssign("cpu_usage")).isEqualTo(first); + assertThat(dict.size()).isEqualTo(1); + } + + @Test + void testGetNameToId() { + MapSharedShreddingFieldDict dict = new MapSharedShreddingFieldDict(); + + dict.getOrAssign("b_field"); + dict.getOrAssign("a_field"); + + Map nameToId = dict.nameToId(); + assertThat(nameToId.keySet()).containsExactly("a_field", "b_field"); + assertThat(nameToId.get("b_field")).isEqualTo(0); + assertThat(nameToId.get("a_field")).isEqualTo(1); + assertThatThrownBy(() -> nameToId.put("c_field", 2)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void testEmptyDict() { + MapSharedShreddingFieldDict dict = new MapSharedShreddingFieldDict(); + + assertThat(dict.size()).isZero(); + assertThat(dict.nameToId()).isEmpty(); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingRowConverterTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingRowConverterTest.java new file mode 100644 index 000000000000..3fed33f722fb --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingRowConverterTest.java @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link MapSharedShreddingRowConverter}. */ +class MapSharedShreddingRowConverterTest { + + @Test + void testBasicConversion() { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("tags", 3)); + assertThat(converter.shreddingFieldNames()).containsExactly("tags"); + assertThatThrownBy(() -> converter.shreddingFieldNames().add("metrics")) + .isInstanceOf(UnsupportedOperationException.class); + + InternalRow first = converter.convert(GenericRow.of(100, stringKeyMap("a", 1L, "b", 2L))); + InternalRow firstTags = first.getRow(1, 5); + + // Target physical row: + // id = 100 + // tags = [__field_mapping=[a, b, empty], __col_0=1, __col_1=2, __col_2=null, + // __overflow=null] + assertThat(first.getInt(0)).isEqualTo(100); + assertThat(firstTags.getArray(0).toIntArray()).containsExactly(0, 1, -1); + assertThat(firstTags.getLong(1)).isEqualTo(1L); + assertThat(firstTags.getLong(2)).isEqualTo(2L); + assertThat(firstTags.isNullAt(3)).isTrue(); + assertThat(firstTags.isNullAt(4)).isTrue(); + + InternalRow second = + converter.convert(GenericRow.of(200, stringKeyMap("b", 3L, "c", 4L, "a", 5L))); + InternalRow secondTags = second.getRow(1, 5); + + // Target physical row: + // id = 200 + // tags = [__field_mapping=[b, c, a], __col_0=3, __col_1=4, __col_2=5, + // __overflow=null] + assertThat(second.getInt(0)).isEqualTo(200); + assertThat(secondTags.getArray(0).toIntArray()).containsExactly(1, 2, 0); + assertThat(secondTags.getLong(1)).isEqualTo(3L); + assertThat(secondTags.getLong(2)).isEqualTo(4L); + assertThat(secondTags.getLong(3)).isEqualTo(5L); + assertThat(secondTags.isNullAt(4)).isTrue(); + + assertThat(converter.buildFieldMeta("tags")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Arrays.asList(0, 2), + 1, Arrays.asList(0, 1), + 2, Collections.singletonList(1)), + new TreeSet(), + 3, + 3)); + } + + @Test + void testBasicMapWithNullValue() { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "metrics", + DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("metrics", 2)); + + InternalRow row = converter.convert(GenericRow.of(stringKeyMap("a", null, "b", 20L))); + InternalRow metrics = row.getRow(0, 4); + + // Target physical row: + // metrics = [__field_mapping=[a, b], __col_0=null, __col_1=20, + // __overflow=null] + assertThat(metrics.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(metrics.isNullAt(1)).isTrue(); + assertThat(metrics.getLong(2)).isEqualTo(20L); + assertThat(metrics.isNullAt(3)).isTrue(); + assertThat(converter.buildFieldMeta("metrics")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet(), + 2, + 2)); + } + + @Test + void testOverflowWhenExceedK() { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "metrics", + DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("metrics", 2)); + + InternalRow row = + converter.convert(GenericRow.of(stringKeyMap("a", 10L, "b", 20L, "c", 30L))); + InternalRow metrics = row.getRow(0, 4); + + // Target physical row: + // metrics = [__field_mapping=[a, b], __col_0=10, __col_1=20, + // __overflow={c:30}] + assertThat(metrics.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(metrics.getLong(1)).isEqualTo(10L); + assertThat(metrics.getLong(2)).isEqualTo(20L); + assertThat(metrics.getMap(3)).isEqualTo(intKeyMap(2, 30L)); + assertThat(converter.buildFieldMeta("metrics")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + } + + @Test + void testEmptyAndNullMaps() { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "metrics", + DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("metrics", 2)); + + InternalRow nullRow = converter.convert(GenericRow.of((InternalMap) null)); + assertThat(nullRow.isNullAt(0)).isTrue(); + + InternalRow emptyRow = converter.convert(GenericRow.of(stringKeyMap())); + InternalRow metrics = emptyRow.getRow(0, 4); + + // Target physical row for an empty map: + // metrics = [__field_mapping=[empty, empty], __col_0=null, __col_1=null, + // __overflow=null] + assertThat(metrics.getArray(0).toIntArray()).containsExactly(-1, -1); + assertThat(metrics.isNullAt(1)).isTrue(); + assertThat(metrics.isNullAt(2)).isTrue(); + assertThat(metrics.isNullAt(3)).isTrue(); + + assertThat(converter.buildFieldMeta("metrics")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + new TreeMap(), + new TreeMap>(), + new TreeSet(), + 2, + 0)); + } + + @Test + void testNestedValueStruct() { + RowType valueType = + DataTypes.ROW( + DataTypes.FIELD(0, "x", DataTypes.INT()), + DataTypes.FIELD(1, "y", DataTypes.DOUBLE())); + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "tags", DataTypes.MAP(DataTypes.STRING(), valueType))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("tags", 2)); + + InternalRow row = + converter.convert( + GenericRow.of( + stringKeyMap( + "a", GenericRow.of(1, 1.1D), + "b", GenericRow.of(2, 2.2D), + "c", GenericRow.of(3, 3.3D)))); + InternalRow tags = row.getRow(0, 4); + + // Target physical row: + // tags = [__field_mapping=[a, b], __col_0={x=1,y=1.1}, __col_1={x=2,y=2.2}, + // __overflow={c:{x=3,y=3.3}}] + assertThat(tags.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(tags.getRow(1, 2)).isEqualTo(GenericRow.of(1, 1.1D)); + assertThat(tags.getRow(2, 2)).isEqualTo(GenericRow.of(2, 2.2D)); + assertThat(tags.getMap(3)).isEqualTo(intKeyMap(2, GenericRow.of(3, 3.3D))); + assertThat(converter.buildFieldMeta("tags")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + } + + @Test + void testNestedValueList() { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, + "tags", + DataTypes.MAP( + DataTypes.STRING(), DataTypes.ARRAY(DataTypes.INT())))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("tags", 2)); + + InternalRow first = + converter.convert( + GenericRow.of(1, stringKeyMap("a", array(1, null, 2), "b", array(3)))); + InternalRow firstTags = first.getRow(1, 4); + + // Target physical row: + // id = 1 + // tags = [__field_mapping=[a, b], __col_0=[1,null,2], __col_1=[3], + // __overflow=null] + assertThat(firstTags.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(firstTags.getArray(1)).isEqualTo(array(1, null, 2)); + assertThat(firstTags.getArray(2)).isEqualTo(array(3)); + assertThat(firstTags.isNullAt(3)).isTrue(); + + InternalRow second = + converter.convert(GenericRow.of(2, stringKeyMap("a", array((Object) null)))); + InternalRow secondTags = second.getRow(1, 4); + + // Target physical row: + // id = 2 + // tags = [__field_mapping=[a, empty], __col_0=[null], __col_1=null, + // __overflow=null] + assertThat(secondTags.getArray(0).toIntArray()).containsExactly(0, -1); + assertThat(secondTags.getArray(1)).isEqualTo(array((Object) null)); + assertThat(secondTags.isNullAt(2)).isTrue(); + assertThat(secondTags.isNullAt(3)).isTrue(); + + InternalRow third = converter.convert(GenericRow.of(3, stringKeyMap("c", array(5, 6, 7)))); + InternalRow thirdTags = third.getRow(1, 4); + + // Target physical row: + // id = 3 + // tags = [__field_mapping=[c, empty], __col_0=[5,6,7], __col_1=null, + // __overflow=null] + assertThat(thirdTags.getArray(0).toIntArray()).containsExactly(2, -1); + assertThat(thirdTags.getArray(1)).isEqualTo(array(5, 6, 7)); + assertThat(thirdTags.isNullAt(2)).isTrue(); + assertThat(thirdTags.isNullAt(3)).isTrue(); + + InternalRow fourth = + converter.convert( + GenericRow.of( + 4, + stringKeyMap( + "b", array(8), + "a", array(9, 10), + "c", array((Object) null)))); + InternalRow fourthTags = fourth.getRow(1, 4); + + // Target physical row: + // id = 4 + // tags = [__field_mapping=[b, a], __col_0=[8], __col_1=[9,10], + // __overflow={c:[null]}] + assertThat(fourthTags.getArray(0).toIntArray()).containsExactly(1, 0); + assertThat(fourthTags.getArray(1)).isEqualTo(array(8)); + assertThat(fourthTags.getArray(2)).isEqualTo(array(9, 10)); + assertThat(fourthTags.getMap(3)).isEqualTo(intKeyMap(2, array((Object) null))); + + assertThat(converter.buildFieldMeta("tags")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Arrays.asList(0, 1), + 1, Arrays.asList(0, 1), + 2, Collections.singletonList(0)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + } + + @Test + void testNestedValueMap() { + MapType innerMapType = DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()); + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "nested", DataTypes.MAP(DataTypes.STRING(), innerMapType))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("nested", 2)); + + InternalRow first = + converter.convert( + GenericRow.of( + 1, + stringKeyMap( + "a", + stringKeyMap("x", 1, "y", null), + "b", + stringKeyMap("z", 3)))); + InternalRow firstNested = first.getRow(1, 4); + + // Target physical row: + // id = 1 + // nested = [__field_mapping=[a, b], __col_0={x:1,y:null}, __col_1={z:3}, + // __overflow=null] + assertThat(firstNested.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(firstNested.getMap(1)).isEqualTo(stringKeyMap("x", 1, "y", null)); + assertThat(firstNested.getMap(2)).isEqualTo(stringKeyMap("z", 3)); + assertThat(firstNested.isNullAt(3)).isTrue(); + + InternalRow second = + converter.convert(GenericRow.of(2, stringKeyMap("c", stringKeyMap("p", null)))); + InternalRow secondNested = second.getRow(1, 4); + + // Target physical row: + // id = 2 + // nested = [__field_mapping=[c, empty], __col_0={p:null}, __col_1=null, + // __overflow=null] + assertThat(secondNested.getArray(0).toIntArray()).containsExactly(2, -1); + assertThat(secondNested.getMap(1)).isEqualTo(stringKeyMap("p", null)); + assertThat(secondNested.isNullAt(2)).isTrue(); + assertThat(secondNested.isNullAt(3)).isTrue(); + + InternalRow nullRow = converter.convert(GenericRow.of(3, null)); + assertThat(nullRow.isNullAt(1)).isTrue(); + + InternalRow fourth = + converter.convert( + GenericRow.of( + 4, + stringKeyMap( + "a", + stringKeyMap("m", 7), + "b", + stringKeyMap("n", 8), + "c", + stringKeyMap("o", 9)))); + InternalRow fourthNested = fourth.getRow(1, 4); + + // Target physical row: + // id = 4 + // nested = [__field_mapping=[a, b], __col_0={m:7}, __col_1={n:8}, + // __overflow={c:{o:9}}] + assertThat(fourthNested.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(fourthNested.getMap(1)).isEqualTo(stringKeyMap("m", 7)); + assertThat(fourthNested.getMap(2)).isEqualTo(stringKeyMap("n", 8)); + assertThat(fourthNested.getMap(3)).isEqualTo(intKeyMap(2, stringKeyMap("o", 9))); + + assertThat(converter.buildFieldMeta("nested")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1), + 2, Collections.singletonList(0)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + } + + @Test + void testNestedComplex() { + RowType valueType = + DataTypes.ROW( + DataTypes.FIELD(0, "score", DataTypes.INT()), + DataTypes.FIELD(1, "tags", DataTypes.ARRAY(DataTypes.STRING())), + DataTypes.FIELD( + 2, "meta", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))); + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD(1, "data", DataTypes.MAP(DataTypes.STRING(), valueType))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("data", 2)); + + InternalRow first = + converter.convert( + GenericRow.of( + 1, + stringKeyMap( + "a", + GenericRow.of( + 10, stringArray("t1", "t2"), stringKeyMap("x", 1)), + "b", + GenericRow.of( + 20, + stringArray("t3"), + stringKeyMap("y", 2, "z", 3))))); + InternalRow firstData = first.getRow(1, 4); + + // Target physical row: + // id = 1 + // data = [__field_mapping=[a, b], + // __col_0={score=10,tags=[t1,t2],meta={x:1}}, + // __col_1={score=20,tags=[t3],meta={y:2,z:3}}, __overflow=null] + assertThat(firstData.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(firstData.getRow(1, 3)) + .isEqualTo(GenericRow.of(10, stringArray("t1", "t2"), stringKeyMap("x", 1))); + assertThat(firstData.getRow(2, 3)) + .isEqualTo(GenericRow.of(20, stringArray("t3"), stringKeyMap("y", 2, "z", 3))); + assertThat(firstData.isNullAt(3)).isTrue(); + + InternalRow second = + converter.convert( + GenericRow.of( + 2, + stringKeyMap( + "c", GenericRow.of(null, null, stringKeyMap("p", null))))); + InternalRow secondData = second.getRow(1, 4); + + // Target physical row: + // id = 2 + // data = [__field_mapping=[c, empty], + // __col_0={score=null,tags=null,meta={p:null}}, __col_1=null, + // __overflow=null] + assertThat(secondData.getArray(0).toIntArray()).containsExactly(2, -1); + assertThat(secondData.getRow(1, 3)) + .isEqualTo(GenericRow.of(null, null, stringKeyMap("p", null))); + assertThat(secondData.isNullAt(2)).isTrue(); + assertThat(secondData.isNullAt(3)).isTrue(); + + InternalRow third = + converter.convert( + GenericRow.of( + 3, + stringKeyMap( + "a", + GenericRow.of(30, stringArray(null, "t4"), stringKeyMap()), + "b", + GenericRow.of(null, stringArray(), stringKeyMap("q", 5)), + "c", + GenericRow.of( + 40, stringArray("t5"), stringKeyMap("r", 6))))); + InternalRow thirdData = third.getRow(1, 4); + + // Target physical row: + // id = 3 + // data = [__field_mapping=[a, b], + // __col_0={score=30,tags=[null,t4],meta={}}, + // __col_1={score=null,tags=[],meta={q:5}}, + // __overflow={c:{score=40,tags=[t5],meta={r:6}}}] + assertThat(thirdData.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(thirdData.getRow(1, 3)) + .isEqualTo(GenericRow.of(30, stringArray(null, "t4"), stringKeyMap())); + assertThat(thirdData.getRow(2, 3)) + .isEqualTo(GenericRow.of(null, stringArray(), stringKeyMap("q", 5))); + assertThat(thirdData.getMap(3)) + .isEqualTo( + intKeyMap(2, GenericRow.of(40, stringArray("t5"), stringKeyMap("r", 6)))); + + InternalRow nullRow = converter.convert(GenericRow.of(4, null)); + assertThat(nullRow.isNullAt(1)).isTrue(); + + assertThat(converter.buildFieldMeta("data")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1), + 2, Collections.singletonList(0)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + } + + @Test + void testMultipleMapFields() { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT())), + DataTypes.FIELD( + 2, "attrs", DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()))); + Map fieldToNumColumns = new HashMap<>(); + fieldToNumColumns.put("tags", 2); + fieldToNumColumns.put("attrs", 3); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, fieldToNumColumns); + + InternalRow first = + converter.convert( + GenericRow.of( + 1, + stringKeyMap("a", 10L, "b", 20L), + stringKeyMap("x", 1.1D, "y", 2.2D))); + InternalRow firstTags = first.getRow(1, 4); + InternalRow firstAttrs = first.getRow(2, 5); + + // Target physical row: + // id = 1 + // tags = [__field_mapping=[a, b], __col_0=10, __col_1=20, __overflow=null] + // attrs = [__field_mapping=[x, y, empty], __col_0=1.1, __col_1=2.2, + // __col_2=null, __overflow=null] + assertThat(firstTags.getArray(0).toIntArray()).containsExactly(0, 1); + assertThat(firstTags.getLong(1)).isEqualTo(10L); + assertThat(firstTags.getLong(2)).isEqualTo(20L); + assertThat(firstTags.isNullAt(3)).isTrue(); + assertThat(firstAttrs.getArray(0).toIntArray()).containsExactly(0, 1, -1); + assertThat(firstAttrs.getDouble(1)).isEqualTo(1.1D); + assertThat(firstAttrs.getDouble(2)).isEqualTo(2.2D); + assertThat(firstAttrs.isNullAt(3)).isTrue(); + assertThat(firstAttrs.isNullAt(4)).isTrue(); + + InternalRow second = + converter.convert( + GenericRow.of( + 2, + stringKeyMap("c", 30L, "a", 40L, "b", 50L), + stringKeyMap("z", 3.3D))); + InternalRow secondTags = second.getRow(1, 4); + InternalRow secondAttrs = second.getRow(2, 5); + + // Target physical row: + // id = 2 + // tags = [__field_mapping=[c, a], __col_0=30, __col_1=40, __overflow={b:50}] + // attrs = [__field_mapping=[z, empty, empty], __col_0=3.3, __col_1=null, + // __col_2=null, __overflow=null] + assertThat(secondTags.getArray(0).toIntArray()).containsExactly(2, 0); + assertThat(secondTags.getLong(1)).isEqualTo(30L); + assertThat(secondTags.getLong(2)).isEqualTo(40L); + assertThat(secondTags.getMap(3)).isEqualTo(intKeyMap(1, 50L)); + assertThat(secondAttrs.getArray(0).toIntArray()).containsExactly(2, -1, -1); + assertThat(secondAttrs.getDouble(1)).isEqualTo(3.3D); + assertThat(secondAttrs.isNullAt(2)).isTrue(); + assertThat(secondAttrs.isNullAt(3)).isTrue(); + assertThat(secondAttrs.isNullAt(4)).isTrue(); + + InternalRow third = + converter.convert( + GenericRow.of( + 3, null, stringKeyMap("x", 4.4D, "y", 5.5D, "z", 6.6D, "w", 7.7D))); + InternalRow thirdAttrs = third.getRow(2, 5); + + // Target physical row: + // id = 3 + // tags = null + // attrs = [__field_mapping=[x, y, z], __col_0=4.4, __col_1=5.5, __col_2=6.6, + // __overflow={w:7.7}] + assertThat(third.isNullAt(1)).isTrue(); + assertThat(thirdAttrs.getArray(0).toIntArray()).containsExactly(0, 1, 2); + assertThat(thirdAttrs.getDouble(1)).isEqualTo(4.4D); + assertThat(thirdAttrs.getDouble(2)).isEqualTo(5.5D); + assertThat(thirdAttrs.getDouble(3)).isEqualTo(6.6D); + assertThat(thirdAttrs.getMap(4)).isEqualTo(intKeyMap(3, 7.7D)); + + assertThat(converter.shreddingFieldNames()).containsExactly("tags", "attrs"); + assertThat(converter.buildFieldMeta("tags")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Arrays.asList(0, 1), + 1, Collections.singletonList(1), + 2, Collections.singletonList(0)), + new TreeSet<>(Collections.singletonList(1)), + 2, + 3)); + assertThat(converter.buildFieldMeta("attrs")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + nameToId("w", 3, "x", 0, "y", 1, "z", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1), + 2, Arrays.asList(0, 2)), + new TreeSet<>(Collections.singletonList(3)), + 3, + 4)); + } + + @Test + void testBuildFieldMetaInvalidFieldName() { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + MapSharedShreddingRowConverter converter = + new MapSharedShreddingRowConverter(logicalType, columns("tags", 3)); + + assertThat(converter.buildFieldMeta("tags")) + .isEqualTo( + new MapSharedShreddingFieldMeta( + new TreeMap(), + new TreeMap>(), + new TreeSet(), + 3, + 0)); + assertThatThrownBy(() -> converter.buildFieldMeta("id")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("id"); + assertThatThrownBy(() -> converter.buildFieldMeta("nonexistent")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("nonexistent"); + } + + private static Map columns(String fieldName, int numColumns) { + Map columns = new HashMap<>(); + columns.put(fieldName, numColumns); + return columns; + } + + private static GenericMap stringKeyMap(Object... keyValues) { + Map values = new LinkedHashMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + values.put(BinaryString.fromString((String) keyValues[i]), keyValues[i + 1]); + } + return new GenericMap(values); + } + + private static GenericArray array(Object... values) { + return new GenericArray(values); + } + + private static GenericArray stringArray(String... values) { + Object[] internalValues = new Object[values.length]; + for (int i = 0; i < values.length; i++) { + internalValues[i] = values[i] == null ? null : BinaryString.fromString(values[i]); + } + return new GenericArray(internalValues); + } + + private static GenericMap intKeyMap(Object... keyValues) { + Map values = new LinkedHashMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + values.put(keyValues[i], keyValues[i + 1]); + } + return new GenericMap(values); + } + + private static Map nameToId(Object... keyValues) { + Map nameToId = new TreeMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + nameToId.put((String) keyValues[i], (Integer) keyValues[i + 1]); + } + return nameToId; + } + + @SuppressWarnings("unchecked") + private static Map> fieldToColumns(Object... keyValues) { + Map> fieldToColumns = new TreeMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + fieldToColumns.put((Integer) keyValues[i], (List) keyValues[i + 1]); + } + return fieldToColumns; + } +}