Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/docs/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,34 @@ All available procedures are listed below.
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
</td>
</tr>
<tr>
<td>restore_as_latest</td>
<td>
-- for Flink 1.18<br/>
-- restore a snapshot as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest('identifier', cast(null as string), snapshotId)<br/><br/>
-- restore a tag as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest('identifier', 'tagName', cast(null as bigint))<br/><br/>
-- for Flink 1.19 and later<br/>
-- restore a snapshot as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', snapshot_id => snapshotId)<br/><br/>
-- restore a tag as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', tag => 'tagName')
</td>
<td>
To restore a specific version of the target table as the latest snapshot, without deleting later snapshots or tags.
Argument:
<li>table: the target table identifier. Cannot be empty.</li>
<li>snapshotId (Long): id of the snapshot to restore from.</li>
<li>tagName: name of the tag to restore from.</li>
</td>
<td>
-- for Flink 1.18<br/>
CALL sys.restore_as_latest('default.T', cast(null as string), 10)<br/><br/>
-- for Flink 1.19 and later<br/>
CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 10)
</td>
</tr>
<tr>
<td>rollback_to_timestamp</td>
<td>
Expand Down
21 changes: 20 additions & 1 deletion docs/docs/maintenance/manage-snapshots.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,25 @@ CALL sys.rollback(table => 'database_name.table_name', snapshot => snasphot_id);

</Tabs>

## Restore Snapshot as Latest

Restore a table to the state of a specific snapshot by creating a new latest snapshot. Unlike rollback, this operation
does not delete snapshots or tags whose snapshot id is larger than that of the restored snapshot.

<Tabs groupId="restore-as-latest">

<TabItem value="flink-sql" label="Flink SQL">

Run the following command:

```sql
CALL sys.restore_as_latest(`table` => 'database_name.table_name', snapshot_id => <snapshot-id>);
```

</TabItem>

</Tabs>

## Remove Orphan Files

Paimon files are deleted physically only when expiring snapshots. However, it is possible that some unexpected errors occurred
Expand Down Expand Up @@ -402,4 +421,4 @@ The table can be `*` to clean all tables in the database.

</TabItem>

</Tabs>
</Tabs>
19 changes: 19 additions & 0 deletions docs/docs/maintenance/manage-tags.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,22 @@ CALL sys.rollback(table => 'test.t', version => '2');
</TabItem>

</Tabs>

## Restore Tag as Latest

Restore a table to the state of a specific tag by creating a new latest snapshot. Unlike rollback, this operation does not
delete snapshots or tags whose snapshot id is larger than that of the snapshot the restored tag points to.

<Tabs groupId="restore-as-latest">

<TabItem value="flink-sql" label="Flink SQL">

Run the following command:

```sql
CALL sys.restore_as_latest(`table` => 'database_name.table_name', tag => 'tag_name');
```

</TabItem>

</Tabs>
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ int overwritePartition(
/** Compact the manifest entries only. */
void compactManifest();

/**
 * Restore the target snapshot as the latest snapshot.
 *
 * @param targetSnapshot the snapshot whose state should become the new latest snapshot
 * @return whether the restore snapshot was committed (the default implementation returns the
 *     result of its snapshot commit)
 */
boolean restoreAsLatest(Snapshot targetSnapshot);

/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/**
* Default implementation of {@link FileStoreCommit}.
Expand Down Expand Up @@ -1164,6 +1165,160 @@ public boolean replaceManifestList(
return commitSnapshotImpl(newSnapshot, emptyList());
}

/**
 * Commits a new snapshot on top of the current latest one whose data files match those of
 * {@code targetSnapshot}.
 *
 * <p>The data files of the latest and the target snapshot are compared by identifier: files
 * only in the latest snapshot become DELETE entries and files only in the target become ADD
 * entries of the delta manifest list, while the base manifest list is rewritten from the
 * latest snapshot's files. The new snapshot gets id {@code latest.id() + 1} and kind {@link
 * CommitKind#OVERWRITE}, and is built with the target's schema id, index manifest, total
 * record count, watermark, statistics and properties.
 *
 * <p>Pre-commit callbacks run before the snapshot is committed and may abort the restore by
 * throwing; commit callbacks run only after a successful commit.
 *
 * @param targetSnapshot the snapshot to restore
 * @return the result of committing the new snapshot
 */
@Override
public boolean restoreAsLatest(Snapshot targetSnapshot) {
// A restore needs an existing latest snapshot to build on; checkNotNull rejects a null one.
Snapshot latest =
checkNotNull(
snapshotManager.latestSnapshot(),
"Latest snapshot is null, can not restore.");

// Merge the data manifest entries referenced by the current latest snapshot.
Map<FileEntry.Identifier, ManifestEntry> latestEntries = new HashMap<>();
FileEntry.mergeEntries(
manifestFile,
manifestList.readDataManifests(latest),
latestEntries,
options.scanManifestParallelism());

// Keep only the merged entries whose kind is ADD.
latestEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);

// Do the same for the restore target.
Map<FileEntry.Identifier, ManifestEntry> targetEntries = new HashMap<>();
FileEntry.mergeEntries(
manifestFile,
manifestList.readDataManifests(targetSnapshot),
targetEntries,
options.scanManifestParallelism());
targetEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);

List<ManifestEntry> deltaFiles = new ArrayList<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The restore delta is built only from data-file identifier differences. For tables using deletion vectors or other index-manifest state, the logical contents can change while the data files stay the same; restoring across a DV-only delete/update would leave deltaFiles empty and deltaRecordCount as 0, even though targetSnapshot.indexManifest() and totalRecordCount differ. Streaming overwrite readers call readChanges() from the DELTA data manifests, and the streaming path does not load DV indexes, so such a restore can still be skipped by streaming readers. Please include the relevant index/DV changes in the restore transition (or otherwise make the overwrite read/delta counts handle index-only restores) instead of relying on the final indexManifest alone.

// Files in the latest snapshot that the target does not contain are removed by the restore.
for (Map.Entry<FileEntry.Identifier, ManifestEntry> entry : latestEntries.entrySet()) {
if (!targetEntries.containsKey(entry.getKey())) {
ManifestEntry manifestEntry = entry.getValue();
deltaFiles.add(
ManifestEntry.create(
FileKind.DELETE,
manifestEntry.partition(),
manifestEntry.bucket(),
manifestEntry.totalBuckets(),
manifestEntry.file()));
}
}
// Files in the target that the latest snapshot does not contain are added back.
for (Map.Entry<FileEntry.Identifier, ManifestEntry> entry : targetEntries.entrySet()) {
if (!latestEntries.containsKey(entry.getKey())) {
ManifestEntry manifestEntry = entry.getValue();
deltaFiles.add(
ManifestEntry.create(
FileKind.ADD,
manifestEntry.partition(),
manifestEntry.bucket(),
manifestEntry.totalBuckets(),
manifestEntry.file()));
}
}

// The base manifest list is written from the previous latest's files and the delta manifest
// list from the computed transition, so base + delta yields the target's file set.
Pair<String, Long> baseManifestList =
manifestList.write(manifestFile.write(new ArrayList<>(latestEntries.values())));
Pair<String, Long> deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
// For row-tracking tables nextRowId must stay monotonic: restoring an older snapshot must
// not move it backwards, otherwise new appends would reuse row ids already assigned by the
// snapshots between the target and the previous latest, breaking the global uniqueness of
// _ROW_ID. Keep the larger of the previous latest and the target nextRowId.
Long nextRowId = maxNextRowId(latest.nextRowId(), targetSnapshot.nextRowId());
Snapshot newSnapshot =
new Snapshot(
latest.id() + 1,
targetSnapshot.schemaId(),
baseManifestList.getKey(),
baseManifestList.getRight(),
deltaManifestList.getKey(),
deltaManifestList.getRight(),
null,
null,
targetSnapshot.indexManifest(),
commitUser,
Long.MAX_VALUE,
CommitKind.OVERWRITE,
System.currentTimeMillis(),
targetSnapshot.totalRecordCount(),
recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles),
null,
targetSnapshot.watermark(),
targetSnapshot.statistics(),
targetSnapshot.properties(),
nextRowId);

// The restore is an overwrite from the previous latest to the target, so the base files,
// delta files and index changes describe the transition the callbacks need. These are
// shared by the pre- and post-commit callbacks below.
List<SimpleFileEntry> baseFiles =
SimpleFileEntry.from(new ArrayList<>(latestEntries.values()));
List<IndexManifestEntry> indexChanges = restoreIndexChanges(latest, targetSnapshot);

// Like a regular commit, run the pre-commit callbacks before the snapshot becomes visible.
// They may veto the restore by throwing (e.g. a chain-table snapshot branch rejects a
// pure-DELETE overwrite that would drop a snapshot partition still anchoring delta
// partitions), in which case the restore snapshot is never created.
commitPreCallbacks.forEach(
callback -> callback.call(baseFiles, deltaFiles, indexChanges, newSnapshot));

boolean success =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This restore path still bypasses the normal pre-commit callbacks. Regular commits call commitPreCallbacks before commitSnapshotImpl, and ChainTableCommitPreCallback uses that hook to reject unsafe pure-DELETE overwrite commits on the snapshot branch. restoreAsLatest also creates an OVERWRITE delta, so restoring a snapshot branch back to an older state can delete snapshot partitions that a normal overwrite would validate and potentially abort. Please run the same pre-callback path (with the restore base files, delta files, index changes, and new snapshot) before making the restore snapshot visible.

commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles)));
if (success) {
// Notify the post-commit callbacks so external views stay in sync with the restored
// state (e.g. Iceberg compatibility metadata and chain-table overwrite handling).
CommitCallback.Context context =
new CommitCallback.Context(
baseFiles,
deltaFiles,
indexChanges,
newSnapshot,
newSnapshot.commitIdentifier());
commitCallbacks.forEach(callback -> callback.call(context));
}
return success;
}

/**
 * Derives the index-manifest transition of a restore, analogous to the data-file delta.
 *
 * <p>An index entry found only in the previous latest snapshot is emitted through {@code
 * toDeleteEntry()}; an index entry found only in the restore target is emitted unchanged.
 * Entries present in both snapshots produce no change.
 *
 * @param latest the snapshot that was latest before the restore
 * @param target the snapshot being restored
 * @return the removals followed by the additions
 */
private List<IndexManifestEntry> restoreIndexChanges(Snapshot latest, Snapshot target) {
    Set<IndexManifestEntry> previous = readIndexEntries(latest.indexManifest());
    Set<IndexManifestEntry> restored = readIndexEntries(target.indexManifest());

    List<IndexManifestEntry> changes = new ArrayList<>();
    // Removals first: whatever the previous latest had that the target lacks.
    previous.forEach(
            candidate -> {
                if (!restored.contains(candidate)) {
                    changes.add(candidate.toDeleteEntry());
                }
            });
    // Then additions: whatever the target has that the previous latest lacks.
    restored.forEach(
            candidate -> {
                if (!previous.contains(candidate)) {
                    changes.add(candidate);
                }
            });
    return changes;
}

/**
 * Loads the entries of an index manifest into a set.
 *
 * @param indexManifest name of the index manifest, or {@code null} when the snapshot has none
 * @return an empty set for {@code null}, otherwise a new {@link HashSet} of the entries read
 */
private Set<IndexManifestEntry> readIndexEntries(@Nullable String indexManifest) {
    return indexManifest == null
            ? Collections.<IndexManifestEntry>emptySet()
            : new HashSet<>(indexManifestFile.read(indexManifest));
}

/**
 * Returns the larger of two optional row ids.
 *
 * @param left first value, may be {@code null}
 * @param right second value, may be {@code null}
 * @return the maximum when both are present, the present one when only one is, or {@code
 *     null} when both are absent
 */
@Nullable
private static Long maxNextRowId(@Nullable Long left, @Nullable Long right) {
    if (left != null && right != null) {
        return Math.max(left, right);
    }
    // At most one side is present here; both operands are Long, so no unboxing takes place.
    return left != null ? left : right;
}

public void compactManifest() {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.table.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -203,6 +204,20 @@ public void compactManifests() {
commit.compactManifest();
}

/**
 * Restores {@code targetSnapshot} as the latest snapshot through the underlying commit.
 *
 * <p>Counts as the one-time commit of this instance. Maintenance is triggered only when the
 * restore was committed.
 *
 * @param targetSnapshot the snapshot to restore
 * @return {@code true} if the underlying commit reported success, {@code false} otherwise
 */
public boolean restoreAsLatest(Snapshot targetSnapshot) {
    checkCommitted();
    if (!commit.restoreAsLatest(targetSnapshot)) {
        return false;
    }
    // Automatic expiration is skipped on the restore path. Restore-as-latest promises to keep
    // snapshots and tags newer than the restored snapshot, yet the freshly committed latest
    // snapshot would otherwise allow expiration (e.g. with a low snapshot.num-retained.max) to
    // remove the restored snapshot and the later snapshots/tags right away.
    maintain(COMMIT_IDENTIFIER, maintainExecutor, false);
    return true;
}

private void checkCommitted() {
checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
batchCommitted = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
Expand Down Expand Up @@ -594,6 +595,39 @@ private BinaryRow findGroupedPartition(FileStoreTable table, String region, Stri
"Partition " + region + "|" + dt + " not found"));
}

/**
 * A restore on the snapshot branch that would drop the only anchor partition of an existing
 * delta partition must be rejected, leaving the latest snapshot untouched.
 */
@Test
public void testRestoreRejectedWhenDroppingAnchorSnapshotPartition() throws Exception {
    Path path = tablePath("restore_reject_anchor");
    createChainTable(path, true);

    FileStoreTable snapshotWriter = loadTable(path).switchToBranch("snapshot");
    FileStoreTable deltaWriter = loadTable(path).switchToBranch("delta");

    // Snapshot branch: snapshot #1 holds an unrelated US group, snapshot #2 adds the
    // CN/20250301 anchor.
    writeGrouped(snapshotWriter, "US", "20250101", "v1");
    writeGrouped(snapshotWriter, "CN", "20250301", "v2");

    // Delta branch: a CN delta whose only anchor is CN/20250301.
    writeGrouped(deltaWriter, "CN", "20250315", "v3");

    // Going back to snapshot #1 on the snapshot branch would drop CN/20250301. The pre-commit
    // callback has to veto this, exactly as it would for a regular overwrite, rather than
    // let the chain break silently.
    FileStoreTable restoreTable = loadTable(path).switchToBranch("snapshot");
    Snapshot restoreTarget = restoreTable.snapshotManager().snapshot(1);
    try (TableCommitImpl restoreCommit = restoreTable.newCommit(commitUser)) {
        assertThatThrownBy(() -> restoreCommit.restoreAsLatest(restoreTarget))
                .hasMessageContaining("Snapshot partition cannot be dropped");
    }

    // The rejected restore must not have produced a new snapshot.
    FileStoreTable reloaded = loadTable(path).switchToBranch("snapshot");
    assertThat(reloaded.snapshotManager().latestSnapshotId()).isEqualTo(2L);
}

/**
 * Builds the path of a test table under the temporary directory.
 *
 * @param tableName name of the table, used as the child of the temporary directory
 * @return the path formed from the temporary directory URI and {@code tableName}
 */
private Path tablePath(String tableName) {
    String parent = tempDir.toUri().toString();
    return new Path(parent, tableName);
}
Expand Down
Loading
Loading