diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index 31601354f1a8..325ce57235ed 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -498,6 +498,34 @@ All available procedures are listed below.
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
+
| rollback_to_timestamp |
diff --git a/docs/docs/maintenance/manage-snapshots.mdx b/docs/docs/maintenance/manage-snapshots.mdx
index 821aa0b7559f..edbbf0ca0ddf 100644
--- a/docs/docs/maintenance/manage-snapshots.mdx
+++ b/docs/docs/maintenance/manage-snapshots.mdx
@@ -353,6 +353,25 @@ CALL sys.rollback(table => 'database_name.table_name', snapshot => snasphot_id);
+## Restore Snapshot as Latest
+
+Restore a table to the state of a specific snapshot ID by creating a new latest snapshot. Unlike rollback, this operation
+does not delete snapshots or tags whose snapshot id is larger than the restored snapshot.
+
+
+
+
+
+Run the following command:
+
+```sql
+CALL sys.restore_as_latest(`table` => 'database_name.table_name', snapshot_id => );
+```
+
+
+
+
+
## Remove Orphan Files
Paimon files are deleted physically only when expiring snapshots. However, it is possible that some unexpected errors occurred
@@ -402,4 +421,4 @@ The table can be `*` to clean all tables in the database.
-
\ No newline at end of file
+
diff --git a/docs/docs/maintenance/manage-tags.mdx b/docs/docs/maintenance/manage-tags.mdx
index 78e588e5f53f..173dbe5e7ab2 100644
--- a/docs/docs/maintenance/manage-tags.mdx
+++ b/docs/docs/maintenance/manage-tags.mdx
@@ -300,3 +300,22 @@ CALL sys.rollback(table => 'test.t', version => '2');
+
+## Restore Tag as Latest
+
+Restore a table to the state of a specific tag by creating a new latest snapshot. Unlike rollback, this operation does not
+delete snapshots or tags whose snapshot id is larger than the restored tag.
+
+
+
+
+
+Run the following command:
+
+```sql
+CALL sys.restore_as_latest(`table` => 'database_name.table_name', tag => 'tag_name');
+```
+
+
+
+
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 31fb3c52cab6..a283360ca3e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -74,6 +74,9 @@ int overwritePartition(
/** Compact the manifest entries only. */
void compactManifest();
+ /** Restore the target snapshot as the latest snapshot. */
+ boolean restoreAsLatest(Snapshot targetSnapshot);
+
/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List commitMessages);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7cca259cbf9f..367b5efd8fee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -26,6 +26,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
@@ -89,6 +90,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -107,6 +109,7 @@
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
* Default implementation of {@link FileStoreCommit}.
@@ -1164,6 +1167,288 @@ public boolean replaceManifestList(
return commitSnapshotImpl(newSnapshot, emptyList());
}
+ @Override
+ public boolean restoreAsLatest(Snapshot targetSnapshot) {
+ Snapshot latest =
+ checkNotNull(
+ snapshotManager.latestSnapshot(),
+ "Latest snapshot is null, can not restore.");
+
+ Map latestEntries = new HashMap<>();
+ FileEntry.mergeEntries(
+ manifestFile,
+ manifestList.readDataManifests(latest),
+ latestEntries,
+ options.scanManifestParallelism());
+
+ latestEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);
+
+ Map targetEntries = new HashMap<>();
+ FileEntry.mergeEntries(
+ manifestFile,
+ manifestList.readDataManifests(targetSnapshot),
+ targetEntries,
+ options.scanManifestParallelism());
+ targetEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);
+
+ List deltaFiles = new ArrayList<>();
+ for (Map.Entry entry : latestEntries.entrySet()) {
+ if (!targetEntries.containsKey(entry.getKey())) {
+ ManifestEntry manifestEntry = entry.getValue();
+ deltaFiles.add(
+ ManifestEntry.create(
+ FileKind.DELETE,
+ manifestEntry.partition(),
+ manifestEntry.bucket(),
+ manifestEntry.totalBuckets(),
+ manifestEntry.file()));
+ }
+ }
+ for (Map.Entry entry : targetEntries.entrySet()) {
+ if (!latestEntries.containsKey(entry.getKey())) {
+ ManifestEntry manifestEntry = entry.getValue();
+ deltaFiles.add(
+ ManifestEntry.create(
+ FileKind.ADD,
+ manifestEntry.partition(),
+ manifestEntry.bucket(),
+ manifestEntry.totalBuckets(),
+ manifestEntry.file()));
+ }
+ }
+
+ // Include data files whose deletion vector changed between the previous latest and the
+ // target while the data file itself stayed the same. A DV-only delete/update does not
+ // rewrite the data file, so the identifier diff above misses it and leaves an empty data
+ // delta that streaming overwrite readers would skip. Re-emitting such files as
+ // DELETE(latest) + ADD(target) makes the restore transition visible. The physical row count
+ // is unchanged, so the pair nets to zero in the delta record count, consistent with the
+ // unchanged totalRecordCount.
+ addDeletionVectorOnlyChanges(
+ latest, targetSnapshot, latestEntries, targetEntries, deltaFiles);
+
+ Pair baseManifestList =
+ manifestList.write(manifestFile.write(new ArrayList<>(latestEntries.values())));
+ Pair deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
+ // For row-tracking tables nextRowId must stay monotonic: restoring an older snapshot must
+ // not move it backwards, otherwise new appends would reuse row ids already assigned by the
+ // snapshots between the target and the previous latest, breaking the global uniqueness of
+ // _ROW_ID. Keep the larger of the previous latest and the target nextRowId.
+ Long nextRowId = maxNextRowId(latest.nextRowId(), targetSnapshot.nextRowId());
+ Snapshot newSnapshot =
+ new Snapshot(
+ latest.id() + 1,
+ targetSnapshot.schemaId(),
+ baseManifestList.getKey(),
+ baseManifestList.getRight(),
+ deltaManifestList.getKey(),
+ deltaManifestList.getRight(),
+ null,
+ null,
+ targetSnapshot.indexManifest(),
+ commitUser,
+ Long.MAX_VALUE,
+ CommitKind.OVERWRITE,
+ System.currentTimeMillis(),
+ targetSnapshot.totalRecordCount(),
+ recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles),
+ null,
+ targetSnapshot.watermark(),
+ targetSnapshot.statistics(),
+ targetSnapshot.properties(),
+ nextRowId);
+
+ // The restore is an overwrite from the previous latest to the target, so the base files,
+ // delta files and index changes describe the transition the callbacks need. These are
+ // shared by the pre- and post-commit callbacks below.
+ List baseFiles =
+ SimpleFileEntry.from(new ArrayList<>(latestEntries.values()));
+ List indexChanges = restoreIndexChanges(latest, targetSnapshot);
+
+ // Like a regular commit, run the pre-commit callbacks before the snapshot becomes visible.
+ // They may veto the restore by throwing (e.g. a chain-table snapshot branch rejects a
+ // pure-DELETE overwrite that would drop a snapshot partition still anchoring delta
+ // partitions), in which case the restore snapshot is never created.
+ commitPreCallbacks.forEach(
+ callback -> callback.call(baseFiles, deltaFiles, indexChanges, newSnapshot));
+
+ boolean success =
+ commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles)));
+ if (success) {
+ // Notify the post-commit callbacks so external views stay in sync with the restored
+ // state (e.g. Iceberg compatibility metadata and chain-table overwrite handling).
+ CommitCallback.Context context =
+ new CommitCallback.Context(
+ baseFiles,
+ deltaFiles,
+ indexChanges,
+ newSnapshot,
+ newSnapshot.commitIdentifier());
+ commitCallbacks.forEach(callback -> callback.call(context));
+ }
+ return success;
+ }
+
+ /**
+ * Computes the index file changes between the previous latest snapshot and the restore target,
+ * mirroring how the data delta files are derived: entries that only exist in the previous
+ * latest are marked as {@link FileKind#DELETE}, entries that only exist in the target are kept
+ * as ADD.
+ */
+ private List restoreIndexChanges(Snapshot latest, Snapshot target) {
+ Set latestIndexEntries = readIndexEntries(latest.indexManifest());
+ Set targetIndexEntries = readIndexEntries(target.indexManifest());
+
+ List indexChanges = new ArrayList<>();
+ for (IndexManifestEntry entry : latestIndexEntries) {
+ if (!targetIndexEntries.contains(entry)) {
+ indexChanges.add(entry.toDeleteEntry());
+ }
+ }
+ for (IndexManifestEntry entry : targetIndexEntries) {
+ if (!latestIndexEntries.contains(entry)) {
+ indexChanges.add(entry);
+ }
+ }
+ return indexChanges;
+ }
+
+ private Set readIndexEntries(@Nullable String indexManifest) {
+ if (indexManifest == null) {
+ return Collections.emptySet();
+ }
+ return new HashSet<>(indexManifestFile.read(indexManifest));
+ }
+
+ /**
+ * Adds, to {@code deltaFiles}, data files whose deletion vector differs between the previous
+ * latest and the target snapshot but whose data file itself is unchanged (i.e. a DV-only
+ * delete/update). Such files are present in both snapshots, so the data-file identifier diff
+ * does not emit them; without this, a DV-only restore produces an empty data delta that
+ * streaming overwrite readers skip. They are re-emitted as DELETE(latest) + ADD(target).
+ */
+ private void addDeletionVectorOnlyChanges(
+ Snapshot latest,
+ Snapshot target,
+ Map latestEntries,
+ Map targetEntries,
+ List deltaFiles) {
+ Map latestDvs =
+ readDeletionVectors(latest.indexManifest());
+ Map targetDvs =
+ readDeletionVectors(target.indexManifest());
+ if (latestDvs.isEmpty() && targetDvs.isEmpty()) {
+ return;
+ }
+
+ Map latestByFile =
+ indexByDataFile(latestEntries.values());
+ Map targetByFile =
+ indexByDataFile(targetEntries.values());
+
+ Set keys = new HashSet<>(latestDvs.keySet());
+ keys.addAll(targetDvs.keySet());
+ for (DeletionVectorKey key : keys) {
+ if (Objects.equals(latestDvs.get(key), targetDvs.get(key))) {
+ continue;
+ }
+ ManifestEntry latestEntry = latestByFile.get(key);
+ ManifestEntry targetEntry = targetByFile.get(key);
+ // Only handle DV-only changes here: the data file must be present in both snapshots. A
+ // file added/removed across the restore is already in deltaFiles via the identifier
+ // diff.
+ if (latestEntry != null && targetEntry != null) {
+ deltaFiles.add(toDeltaEntry(FileKind.DELETE, latestEntry));
+ deltaFiles.add(toDeltaEntry(FileKind.ADD, targetEntry));
+ }
+ }
+ }
+
+ private Map readDeletionVectors(
+ @Nullable String indexManifest) {
+ if (indexManifest == null) {
+ return Collections.emptyMap();
+ }
+ Map result = new HashMap<>();
+ for (IndexManifestEntry entry : indexManifestFile.read(indexManifest)) {
+ if (entry.kind() != FileKind.ADD
+ || !DELETION_VECTORS_INDEX.equals(entry.indexFile().indexType())) {
+ continue;
+ }
+ LinkedHashMap dvRanges = entry.indexFile().dvRanges();
+ if (dvRanges == null) {
+ continue;
+ }
+ for (DeletionVectorMeta meta : dvRanges.values()) {
+ result.put(
+ new DeletionVectorKey(
+ entry.partition(), entry.bucket(), meta.dataFileName()),
+ meta);
+ }
+ }
+ return result;
+ }
+
+ private static Map indexByDataFile(
+ Collection entries) {
+ Map result = new HashMap<>();
+ for (ManifestEntry entry : entries) {
+ result.put(
+ new DeletionVectorKey(
+ entry.partition(), entry.bucket(), entry.file().fileName()),
+ entry);
+ }
+ return result;
+ }
+
+ private static ManifestEntry toDeltaEntry(FileKind kind, ManifestEntry entry) {
+ return ManifestEntry.create(
+ kind, entry.partition(), entry.bucket(), entry.totalBuckets(), entry.file());
+ }
+
+ /** Identifies a data file (partition, bucket, file name) for deletion-vector comparison. */
+ private static class DeletionVectorKey {
+ private final BinaryRow partition;
+ private final int bucket;
+ private final String dataFileName;
+
+ DeletionVectorKey(BinaryRow partition, int bucket, String dataFileName) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.dataFileName = dataFileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DeletionVectorKey that = (DeletionVectorKey) o;
+ return bucket == that.bucket
+ && Objects.equals(partition, that.partition)
+ && Objects.equals(dataFileName, that.dataFileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partition, bucket, dataFileName);
+ }
+ }
+
+ @Nullable
+ private static Long maxNextRowId(@Nullable Long left, @Nullable Long right) {
+ if (left == null) {
+ return right;
+ }
+ if (right == null) {
+ return left;
+ }
+ return Math.max(left, right);
+ }
+
public void compactManifest() {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 0311c9bbe4d8..849b25d54e8b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
@@ -203,6 +204,20 @@ public void compactManifests() {
commit.compactManifest();
}
+ public boolean restoreAsLatest(Snapshot targetSnapshot) {
+ checkCommitted();
+ boolean success = commit.restoreAsLatest(targetSnapshot);
+ if (success) {
+ // Skip automatic expiration for the restore path. Restore-as-latest promises not to
+ // delete snapshots or tags whose snapshot id is larger than the restored snapshot, but
+ // the newly committed latest snapshot would otherwise let expiration (e.g. a low
+ // snapshot.num-retained.max) immediately remove the restored snapshot and the later
+ // snapshots/tags it is meant to preserve.
+ maintain(COMMIT_IDENTIFIER, maintainExecutor, false);
+ }
+ return success;
+ }
+
private void checkCommitted() {
checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
batchCommitted = true;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
index 5ff5edbf71b4..8550edf358fc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
@@ -594,6 +595,39 @@ private BinaryRow findGroupedPartition(FileStoreTable table, String region, Stri
"Partition " + region + "|" + dt + " not found"));
}
+ @Test
+ public void testRestoreRejectedWhenDroppingAnchorSnapshotPartition() throws Exception {
+ Path tablePath = tablePath("restore_reject_anchor");
+ createChainTable(tablePath, true);
+
+ FileStoreTable snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ FileStoreTable deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // snapshot branch: an unrelated group (US) first, then the CN anchor.
+ writeGrouped(snapshotTable, "US", "20250101", "v1"); // snapshot branch snapshot #1
+ writeGrouped(snapshotTable, "CN", "20250301", "v2"); // snapshot #2 adds CN/20250301
+
+ // delta branch: a CN delta that uses CN/20250301 as its only anchor.
+ writeGrouped(deltaTable, "CN", "20250315", "v3");
+
+ // Restoring the snapshot branch back to snapshot #1 would drop CN/20250301, the only anchor
+ // of the CN/20250315 delta. The pre-commit callback must reject this restore (same as a
+ // regular overwrite) instead of silently breaking the chain.
+ FileStoreTable snapshotBranch = loadTable(tablePath).switchToBranch("snapshot");
+ Snapshot target = snapshotBranch.snapshotManager().snapshot(1);
+ try (TableCommitImpl commit = snapshotBranch.newCommit(commitUser)) {
+ assertThatThrownBy(() -> commit.restoreAsLatest(target))
+ .hasMessageContaining("Snapshot partition cannot be dropped");
+ }
+ // The dangerous restore was aborted, so the latest snapshot is unchanged.
+ assertThat(
+ loadTable(tablePath)
+ .switchToBranch("snapshot")
+ .snapshotManager()
+ .latestSnapshotId())
+ .isEqualTo(2L);
+ }
+
private Path tablePath(String tableName) {
return new Path(tempDir.toUri().toString(), tableName);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index e13b11d474b6..251866454fd8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -39,6 +39,7 @@
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.RetryCommitResult;
@@ -889,6 +890,37 @@ public void testDVIndexFiles(boolean bitmap64) throws Exception {
assertThat(dvs.get("f2").isDeleted(3)).isTrue();
}
+ @Test
+ public void testRestoreIncludesDeletionVectorOnlyChanges() throws Exception {
+ TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
+ BinaryRow partition = gen.getPartition(gen.next());
+
+ // snapshot 1: data files f1, f2, no deletion vectors
+ store.commit(store.writeDataFiles(partition, 0, Arrays.asList("f1", "f2")));
+ Snapshot target = store.snapshotManager().latestSnapshot();
+
+ // snapshot 2: DV-only change — add a deletion vector for f1, data files unchanged
+ store.commit(
+ store.writeDVIndexFiles(
+ partition, 0, Collections.singletonMap("f1", Arrays.asList(1, 3))));
+
+ // restore back to snapshot 1
+ try (FileStoreCommitImpl commit = store.newCommit()) {
+ assertThat(commit.restoreAsLatest(target)).isTrue();
+ }
+
+ // f1's deletion vector changed even though the data file is unchanged, so the restore delta
+ // must re-emit f1 as DELETE + ADD instead of being empty. An empty data delta would let
+ // streaming overwrite readers skip the restore entirely.
+ Snapshot restored = store.snapshotManager().latestSnapshot();
+ ManifestList manifestList = store.manifestListFactory().create();
+ List deltaManifests = manifestList.readDeltaManifests(restored);
+ long added = deltaManifests.stream().mapToLong(ManifestFileMeta::numAddedFiles).sum();
+ long deleted = deltaManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum();
+ assertThat(added).isEqualTo(1);
+ assertThat(deleted).isEqualTo(1);
+ }
+
@Test
public void testManifestCompact() throws Exception {
TestFileStore store = createStore(false);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java
new file mode 100644
index 000000000000..8aeb8576d70a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+
+/**
+ * Restore as latest procedure. Usage:
+ *
+ *
+ * -- restore a snapshot as the latest snapshot
+ * CALL sys.restore_as_latest(`table` => 'tableId', snapshot_id => snapshotId)
+ *
+ * -- restore a tag as the latest snapshot
+ * CALL sys.restore_as_latest(`table` => 'tableId', tag => 'tagName')
+ *
+ */
+public class RestoreAsLatestProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "restore_as_latest";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(
+ name = "snapshot_id",
+ type = @DataTypeHint("BIGINT"),
+ isOptional = true)
+ })
+ public @DataTypeHint(
+ "ROW")
+ Row[] call(ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+ FileStore> store = fileStoreTable.store();
+ Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
+ Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null, can not restore.");
+
+ boolean hasTag = !StringUtils.isNullOrWhitespaceOnly(tagName);
+ boolean hasSnapshot = snapshotId != null;
+ Preconditions.checkArgument(
+ hasTag != hasSnapshot, "Must specify exactly one of tag and snapshot_id.");
+
+ Snapshot targetSnapshot;
+ if (hasTag) {
+ targetSnapshot = store.newTagManager().getOrThrow(tagName).trimToSnapshot();
+ } else {
+ targetSnapshot = findSnapshot(store, snapshotId);
+ }
+
+ try (TableCommitImpl commit =
+ fileStoreTable.newCommit("restore-as-latest-" + UUID.randomUUID().toString())) {
+ Preconditions.checkState(
+ commit.restoreAsLatest(targetSnapshot),
+ "Failed to restore snapshot %s as latest.",
+ targetSnapshot.id());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to restore snapshot %s as latest.", targetSnapshot.id()),
+ e);
+ }
+
+ return new Row[] {
+ Row.of(
+ latestSnapshot.id(),
+ targetSnapshot.id(),
+ store.snapshotManager().latestSnapshotId())
+ };
+ }
+
+ private Snapshot findSnapshot(FileStore> store, long snapshotId) {
+ SnapshotManager snapshotManager = store.snapshotManager();
+ if (snapshotManager.snapshotExists(snapshotId)) {
+ return snapshotManager.snapshot(snapshotId);
+ }
+
+ SortedMap> tags = store.newTagManager().tags();
+ for (Map.Entry> entry : tags.entrySet()) {
+ if (entry.getKey().id() == snapshotId) {
+ return entry.getKey();
+ } else if (entry.getKey().id() > snapshotId) {
+ break;
+ }
+ }
+
+ throw new IllegalArgumentException(
+ String.format("Restore snapshot '%s' doesn't exist.", snapshotId));
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index db2777a3a0fc..b999168f2cbb 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
+org.apache.paimon.flink.procedure.RestoreAsLatestProcedure
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
@@ -106,4 +107,4 @@ org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
org.apache.paimon.flink.procedure.ReassignRowIdProcedure
org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
org.apache.paimon.flink.procedure.VectorSearchProcedure
-org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java
new file mode 100644
index 000000000000..c89d58e6496f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** IT cases for restore_as_latest procedure. */
+public class RestoreAsLatestProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testRestoreSnapshotAsLatest() throws Exception {
+ sql("CREATE TABLE T (id INT, name STRING)");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ commitRow(table, 1, "a");
+ commitRow(table, 2, "b");
+ commitRow(table, 3, "c");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 1)"))
+ .containsExactly(Row.of(3L, 1L, 4L));
+
+ assertEquals(4, snapshotManager.latestSnapshotId());
+ assertRestoreDelta(table, 4, 0, 2, -2L);
+ assertTrue(snapshotManager.snapshotExists(2));
+ assertTrue(snapshotManager.snapshotExists(3));
+ assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a"));
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 3)"))
+ .containsExactly(Row.of(4L, 3L, 5L));
+
+ assertEquals(5, snapshotManager.latestSnapshotId());
+ assertRestoreDelta(table, 5, 2, 0, 2L);
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"));
+
+ commitRow(table, 4, "d");
+ assertEquals(6, snapshotManager.latestSnapshotId());
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"), Row.of(4, "d"));
+ }
+
+ @Test
+ public void testRestoreTagAsLatest() throws Exception {
+ sql("CREATE TABLE T (id INT, name STRING)");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ commitRow(table, 1, "a");
+ commitRow(table, 2, "b");
+ commitRow(table, 3, "c");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ sql("CALL sys.create_tag(`table` => 'default.T', tag => 'tag-1', snapshot_id => 1)");
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', tag => 'tag-1')"))
+ .containsExactly(Row.of(3L, 1L, 4L));
+
+ assertEquals(4, snapshotManager.latestSnapshotId());
+ assertRestoreDelta(table, 4, 0, 2, -2L);
+ assertTrue(snapshotManager.snapshotExists(2));
+ assertTrue(snapshotManager.snapshotExists(3));
+ assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a"));
+ }
+
+ @Test
+ public void testRestoreDoesNotExpireKeptSnapshots() throws Exception {
+ sql("CREATE TABLE T (id INT, name STRING)");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ commitRow(table, 1, "a");
+ commitRow(table, 2, "b");
+ commitRow(table, 3, "c");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ // Configure aggressive expiration that would otherwise drop every snapshot but the latest.
+ sql(
+ "ALTER TABLE T SET ("
+ + "'snapshot.num-retained.min' = '1', "
+ + "'snapshot.num-retained.max' = '1')");
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 1)"))
+ .containsExactly(Row.of(3L, 1L, 4L));
+
+ // Restore-as-latest must not delete snapshots whose id is larger than the restored one,
+ // even with snapshot.num-retained.max = 1. The restore path skips automatic expiration.
+ assertEquals(4, snapshotManager.latestSnapshotId());
+ assertTrue(snapshotManager.snapshotExists(1));
+ assertTrue(snapshotManager.snapshotExists(2));
+ assertTrue(snapshotManager.snapshotExists(3));
+ }
+
+ @Test
+ public void testRestoreKeepsRowIdMonotonic() throws Exception {
+ sql("CREATE TABLE RT (id INT, name STRING) WITH ('row-tracking.enabled' = 'true')");
+
+ FileStoreTable table = paimonTable("RT");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ commitRow(table, 1, "a");
+ commitRow(table, 2, "b");
+ commitRow(table, 3, "c");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ Long latestNextRowId = snapshotManager.latestSnapshot().nextRowId();
+ assertThat(latestNextRowId).isNotNull();
+
+ // Restore the first snapshot, whose own nextRowId is smaller than the current latest.
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.RT', snapshot_id => 1)"))
+ .containsExactly(Row.of(3L, 1L, 4L));
+
+ // nextRowId must not move backwards: otherwise new appends would reuse row ids already
+ // assigned by snapshots 2 and 3, breaking the global uniqueness of _ROW_ID. The restore
+ // snapshot keeps the larger of the previous latest and the target nextRowId.
+ Snapshot restored = table.snapshot(4);
+ assertThat(restored.nextRowId()).isEqualTo(latestNextRowId);
+ }
+
+ @Test
+ public void testRestoreTriggersCommitCallback() throws Exception {
+ sql(
+ "CREATE TABLE T (id INT, name STRING) WITH ("
+ + "'metadata.iceberg.storage' = 'table-location')");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ commitRow(table, 1, "a");
+ commitRow(table, 2, "b");
+ commitRow(table, 3, "c");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 1)"))
+ .containsExactly(Row.of(3L, 1L, 4L));
+ assertEquals(4, snapshotManager.latestSnapshotId());
+
+ // The restore must trigger the commit callbacks like a regular commit, so external views
+ // stay in sync with the restored state. With Iceberg compatibility enabled, that means
+ // Iceberg metadata is generated for the restore snapshot.
+ Path icebergMetadata = new Path(table.location(), "metadata/v4.metadata.json");
+ assertTrue(table.fileIO().exists(icebergMetadata));
+ }
+
+ private void assertRestoreDelta(
+ FileStoreTable table,
+ long snapshotId,
+ long expectedNumAddedFiles,
+ long expectedNumDeletedFiles,
+ long expectedDeltaRecordCount) {
+ Snapshot snapshot = table.snapshot(snapshotId);
+ ManifestList manifestList = table.store().manifestListFactory().create();
+ List deltaManifests = manifestList.readDeltaManifests(snapshot);
+
+ assertThat(deltaManifests).hasSize(1);
+ assertThat(deltaManifests.get(0).numAddedFiles()).isEqualTo(expectedNumAddedFiles);
+ assertThat(deltaManifests.get(0).numDeletedFiles()).isEqualTo(expectedNumDeletedFiles);
+ assertThat(snapshot.deltaRecordCount()).isEqualTo(expectedDeltaRecordCount);
+ }
+
+ private void commitRow(FileStoreTable table, int id, String name) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(id, BinaryString.fromString(name)));
+ commit.commit(write.prepareCommit());
+ }
+ }
+}
|