Support OBJECT data in IoTConsensusV2 #17711
Pull request overview
This PR adds end-to-end support for TSDataType.OBJECT in the IoTConsensusV2 replication pipeline, including object file piece transfer, follower-side application, path/region-id handling, and focused integration tests to validate replica consistency.
Changes:
- Add OBJECT-aware validation + object-path encoding/decoding utilities (including region-id remapping support).
- Add a new IoTConsensusV2 transfer request type and sink/receiver/handler logic to ship OBJECT file pieces ahead of inserts/TsFiles and apply them on followers.
- Extend tablet + TsFile parsing/decoding flows to recognize OBJECT, and add IoTConsensusV2 Stream/Batch IT coverage.
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java | Adds OBJECT name validation and OS/path-segment checks for table/column names when configured. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/iotconsensusv2/request/IoTConsensusV2RequestType.java | Introduces a new request type for object-file piece transfer. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java | Enables decoding of OBJECT columns as binary-like values. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java | Adds writeObject(ObjectNode) to materialize object files and trigger metadata writes. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java | Adds device-id validation and tiered-storage restriction checks for OBJECT scenarios. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java | Splits OBJECT payloads into ObjectNodes during partition splitting, updating bitmaps accordingly. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java | Extracts OBJECT piece payloads into ObjectNodes during InsertRows partition split. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java | Adds progress-index plumbing and generates relational insert-row metadata for OBJECT at EOF. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2TabletBinaryReq.java | Allows callers to deserialize a generic plan node (not only InsertNode). |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFileUtils.java | Adds utilities to discover OBJECT descriptors from insert-nodes and TsFiles and validate object existence. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFilePieceReq.java | Adds the thrift request wrapper for transferring object file pieces using ObjectNode serialization. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java | Sends object-file pieces before insert-node / tsfile transfer in sync mode. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java | Adds async handling for insert-nodes that require object-file piece transfer. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java | Adds object-file piece transfer phase before tsfile/mod transfer in async mode. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2ObjectFileInsertNodeEventHandler.java | Implements async sequencing: object-file pieces then insert-node transfer. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java | Receives/applies object-file piece requests, and supports ObjectNode in tablet-binary flow. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java | Treats OBJECT as a binary-like value when building tablets from TsFiles. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java | Treats OBJECT as a binary-like value in scan-based TsFile parsing. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java | Adds OBJECT to binary-like column handling and null filling. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java | Implements execution of ObjectNode writes on data regions. |
| iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/PlainObjectPath.java | Adds a plain (non-base32) object path implementation. |
| iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/ObjectTypeUtils.java | Updates region-id rewrite logic using Path-based parsing and new object-path types. |
| iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/IObjectPath.java | Extends the interface (time/device/measurement/path accessors) and selects factory/deserializer by config. |
| iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/Base32ObjectPath.java | Adds a base32-encoded object path implementation. |
| integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java | Enables the new OBJECT replica consistency test in stream mode. |
| integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java | Adds the OBJECT replica consistency test logic and helper methods. |
| integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java | Enables the new OBJECT replica consistency test in batch mode. |
| final RelationalInsertRowNode insertRowNode = new RelationalInsertRowNode(this.getPlanNodeId()); | ||
| insertRowNode.setAligned(true); | ||
| insertRowNode.setDeviceID(filePath.getDeviceID()); | ||
| insertRowNode.setTargetPath(new PartialPath(filePath.getDeviceID().getTableName())); |
| final IObjectPath newObjectPath; | ||
| if (objectPath instanceof PlainObjectPath) { | ||
| newObjectPath = | ||
| new PlainObjectPath(objectPath.toString().replaceFirst(regionId + "", newRegionId + "")); | ||
| } else { | ||
| final String[] subPath = new String[path.getNameCount() - 1]; | ||
| for (int i = 1; i < path.getNameCount(); i++) { | ||
| subPath[i - 1] = path.getName(i).toString(); | ||
| } | ||
| newObjectPath = new Base32ObjectPath(Paths.get(newRegionId + "", subPath)); |
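The region-id rewrite in the fragment above has two branches: a string `replaceFirst` for plain paths and a segment-by-segment rebuild for base32 paths. A minimal sketch of the segment-based approach is shown here, assuming the region id is always the first path segment (which the `i = 1` loop suggests). The class and method names are hypothetical and are not part of the PR.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not from the PR: swaps the first path segment
// (assumed to be the region id) and leaves every other segment untouched.
final class RegionIdRewriteSketch {

  static Path withRegionId(Path relativeObjectPath, int newRegionId) {
    final int count = relativeObjectPath.getNameCount();
    if (count < 2) {
      throw new IllegalArgumentException(
          "expected <regionId>/<rest...>, got: " + relativeObjectPath);
    }
    // subpath(1, count) drops only the leading segment, so digits that
    // happen to appear later in the path are never rewritten.
    return Paths.get(String.valueOf(newRegionId))
        .resolve(relativeObjectPath.subpath(1, count));
  }
}
```

Working on segments avoids treating the region id as a regular expression and keeps the rewrite independent of what the remaining segments contain.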
| final Object[] values = insertRowNode.getValues(); | ||
| if (values[i] == null) { | ||
| continue; | ||
| } | ||
| final byte[] binary = ((Binary) values[i]).getValues(); | ||
| final ByteBuffer buffer = ByteBuffer.wrap(binary); | ||
| final boolean isEOF = buffer.get() == 1; | ||
| final long offset = buffer.getLong(); | ||
| final byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); | ||
| final IObjectPath relativePath = |
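The fragment above reads an OBJECT piece as a one-byte EOF flag, an eight-byte offset, and then the remaining bytes as content. The following is a minimal sketch of that layout with a length check added. The class is hypothetical, uses only `java.nio.ByteBuffer` in place of `ReadWriteIOUtils`, and stops where the fragment does, so anything after the content read (such as the relative path) is out of scope.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the piece layout visible in the fragment:
// [1 byte: EOF flag][8 bytes: offset][remaining bytes: content].
final class ObjectPieceLayoutSketch {
  static final int HEADER_BYTES = 1 + Long.BYTES;

  final boolean eof;
  final long offset;
  final byte[] content;

  private ObjectPieceLayoutSketch(boolean eof, long offset, byte[] content) {
    this.eof = eof;
    this.offset = offset;
    this.content = content;
  }

  static byte[] encode(boolean eof, long offset, byte[] content) {
    final ByteBuffer buffer = ByteBuffer.allocate(HEADER_BYTES + content.length);
    buffer.put((byte) (eof ? 1 : 0)).putLong(offset).put(content);
    return buffer.array();
  }

  static ObjectPieceLayoutSketch decode(byte[] binary) {
    if (binary.length < HEADER_BYTES) {
      // Added check (an assumption, not in the fragment): reject payloads
      // that are too short to hold the flag and the offset.
      throw new IllegalArgumentException("piece shorter than header: " + binary.length);
    }
    final ByteBuffer buffer = ByteBuffer.wrap(binary);
    final boolean eof = buffer.get() == 1;
    final long offset = buffer.getLong();
    final byte[] content = new byte[buffer.remaining()];
    buffer.get(content);
    return new ObjectPieceLayoutSketch(eof, offset, content);
  }
}
```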
| final IoTConsensusV2ObjectFilePieceReq req = new IoTConsensusV2ObjectFilePieceReq(); | ||
| final ByteBuffer body = transferReq.body.duplicate(); | ||
| final PlanNode planNode = WALEntry.deserializeForConsensus(body); |
Caideyipi left a comment
I reviewed the IoTConsensusV2 OBJECT replication path and ran mvn -Ddevelocity.off=true -pl iotdb-core/calc-commons,iotdb-core/node-commons,iotdb-core/datanode -DskipTests compile successfully. I left a few correctness comments below.
| setDataRegionReplicaSet(entry.getKey()); | ||
| for (int i = 0; i < columns.length; i++) { | ||
| if (dataTypes[i] == TSDataType.OBJECT) { | ||
| handleObjectValue(i, 0, times.length, entry, result); |
times.length can be the tablet capacity, not the number of valid rows in this insert. If a reused tablet has stale OBJECT values after rowCount, this loop will generate ObjectNodes for rows that are not part of the current write and may create extra object rows on the follower. Please bound this by rowCount (or the active split range) and add coverage where the backing arrays are larger than the current row count.
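The concern above can be illustrated with a small sketch in which the scan is bounded by the number of valid rows instead of the backing array length. It uses plain arrays in place of the tablet's columns. The class and method names are hypothetical, and `rowCount` is assumed to be the count of valid rows in the current insert.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not the PR's code: only the first rowCount
// slots of a possibly larger (reused) backing array are considered.
final class RowCountBoundSketch {

  static List<Integer> activeObjectRows(byte[][] column, int rowCount) {
    // Never trust the array length: a reused tablet may hold stale values
    // beyond the rows that belong to the current write.
    final int end = Math.min(rowCount, column.length);
    final List<Integer> rows = new ArrayList<>();
    for (int row = 0; row < end; row++) {
      if (column[row] != null) {
        rows.add(row);
      }
    }
    return rows;
  }
}
```

With a capacity of four, a row count of two, and a stale value left in slot three, only rows 0 and 1 are selected, which is the behavior the review asks for.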
| Map.Entry<TRegionReplicaSet, List<Integer>> entry, | ||
| List<WritePlanNode> result) { | ||
| for (int row = startRow; row < endRow; row++) { | ||
| if (((Binary[]) columns[column])[row] == null) { |
This should honor the null bitmap before reading the Binary slot. For OBJECT columns a null row can still leave a stale value in a reused backing array; this code will transfer that object and only later mark the row null. Please mirror the descriptor collection logic and skip rows where bitMaps != null && bitMaps[column] != null && bitMaps[column].isMarked(row).
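A minimal sketch of the suggested guard follows, with `java.util.BitSet` standing in for IoTDB's bitmap type (so `get(row)` plays the role of `isMarked(row)`). The class and method names are hypothetical. The point is that the bitmap is consulted before the value slot is read.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical illustration with BitSet in place of the real bitmap class.
final class NullBitmapSketch {

  // Mirrors the condition quoted in the review comment.
  static boolean isNullRow(BitSet[] bitMaps, int column, int row) {
    return bitMaps != null && bitMaps[column] != null && bitMaps[column].get(row);
  }

  static List<Integer> transferableRows(
      byte[][] columnValues, BitSet[] bitMaps, int column, int startRow, int endRow) {
    final List<Integer> rows = new ArrayList<>();
    for (int row = startRow; row < endRow; row++) {
      // Check the bitmap first: a row marked null may still hold a stale
      // value left over from an earlier use of the backing array.
      if (isNullRow(bitMaps, column, row) || columnValues[row] == null) {
        continue;
      }
      rows.add(row);
    }
    return rows;
  }
}
```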
| req.dataNodeId = thisDataNodeId; | ||
| req.version = IoTConsensusV2RequestVersion.VERSION_1.getVersion(); | ||
| req.type = IoTConsensusV2RequestType.TRANSFER_OBJECT_FILE_PIECE.getType(); | ||
| req.body = objectNode.serialize(); |
objectNode.serialize() currently treats source-file read failures as a successful transfer: ObjectNode.serialize() logs only at debug, writes readSuccess && isEOF, and still sends the zero-filled contents buffer. It also uses RandomAccessFile.read(byte[]) without validating the byte count. With this new IoTConsensusV2 path, a removed/truncated object file or short read can be accepted by the follower as corrupted content. Please fail fast here (and use readFully or validate the bytes read) so the event is retried instead of committed with bad object data.
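The fail-fast read requested above might look roughly like the following sketch. It is a standalone helper rather than a change to `ObjectNode.serialize()`, and the class and method names are hypothetical. It relies on `RandomAccessFile.readFully`, which throws `EOFException` when the file ends before the buffer is filled, so a short read surfaces as an error instead of a zero-padded buffer.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;

// Hypothetical helper, not the PR's code: reads exactly `length` bytes at
// `offset`, or throws so the caller can retry instead of sending bad data.
final class ObjectPieceReadSketch {

  static byte[] readPiece(Path file, long offset, int length) throws IOException {
    if (offset < 0 || length < 0) {
      throw new IllegalArgumentException("offset and length must be non-negative");
    }
    final byte[] content = new byte[length];
    // A missing file fails here with FileNotFoundException (an IOException).
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
      raf.seek(offset);
      // readFully either fills the whole array or throws EOFException;
      // plain read(byte[]) may return fewer bytes without any error.
      raf.readFully(content);
    }
    return content;
  }
}
```

Letting the exception propagate leaves the retry decision to the sender, which matches the review's request that the event be retried and not committed with bad object data.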
Description
This PR adds OBJECT data support to IoTConsensusV2.
Tests