Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public static Type createTimestampWithLogicalType(
return Types.primitive(INT64, repetition)
.as(
LogicalTypeAnnotation.timestampType(
isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MILLIS))
isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MICROS))
.named(name);
} else if (precision > 6) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition).named(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ private SimpleColStats toTimestampStats(Statistics<?> stats, int precision) {
if (precision <= 3) {
LongStatistics longStats = (LongStatistics) stats;
return new SimpleColStats(
Timestamp.fromEpochMillis(longStats.getMin()),
Timestamp.fromEpochMillis(longStats.getMax()),
Timestamp.fromMicros(longStats.getMin()),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this change the extractor decodes TIMESTAMP(0..3) footer stats as micros solely from the Paimon field precision. Existing Paimon Parquet files written before this PR use TIMESTAMP_MILLIS and store min/max in epoch milliseconds, so extracting stats for those files (for example during migrate/clone or any metadata regeneration) would turn 2024-01-01T00:00:00.123 into a 1970 timestamp and write incorrect file stats. Can we derive the unit from stats.type().getLogicalTypeAnnotation() / the column metadata, like the reader does, and keep MILLIS for legacy files?

Timestamp.fromMicros(longStats.getMax()),
stats.getNumNulls());
} else if (precision <= 6) {
LongStatistics longStats = (LongStatistics) stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* Parquet write timestamp precision 0-3 as int64 mills, 4-6 as int64 micros, 7-9 as int96, this
* class wrap the real vector to provide {@link TimestampColumnVector} interface.
* Parquet write timestamp precision 0-6 as int64 micros, 7-9 as int96, this class wrap the real
* vector to provide {@link TimestampColumnVector} interface.
*/
public class ParquetTimestampVector implements TimestampColumnVector {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
Expand Down Expand Up @@ -115,14 +117,18 @@ public VectorizedColumnReader(
}

private boolean isLazyDecodingSupported(
PrimitiveType.PrimitiveTypeName typeName, ColumnVector columnVector) {
PrimitiveType.PrimitiveTypeName typeName,
DataType dataType,
ColumnVector columnVector) {
boolean isSupported = false;
switch (typeName) {
case INT32:
isSupported = columnVector instanceof IntColumnVector;
break;
case INT64:
isSupported = columnVector instanceof LongColumnVector;
isSupported =
columnVector instanceof LongColumnVector
&& !isLowPrecisionTimestamp(dataType);
break;
case FLOAT:
isSupported = columnVector instanceof FloatColumnVector;
Expand All @@ -139,6 +145,17 @@ private boolean isLazyDecodingSupported(
return isSupported;
}

private static boolean isLowPrecisionTimestamp(DataType dataType) {
switch (dataType.getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
return ((TimestampType) dataType).getPrecision() <= 3;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return ((LocalZonedTimestampType) dataType).getPrecision() <= 3;
default:
return false;
}
}

/** Reads `total` rows from this columnReader into column. */
void readBatch(
int total,
Expand Down Expand Up @@ -198,12 +215,9 @@ void readBatch(
(VectorizedValuesReader) dataColumn);
}

// TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post
// process
// the values to add microseconds precision.
if (column.hasDictionary()
|| (startRowId == pageFirstRowIndex
&& isLazyDecodingSupported(typeName, column))) {
&& isLazyDecodingSupported(typeName, type, column))) {
column.setDictionary(new ParquetDictionary(dictionary));
} else {
updater.decodeDictionaryIds(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void write(InternalArray arrayData, int ordinal) {
}

private void writeTimestamp(Timestamp value) {
recordConsumer.addLong(value.getMillisecond());
recordConsumer.addLong(value.toMicros());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,7 @@ private static Comparable<?> toParquetObject(
} else if (value instanceof Timestamp) {
Timestamp timestamp = (Timestamp) value;
int precision = getTimestampPrecision(type);
if (precision <= 3) {
// milliseconds
return timestamp.getMillisecond();
} else if (precision <= 6) {
// microseconds
if (precision <= 6) {
return timestamp.toMicros();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This predicate literal is now always epoch micros for TIMESTAMP(0..6), but precision 0..3 files written by existing Paimon versions store epoch milliseconds. The filter is created in ParquetFileFormat before ParquetReaderFactory opens each file, so it cannot see whether a particular file schema is TIMESTAMP_MILLIS or TIMESTAMP_MICROS. For old files, ts = 2024-01-01 becomes 1704067200000000 while the row-group stats/data are around 1704067200000, and Parquet can incorrectly drop matching row groups. We should either build the timestamp filter after reading the file schema or disable/avoid this pushdown for legacy low-precision timestamp files.

}
// precision > 6 uses INT96, not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public void testInFilterDecimal64Bit() {

@Test
public void testTimestampMillis() {
// precision <= 3 uses milliseconds (INT64)
// precision <= 3 now uses microseconds (MICROS annotation, matching the writer)
int precision = 3;
PredicateBuilder builder =
new PredicateBuilder(
Expand All @@ -456,16 +456,16 @@ public void testTimestampMillis() {
new DataField(0, "ts1", new TimestampType(precision)))));

Timestamp value = Timestamp.fromEpochMillis(1704067200000L); // 2024-01-01 00:00:00
long expectedMillis = value.getMillisecond();
long expectedMicros = value.toMicros();

test(builder.isNull(0), "eq(ts1, null)", true);
test(builder.isNotNull(0), "noteq(ts1, null)", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true);
test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMillis + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true);
test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMillis + ")", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true);
test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMicros + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true);
test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMicros + ")", true);
}

@Test
Expand Down Expand Up @@ -493,7 +493,7 @@ public void testTimestampMicros() {

@Test
public void testLocalZonedTimestampMillis() {
// precision <= 3 uses milliseconds (INT64)
// precision <= 3 now uses microseconds (MICROS annotation, matching the writer)
int precision = 3;
PredicateBuilder builder =
new PredicateBuilder(
Expand All @@ -505,14 +505,14 @@ public void testLocalZonedTimestampMillis() {
new LocalZonedTimestampType(precision)))));

Timestamp value = Timestamp.fromEpochMillis(1704067200000L);
long expectedMillis = value.getMillisecond();
long expectedMicros = value.toMicros();

test(builder.isNull(0), "eq(ts1, null)", true);
test(builder.isNotNull(0), "noteq(ts1, null)", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true);
}

@Test
Expand Down Expand Up @@ -555,22 +555,22 @@ public void testInFilterTimestampMillis() {
test(
builder.in(0, Arrays.asList(v1, v2, v3)),
"or(or(eq(ts1, "
+ v1.getMillisecond()
+ v1.toMicros()
+ "), eq(ts1, "
+ v2.getMillisecond()
+ v2.toMicros()
+ ")), eq(ts1, "
+ v3.getMillisecond()
+ v3.toMicros()
+ "))",
true);

test(
builder.notIn(0, Arrays.asList(v1, v2, v3)),
"and(and(noteq(ts1, "
+ v1.getMillisecond()
+ v1.toMicros()
+ "), noteq(ts1, "
+ v2.getMillisecond()
+ v2.toMicros()
+ ")), noteq(ts1, "
+ v3.getMillisecond()
+ v3.toMicros()
+ "))",
true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,88 @@ public void testReadTimestampNanosWrittenByParquet() throws Exception {
assertThat(count.get()).isEqualTo(nanosValues.length);
}

@Test
public void testReadTimestampMicrosWrittenByParquetForLowPrecision() throws Exception {
// Regression test: after PR #8230, TIMESTAMP(n<=3) columns are written as INT64 with a
// MICROS annotation and epoch-microsecond values. The vectorized reader must decode them
// correctly when the Paimon row type declares precision 3 (not 6).
//
// The fix lives in LongTimestampUpdater.longTimestamp(): it reads the actual Parquet time
// unit via timestampUnit() and normalises MICROS values to epoch-milliseconds before
// storing them in the LongColumnVector, so ParquetTimestampVector.getTimestamp() receives
// epoch-ms and correctly calls Timestamp.fromEpochMillis(). Without timestampUnit() (e.g.
// paimon 1.4.1), the raw epoch-µs would be stored and fromEpochMillis() would return
// year ~58xxx or throw ArithmeticException: Millis overflow.
Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
Configuration conf = new Configuration();
Type timestampMicrosType =
Types.primitive(INT64, Type.Repetition.REQUIRED)
.as(
LogicalTypeAnnotation.timestampType(
false, LogicalTypeAnnotation.TimeUnit.MICROS))
.named("f0")
.withId(0);
Type arrayTimestampMicrosType =
ConversionPatterns.listOfElements(
Type.Repetition.OPTIONAL,
"f1",
Types.primitive(INT64, Type.Repetition.OPTIONAL)
.as(
LogicalTypeAnnotation.timestampType(
false,
LogicalTypeAnnotation.TimeUnit.MICROS))
.named("element")
.withId(2))
.withId(1);
MessageType schema =
new MessageType("origin-parquet", timestampMicrosType, arrayTimestampMicrosType);
long[] microsValues = new long[] {1704067200123000L, -1000000L};

try (ParquetWriter<Group> writer =
ExampleParquetWriter.builder(
HadoopOutputFile.fromPath(
new org.apache.hadoop.fs.Path(path.toString()), conf))
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withConf(new Configuration())
.withType(schema)
.build()) {
SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
for (long micros : microsValues) {
Group row = simpleGroupFactory.newGroup();
row.append("f0", micros);
Group array = row.addGroup("f1");
array.addGroup(0).add(0, micros);
array.addGroup(0).add(0, micros + 1000L);
writer.write(row);
}
}

RowType paimonRowType =
RowType.builder()
.fields(new TimestampType(3), new ArrayType(new TimestampType(3)))
.build();
ParquetReaderFactory format =
new ParquetReaderFactory(new Options(), paimonRowType, 500, FilterCompat.NOOP);
AtomicInteger count = new AtomicInteger(0);
try (RecordReader<InternalRow> reader =
format.createReader(
new FormatReaderContext(
new LocalFileIO(), path, new LocalFileIO().getFileSize(path)))) {
reader.forEachRemaining(
row -> {
long micros = microsValues[count.get()];
assertThat(row.getTimestamp(0, 3))
.isEqualTo(Timestamp.fromMicros(micros));
assertThat(row.getArray(1).getTimestamp(0, 3))
.isEqualTo(Timestamp.fromMicros(micros));
assertThat(row.getArray(1).getTimestamp(1, 3))
.isEqualTo(Timestamp.fromMicros(micros + 1000L));
count.incrementAndGet();
});
}
assertThat(count.get()).isEqualTo(microsValues.length);
}

private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
throws IOException {
List<InternalRow> rows = records.stream().map(this::newRow).collect(Collectors.toList());
Expand Down
Loading
Loading