[flink] Add restore_as_latest procedure#8139

Open
zhuxiangyi wants to merge 5 commits into apache:master from zhuxiangyi:feat/restore-as-latest-procedure

@zhuxiangyi

Purpose

This PR adds a non-destructive restore procedure for Flink:

CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 3);
CALL sys.restore_as_latest(`table` => 'default.T', tag => 'tag-1');

Unlike rollback_to, this procedure restores the table to the state of a target snapshot or tag by creating a new latest snapshot. Later snapshots and tags are preserved.

What is changed

  • Add RestoreAsLatestProcedure and register it in the Flink procedure factory list.
  • Add commit support to create a new latest snapshot from the complete data manifests of the target snapshot.
  • Add IT coverage for restoring from snapshot and tag, preserving later snapshots, and writing after restore.
  • Document the procedure in Flink procedures and snapshot/tag maintenance docs.

Tests

mvn -pl paimon-flink/paimon-flink-common -am -Pfast-build -DfailIfNoTests=false -Dtest=RestoreAsLatestProcedureITCase test
git diff --check

Notes

This PR is not associated with an issue yet. If the community prefers following the discussion-first flow strictly, I can open or join an issue/discussion and adjust the design accordingly.

zhuxiangyi force-pushed the feat/restore-as-latest-procedure branch 2 times, most recently from ecd0935 to 0f821ed (June 5, 2026 23:37)
zhuxiangyi force-pushed the feat/restore-as-latest-procedure branch from 0f821ed to 1492902 (June 6, 2026 01:03)
@JingsongLi

Contributor

I think restoreAsLatest can be invisible to streaming readers that handle overwrite snapshots.

The new snapshot writes the target snapshot's files into the base manifest list, but writes an empty delta manifest list and marks the commit as CommitKind.OVERWRITE (FileStoreCommitImpl.java:1174-1200). DataTableStreamScan first handles overwrite snapshots via the overwrite-change path, and if the returned plan is empty it advances past the snapshot. Since the restore snapshot has no delta, a streaming reader with streaming-read-overwrite=true can skip the restore entirely, missing both files/rows that should be removed from the current latest snapshot and files/rows that should be restored from the target snapshot.

Could restoreAsLatest produce an overwrite delta relative to the previous latest snapshot (DELETE previous-only files and ADD target-only files), or introduce a dedicated commit kind/streaming-scan handling for restore snapshots?
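
A minimal sketch of the skip described above, to make the failure mode concrete. It is not Paimon's DataTableStreamScan: `OverwriteScanSketch`, `SnapshotStub`, and `followUpScan` are hypothetical names, and a snapshot's delta is reduced to a list of "+file" / "-file" strings. The only behavior modeled is "an empty plan means the reader advances to the next snapshot".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a streaming reader that follows snapshot deltas. */
final class OverwriteScanSketch {

    /** Stand-in for a snapshot: an id plus its delta as "+file" / "-file" strings. */
    static final class SnapshotStub {
        final long id;
        final List<String> delta;

        SnapshotStub(long id, List<String> delta) {
            this.id = id;
            this.delta = delta;
        }
    }

    /** Returns every change the reader would emit while walking the snapshots in order. */
    static List<String> followUpScan(List<SnapshotStub> snapshots) {
        List<String> emitted = new ArrayList<>();
        for (SnapshotStub snapshot : snapshots) {
            if (snapshot.delta.isEmpty()) {
                // Empty plan: nothing to emit, so the reader simply advances.
                // A restore snapshot with an empty delta is skipped here.
                continue;
            }
            emitted.addAll(snapshot.delta);
        }
        return emitted;
    }
}
```

In this model, a restore snapshot with an empty delta contributes nothing to the reader's output, while the same snapshot carrying "-previous-only" and "+target-only" entries is observed like any other overwrite.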

@zhuxiangyi

Author

@JingsongLi
Thanks for pointing this out. I agree that the current restore snapshot is not sufficient for streaming readers with streaming-read-overwrite=true.

The new snapshot currently has the target snapshot's complete data manifests in baseManifestList, but its deltaManifestList is empty. This makes the final table state correct for batch/full scans, but the restore can be invisible to streaming overwrite readers.

I will update restoreAsLatest to generate an overwrite delta relative to the previous latest snapshot: DELETE files that exist only in the previous latest snapshot, and ADD files that exist only in the target snapshot. The baseManifestList will contain the previous latest snapshot's merged effective ADD files, while deltaManifestList will describe the previous-latest-to-target transition.
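
The previous-latest-to-target transition can be sketched as two set differences. This is only an illustration under simplifying assumptions: files are identified by a plain string, whereas real manifest entries carry more identity (partition, bucket, level, and so on), and `RestoreDeltaSketch`, `filesToDelete`, and `filesToAdd` are hypothetical names, not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the restore delta; not FileStoreCommitImpl. */
final class RestoreDeltaSketch {

    /** Files that exist only in the previous latest snapshot become DELETE entries. */
    static List<String> filesToDelete(Set<String> previousLatestFiles, Set<String> targetFiles) {
        List<String> deletes = new ArrayList<>();
        for (String file : previousLatestFiles) {
            if (!targetFiles.contains(file)) {
                deletes.add(file);
            }
        }
        return deletes;
    }

    /** Files that exist only in the target snapshot become ADD entries. */
    static List<String> filesToAdd(Set<String> previousLatestFiles, Set<String> targetFiles) {
        List<String> adds = new ArrayList<>();
        for (String file : targetFiles) {
            if (!previousLatestFiles.contains(file)) {
                adds.add(file);
            }
        }
        return adds;
    }
}
```

Files present in both snapshots appear in neither list, so the delta stays proportional to what actually changed between the two states rather than to the table size.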

Ensure restore_as_latest writes an overwrite delta so streaming overwrite readers can observe restored file changes.
zhuxiangyi force-pushed the feat/restore-as-latest-procedure branch from 439db01 to 1c523ac (June 8, 2026 02:17)
@zhuxiangyi

Author

@JingsongLi
Updated in the latest commit. restoreAsLatest now writes an overwrite delta from the previous latest snapshot to the target snapshot: DELETE files that exist only in the previous latest snapshot and ADD files that exist only in the target snapshot.

I also added IT coverage to verify both DELETE-only and ADD-only restore deltas.

Comment thread paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java Outdated
Comment thread paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java Outdated
Skip automatic expiration on the restore-as-latest path so it no longer
deletes the snapshots/tags it promises to keep (e.g. with
snapshot.num-retained.max=1), and keep nextRowId monotonic by taking the
max of the previous latest and target snapshot, preventing row id reuse
that breaks _ROW_ID global uniqueness on row-tracking tables.

Add IT cases covering both fixes.
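
The row id part of this fix comes down to one comparison. A small sketch, assuming both snapshots carry a non-null next row id (`NextRowIdSketch` and `restoredNextRowId` are hypothetical names used only for illustration):

```java
/** Hypothetical sketch of the monotonic nextRowId rule; not the PR's code. */
final class NextRowIdSketch {

    /**
     * The restore snapshot must never hand out ids that were already assigned,
     * so it continues from whichever counter is further along.
     */
    static long restoredNextRowId(long previousLatestNextRowId, long targetNextRowId) {
        return Math.max(previousLatestNextRowId, targetNextRowId);
    }
}
```

For example, if the target snapshot was written when the counter stood at 40 and the previous latest snapshot had reached 100, copying the target's value would let new rows reuse ids 40 to 99, which later snapshots (still preserved) already contain. Taking the maximum keeps the counter at 100.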
zhuxiangyi requested a review from JingsongLi (June 21, 2026 16:33)
targetSnapshot.properties(),
nextRowId);

return commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles)));

Contributor

This restore commit bypasses the normal post-commit callbacks. Regular commits call commitCallbacks with the committed snapshot and delta files after commitSnapshotImpl succeeds, and those callbacks keep external state in sync (for example Iceberg compatibility metadata uses context.snapshot/context.deltaFiles, and chain-table overwrite handling reacts to CommitKind.OVERWRITE). restoreAsLatest also changes the table state with an overwrite delta, but it returns immediately after writing the Paimon snapshot, so those external views can remain at the pre-restore state. Could we trigger the same commit callback path after a successful restore, using the restored base/delta/index files and the new snapshot context?

Author

Good catch, thanks. restoreAsLatest committed via commitSnapshotImpl directly and skipped the commit callbacks, so external views (Iceberg metadata, chain-table overwrite) could stay at the pre-restore state.

Fixed by notifying the callbacks after a successful restore, like a regular commit. The context uses the restore delta (DELETE previous-only + ADD target-only) and an index delta derived the same way from the previous-latest and target index manifests. Both affected callbacks (Iceberg metadata and chain-table overwrite) are idempotent, so retries stay correct.

Added an IT case testRestoreTriggersCommitCallback asserting Iceberg metadata is generated for the restore snapshot.
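
A rough sketch of the ordering and the idempotency argument, not the actual callback API: `RestoreCallbackSketch`, `Callback`, `ExternalView`, and `restoreAndNotify` are hypothetical names, and the commit itself is reduced to a boolean outcome. The point is that callbacks run only after the snapshot is committed, and that a callback keyed on the snapshot id tolerates being invoked again on retry.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of post-restore callback notification. */
final class RestoreCallbackSketch {

    /** Stand-in for a commit callback receiving the new snapshot id and its delta. */
    interface Callback {
        void onCommit(long snapshotId, List<String> deltaFiles);
    }

    /** An external view that applies each snapshot's delta at most once. */
    static final class ExternalView implements Callback {
        private final Set<Long> seenSnapshots = new HashSet<>();
        final List<String> applied = new ArrayList<>();

        @Override
        public void onCommit(long snapshotId, List<String> deltaFiles) {
            if (!seenSnapshots.add(snapshotId)) {
                return; // already applied for this snapshot, so a retry is a no-op
            }
            applied.addAll(deltaFiles);
        }
    }

    /** Notifies callbacks only when the restore snapshot was actually committed. */
    static boolean restoreAndNotify(
            boolean commitSucceeded,
            long snapshotId,
            List<String> restoreDelta,
            List<Callback> callbacks) {
        if (!commitSucceeded) {
            return false;
        }
        for (Callback callback : callbacks) {
            callback.onCommit(snapshotId, restoreDelta);
        }
        return true;
    }
}
```

Calling `restoreAndNotify` twice with the same snapshot id leaves `ExternalView.applied` unchanged after the first call, which is the property that makes retries safe in this model.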

restoreAsLatest committed the restore snapshot directly via
commitSnapshotImpl, bypassing the commit callbacks that a regular commit
runs. External views that depend on those callbacks (Iceberg
compatibility metadata, chain-table overwrite handling) could therefore
stay at the pre-restore state.

Notify the callbacks after a successful restore using the restored
base/delta/index files and the new snapshot. The index changes are
derived from the previous latest and target index manifests, mirroring
how the data delta files are computed.

Add an IT case asserting Iceberg metadata is generated for the restore
snapshot.

2 participants