Parse-once for JSON-index ingestion: cache the parsed Map and flatten it directly #18756
xiangfu0 wants to merge 1 commit into
Pull request overview
This PR optimizes realtime ingestion for JSON columns with JSON-family indexes by avoiding a per-row JSON serialize → re-parse round-trip, reusing the already-parsed representation and flattening it directly for indexing.
Changes:
- Add `JsonUtils.flattenParsed(...)` and a native-type check to flatten parsed JSON values (Map/List/JsonNode) without string tokenization, with a fallback to the existing string path for non-native leaf types.
- Introduce a transient per-row parsed JSON cache on `GenericRow`, populate it in `DataTypeTransformer` for JSON-indexed JSON columns, and feed it to JSON mutable indexes when `supportsParsedValue()` is enabled.
- Extend `MutableJsonIndex` with additive SPI defaults (`addParsed`, `supportsParsedValue`) and implement the parsed path in `MutableJsonIndexImpl`, with accompanying unit tests and a JMH benchmark.
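
To make the "flatten without string tokenization" idea concrete, here is a minimal sketch in plain Java. It is not Pinot's `JsonUtils.flattenParsed`: the class name `ParsedFlattenSketch`, the `$.a[0]` path syntax, and the single-map output are all assumptions chosen for brevity, and Pinot's real flattened-record format differs. It only illustrates the two pieces the overview names: a native-type check, and a direct walk over an already-parsed Map/List.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: flattens an already-parsed JSON value without going through a JSON string. */
final class ParsedFlattenSketch {
  private ParsedFlattenSketch() {
  }

  /** Returns true when every key is a String and every leaf is null, String, Number, or Boolean. */
  static boolean isNative(Object node) {
    if (node == null || node instanceof String || node instanceof Number || node instanceof Boolean) {
      return true;
    }
    if (node instanceof Map) {
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) node).entrySet()) {
        if (!(entry.getKey() instanceof String) || !isNative(entry.getValue())) {
          return false;
        }
      }
      return true;
    }
    if (node instanceof List) {
      for (Object element : (List<?>) node) {
        if (!isNative(element)) {
          return false;
        }
      }
      return true;
    }
    // Any other type (Date, byte[], custom POJO, ...) is treated as non-native.
    return false;
  }

  /** Flattens a native parsed value into path -> stringified leaf (hypothetical path syntax). */
  static Map<String, String> flatten(Object parsed) {
    if (!isNative(parsed)) {
      // A real caller would fall back to the serialize-then-parse string path here.
      throw new IllegalArgumentException("non-native leaf type: use the string path instead");
    }
    Map<String, String> out = new LinkedHashMap<>();
    walk("$", parsed, out);
    return out;
  }

  private static void walk(String path, Object node, Map<String, String> out) {
    if (node instanceof Map) {
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) node).entrySet()) {
        walk(path + "." + entry.getKey(), entry.getValue(), out);
      }
    } else if (node instanceof List) {
      List<?> list = (List<?>) node;
      for (int i = 0; i < list.size(); i++) {
        walk(path + "[" + i + "]", list.get(i), out);
      }
    } else {
      out.put(path, String.valueOf(node));
    }
  }

  public static void main(String[] args) {
    Map<String, Object> doc = new LinkedHashMap<>();
    doc.put("user", "alice");
    doc.put("tags", Arrays.asList("a", "b"));
    System.out.println(flatten(doc));
  }
}
```

The native-type check runs before the walk so that a caller can decide up front whether the parsed path is safe or whether it should keep using the existing string path.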
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java | Adds flattenParsed and native-type detection to avoid re-tokenizing JSON strings during indexing. |
| pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java | Adds coverage ensuring flattenParsed matches the legacy serialize+reparse behavior. |
| pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java | Adds transient per-row parsed JSON cache accessors and clears it on clear(). |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java | Adds additive SPI hooks for parsed JSON ingestion and capability gating. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java | Computes which JSON columns should cache parsed values and populates the cache during transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java | Feeds cached parsed JSON to mutable JSON indexes when supported. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java | Implements addParsed + supportsParsedValue using flattenParsed. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java | Adds test asserting parsed-vs-string indexing produces identical matches. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java | Adds test verifying caching is enabled only when JSON index is configured. |
| pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonFlatten.java | Adds a JMH benchmark comparing flatten-from-string vs flatten-from-parsed-map costs. |

Diff context for the two review comments:

    if (!_jsonCacheColumns.isEmpty() && _jsonCacheColumns.contains(column)
        && (value instanceof Map || value instanceof List)) {

      default void add(Object value, int dictId, int docId) {
        try {
    -     if (value instanceof Map) {
    -       add(JsonUtils.objectToString(value));
    +     if (value instanceof Map || value instanceof List) {
    +       // Already-parsed JSON value (e.g. a Map cached on the GenericRow before it was serialized for the forward
    +       // index): flatten it directly, avoiding the serialize-then-reparse round-trip.
    +       addParsed(value);
          } else {
    +       // String (the common case) or, for any other unexpected type, fail fast with a ClassCastException as before.
            add((String) value);
          }
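
The capability gating that the review overview describes (`supportsParsedValue()` plus an additive `addParsed`) can be sketched with Java default methods. This is a hedged illustration, not the PR's code: `JsonIndexSketch`, `StringOnlyIndex`, `ParsedAwareIndex`, and `IndexFeeder` are hypothetical names, and the signatures are assumptions. The point is that an existing implementation keeps compiling and keeps receiving strings, while an implementation that opts in receives the parsed value.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical index interface: the two default methods are additive, so old implementations still compile. */
interface JsonIndexSketch {
  void add(String json);

  default boolean supportsParsedValue() {
    return false;
  }

  default void addParsed(Object parsed) {
    throw new UnsupportedOperationException("parsed values are not supported by this index");
  }
}

/** An implementation written before the new methods existed: it only knows the string path. */
final class StringOnlyIndex implements JsonIndexSketch {
  final List<String> received = new ArrayList<>();

  @Override
  public void add(String json) {
    received.add("string:" + json);
  }
}

/** An implementation that opts in to the parsed path. */
final class ParsedAwareIndex implements JsonIndexSketch {
  final List<String> received = new ArrayList<>();

  @Override
  public void add(String json) {
    received.add("string:" + json);
  }

  @Override
  public boolean supportsParsedValue() {
    return true;
  }

  @Override
  public void addParsed(Object parsed) {
    received.add("parsed:" + parsed);
  }
}

/** Caller side: use the cached parsed value only when one exists and the index opted in. */
final class IndexFeeder {
  private IndexFeeder() {
  }

  static void feed(JsonIndexSketch index, String json, Object cachedParsed) {
    if (cachedParsed != null && index.supportsParsedValue()) {
      index.addParsed(cachedParsed);
    } else {
      index.add(json);
    }
  }
}
```

Keeping the decision in the caller means an index never sees a parsed value it did not ask for, which is what allows plugin or custom indexes to stay on the string path.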
Codecov Report
❌ Patch coverage is
Additional details and impacted files

    @@             Coverage Diff              @@
    ##             master   #18756      +/-   ##
    ============================================
    - Coverage     64.80%   57.02%    -7.78%
    + Complexity     1322        7     -1315
    ============================================
      Files          3393     2607      -786
      Lines        211332   152809    -58523
      Branches      33234    24849     -8385
    ============================================
    - Hits         136952    87143    -49809
    + Misses        63329    58251     -5078
    + Partials      11051     7415     -3636
Avoid the realtime JSON-index serialize -> re-parse round-trip by caching parsed JSON values on GenericRow for JSON columns with Pinot's built-in json index enabled.

DataTypeTransformer records the parsed Map/List/JsonNode while still writing the canonical string used by the forward index. MutableSegmentImpl feeds that cached value directly to MutableJsonIndexImpl.addParsed(), which flattens via JsonUtils.flattenParsed(Object, JsonIndexConfig). The generic MutableJsonIndex SPI is unchanged.

flattenParsed preserves the existing string-path output: DecimalNode leaves are normalized the same way the serialized path re-parses them, and unsafe leaf types fall back to serialize+reparse. GenericRow invalidates cached parsed values when the field value changes, including sanitization and row reuse paths.

BenchmarkJsonFlatten now covers 330 B, 2.9 KB, and 8 KB payloads. The latest JDK 21 JMH run shows flatten-only gains of 1.87x / 6.44x / 3.26x, and estimated serialize+flatten gains of 1.52x / 2.80x / 2.21x.
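
The invalidation rule in the commit message (a cached parsed value must not outlive the field value it was derived from) can be sketched as follows. `RowCacheSketch` and its method names are hypothetical and are not `GenericRow`'s actual accessors; this is a minimal illustration assuming that every value write drops the cached entry for that column.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative row with a transient side cache of parsed JSON values, keyed by column name. */
final class RowCacheSketch {
  private final Map<String, Object> values = new HashMap<>();
  // Transient: not part of the row's logical state, only a shortcut for the indexing step.
  private final Map<String, Object> parsedJsonCache = new HashMap<>();

  /** Writing a value invalidates any parsed form cached for the same column. */
  void putValue(String column, Object value) {
    values.put(column, value);
    parsedJsonCache.remove(column);
  }

  Object getValue(String column) {
    return values.get(column);
  }

  /** Must be called after putValue for the same column, otherwise the entry is dropped again. */
  void putParsedJson(String column, Object parsed) {
    parsedJsonCache.put(column, parsed);
  }

  /** Returns the cached parsed value, or null when the caller should use the string path. */
  Object getParsedJson(String column) {
    return parsedJsonCache.get(column);
  }

  /** Row reuse: clearing the row clears the cache as well. */
  void clear() {
    values.clear();
    parsedJsonCache.clear();
  }

  public static void main(String[] args) {
    RowCacheSketch row = new RowCacheSketch();
    Map<String, Object> parsed = new HashMap<>();
    parsed.put("level", "INFO");
    row.putValue("payload", "{\"level\":\"INFO\"}");
    row.putParsedJson("payload", parsed);
    System.out.println(row.getParsedJson("payload") != null);
    // A later transformer rewrites the string, so the cached Map no longer matches it.
    row.putValue("payload", "{\"level\":\"WARN\"}");
    System.out.println(row.getParsedJson("payload") == null);
  }
}
```

Tying invalidation to the write path, rather than asking each transformer to remember it, is one way to keep the index from ever seeing a parsed value that disagrees with the stored string.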
What
A `dataType: JSON` column with a JSON index currently pays a serialize -> re-parse round-trip per row: `DataTypeTransformer` serializes the parsed JSON value to the forward-index string, and the mutable JSON index then re-parses that same string before flattening it.

This PR keeps a transient parsed-value cache on the `GenericRow` for JSON columns that have Pinot's built-in `json` index enabled, then feeds that cached value directly to `MutableJsonIndexImpl`:

- `GenericRow` stores a transient per-row parsed JSON cache. It is not part of row value/equality/copy/serialized state and is invalidated when the column value changes.
- `DataTypeTransformer` populates the cache only for `dataType: JSON` columns with the built-in JSON index enabled.
- `MutableSegmentImpl` passes the cached parsed value to `MutableJsonIndexImpl.addParsed(...)` when present; otherwise indexing uses the existing string path.
- `JsonUtils.flattenParsed(Object, JsonIndexConfig)` flattens a parsed `Map`/`List`/`JsonNode` while preserving the old string-path output.
- `BenchmarkJsonFlatten` now runs the same benchmark over multiple payload sizes (`330`, `2900`, `8000` bytes).

User Manual / Sample Config
No new table config is introduced. Existing realtime tables with JSON columns and Pinot's built-in JSON index benefit automatically.

    {
      "tableName": "logs_REALTIME",
      "tableType": "REALTIME",
      "indexingConfig": {
        "jsonIndexColumns": ["payload"]
      }
    }

The optimization is intentionally scoped to the built-in `json` index implementation. Plugin/custom JSON-like indexes continue to receive the serialized string path.

Behavior-preserving
- `flattenParsed` is tested to produce the same flattened records as `flatten(String, JsonIndexConfig)` on the serialized form, including nested objects/arrays, top-level arrays, `JsonNode`, `BigDecimal`, `BigInteger`, floats, nulls, max levels, and JSON index matching.
- For unsafe leaf types, `flattenParsed` falls back to serialize+reparse so the JSON index output remains identical to today's behavior.
- `IndexingFailureTest#testJsonIndexUsesParsedCache` covers the segment-level path where a cached `JsonNode` is indexed via `MutableSegmentImpl`.

Performance (`BenchmarkJsonFlatten`)

Validated with the checked-in JMH benchmark on JDK 21:
The benchmark isolates JSON flatten/index-input cost and separately measures `serializeMap`, which still remains because the forward index stores the serialized JSON string. The last column combines `serializeMap` with flattening as a harmonic throughput estimate for the full Map-input path.

The local machine was noisy, but the payload-size trend is consistent: avoiding the string re-tokenization is most valuable as payload size grows.
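
For readers unfamiliar with the "harmonic throughput estimate", here is one plausible reading, stated as an assumption rather than as the benchmark's exact formula: if two stages run back to back on every row, their per-row times add, so the combined throughput is `1 / (1/a + 1/b)`, which is half the harmonic mean of the two throughputs. The class `ThroughputEstimate` and the numbers in `main` are hypothetical and are not taken from the PR's measurements.

```java
/** Illustrative only: combines the throughputs of two sequential per-row stages. */
final class ThroughputEstimate {
  private ThroughputEstimate() {
  }

  /**
   * Combined ops/sec of two stages executed one after the other for each row.
   * Time per row is 1/a + 1/b, so throughput is the reciprocal of that sum.
   */
  static double combined(double opsPerSecA, double opsPerSecB) {
    return 1.0 / (1.0 / opsPerSecA + 1.0 / opsPerSecB);
  }

  /** Speedup of a new pipeline over a baseline pipeline, both given as ops/sec. */
  static double speedup(double baselineOpsPerSec, double newOpsPerSec) {
    return newOpsPerSec / baselineOpsPerSec;
  }

  public static void main(String[] args) {
    // Hypothetical throughputs in ops/sec, chosen only to make the arithmetic easy to follow.
    double serialize = 600.0;
    double flattenFromString = 300.0;
    double flattenFromMap = 600.0;

    double before = combined(serialize, flattenFromString);
    double after = combined(serialize, flattenFromMap);
    System.out.println("before=" + before + " after=" + after + " speedup=" + speedup(before, after));
  }
}
```

Because serialization is still paid on both sides of the comparison, the end-to-end speedup is always smaller than the flatten-only speedup, which matches the relationship between the two sets of gains quoted in the commit message.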
API Surface
No `MutableJsonIndex` SPI change remains after review simplification. The additive public surface is limited to:

- `GenericRow` parsed JSON cache accessors.
- `JsonUtils.flattenParsed(Object, JsonIndexConfig)`.
- `MutableJsonIndexImpl.addParsed(Object)` as an implementation method used by `MutableSegmentImpl`.

Validation
- `GITHUB_ACTIONS=true ./mvnw -pl pinot-spi,pinot-segment-local -am -Dtest=JsonUtilsTest,GenericRowTest,DataTypeTransformerTest,JsonIndexTest,IndexingFailureTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `GITHUB_ACTIONS=true ./mvnw -pl pinot-segment-local -am -Dtest=IndexingFailureTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `GITHUB_ACTIONS=true ./mvnw spotless:apply -pl pinot-segment-local,pinot-spi,pinot-perf`
- `GITHUB_ACTIONS=true ./mvnw checkstyle:check -pl pinot-segment-local,pinot-spi,pinot-perf`
- `GITHUB_ACTIONS=true ./mvnw license:format -pl pinot-segment-local,pinot-spi,pinot-perf`
- `GITHUB_ACTIONS=true ./mvnw license:check -pl pinot-segment-local,pinot-spi,pinot-perf`