Skip to content

Commit

Permalink
Add rollback_to_snapshot table procedure in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jan 7, 2025
1 parent cb8df9d commit f07e642
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 10 deletions.
4 changes: 2 additions & 2 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -1642,11 +1642,11 @@ FROM example.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC LIMIT 1
```

The procedure `system.rollback_to_snapshot` allows the caller to roll back the
The table procedure `rollback_to_snapshot` allows the caller to roll back the
state of the table to a previous snapshot id:

```
CALL example.system.rollback_to_snapshot('testdb', 'customer_orders', 8954597067493422955)
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_snapshot(8954597067493422955)
```

#### `NOT NULL` column constraint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle;
import io.trino.plugin.iceberg.procedure.IcebergRollbackToSnapshotHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
Expand Down Expand Up @@ -326,6 +327,7 @@
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT;
import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles;
import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable;
import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS;
Expand Down Expand Up @@ -1582,6 +1584,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
return switch (procedureId) {
case OPTIMIZE -> getTableHandleForOptimize(tableHandle, icebergTable, executeProperties, retryMode);
case DROP_EXTENDED_STATS -> getTableHandleForDropExtendedStats(session, tableHandle);
case ROLLBACK_TO_SNAPSHOT -> getTableHandleForRollbackToSnapshot(session, tableHandle, executeProperties);
case EXPIRE_SNAPSHOTS -> getTableHandleForExpireSnapshots(session, tableHandle, executeProperties);
case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties);
case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties);
Expand Down Expand Up @@ -1752,6 +1755,19 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForAddFilesFromTable
icebergTable.io().properties()));
}

/**
 * Builds the execute handle for the {@code rollback_to_snapshot} table procedure.
 *
 * @param session session used to load the Iceberg table from the catalog
 * @param tableHandle handle of the table the procedure is executed on
 * @param executeProperties procedure arguments; {@code snapshot_id} is required
 * @return a handle carrying the target snapshot id together with the table location and file IO properties
 */
private Optional<ConnectorTableExecuteHandle> getTableHandleForRollbackToSnapshot(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
    // The argument may be missing (or null) when the caller omits it; casting that straight to a
    // primitive long would surface as a bare NullPointerException. Go through the shared helper so
    // the absence is reported the same way as for the other table procedures in this class.
    // NOTE(review): relies on requireProcedureArgument rejecting a null value - its body is not visible here, confirm.
    long snapshotId = (long) requireProcedureArgument(executeProperties, "snapshot_id");
    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

    return Optional.of(new IcebergTableExecuteHandle(
            tableHandle.getSchemaTableName(),
            ROLLBACK_TO_SNAPSHOT,
            new IcebergRollbackToSnapshotHandle(snapshotId),
            icebergTable.location(),
            icebergTable.io().properties()));
}

private static Object requireProcedureArgument(Map<String, Object> properties, String name)
{
Object value = properties.get(name);
Expand All @@ -1767,6 +1783,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession
case OPTIMIZE:
return getLayoutForOptimize(session, executeHandle);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1796,6 +1813,7 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle
case OPTIMIZE:
return beginOptimize(session, executeHandle, table);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1841,6 +1859,7 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
return;
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1974,6 +1993,9 @@ public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteH
case DROP_EXTENDED_STATS:
executeDropExtendedStats(session, executeHandle);
return;
case ROLLBACK_TO_SNAPSHOT:
executeRollbackToSnapshot(session, executeHandle);
return;
case EXPIRE_SNAPSHOTS:
executeExpireSnapshots(session, executeHandle);
return;
Expand Down Expand Up @@ -2006,6 +2028,15 @@ private void executeDropExtendedStats(ConnectorSession session, IcebergTableExec
transaction = null;
}

/**
 * Executes the {@code ROLLBACK_TO_SNAPSHOT} table procedure: loads the table named by the
 * execute handle and commits a snapshot-management operation that sets the current snapshot
 * to the id stored in the procedure handle.
 *
 * @param session session used to load the Iceberg table from the catalog
 * @param executeHandle execute handle whose procedure handle must be an {@link IcebergRollbackToSnapshotHandle}
 * @throws IllegalArgumentException if the procedure handle has a different type
 */
private void executeRollbackToSnapshot(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
    // Guard against a handle that was created for a different procedure before casting
    checkArgument(executeHandle.procedureHandle() instanceof IcebergRollbackToSnapshotHandle, "Unexpected procedure handle %s", executeHandle.procedureHandle());
    IcebergRollbackToSnapshotHandle rollbackHandle = (IcebergRollbackToSnapshotHandle) executeHandle.procedureHandle();
    long targetSnapshotId = rollbackHandle.snapshotId();

    Table table = catalog.loadTable(session, executeHandle.schemaTableName());
    table.manageSnapshots()
            .setCurrentSnapshot(targetSnapshotId)
            .commit();
}

private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
IcebergExpireSnapshotsHandle expireSnapshotsHandle = (IcebergExpireSnapshotsHandle) executeHandle.procedureHandle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.trino.plugin.iceberg.procedure.RegisterTableProcedure;
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTableProcedure;
import io.trino.plugin.iceberg.procedure.UnregisterTableProcedure;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
Expand Down Expand Up @@ -131,6 +132,7 @@ public void configure(Binder binder)
Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RollbackToSnapshotTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
typeManager,
pageSorter);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = IcebergDropExtendedStatsHandle.class, name = "drop_extended_stats"),
@JsonSubTypes.Type(value = IcebergRollbackToSnapshotHandle.class, name = "rollback_to_snapshot"),
@JsonSubTypes.Type(value = IcebergExpireSnapshotsHandle.class, name = "expire_snapshots"),
@JsonSubTypes.Type(value = IcebergOptimizeHandle.class, name = "optimize"),
@JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

/**
 * Procedure handle carrying the argument of a rollback-to-snapshot table procedure call.
 *
 * @param snapshotId id of the snapshot the procedure targets
 */
public record IcebergRollbackToSnapshotHandle(long snapshotId)
implements IcebergProcedureHandle {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public enum IcebergTableProcedureId
{
OPTIMIZE,
DROP_EXTENDED_STATS,
ROLLBACK_TO_SNAPSHOT,
EXPIRE_SNAPSHOTS,
REMOVE_ORPHAN_FILES,
ADD_FILES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static java.lang.invoke.MethodHandles.lookup;
import static java.util.Objects.requireNonNull;

@Deprecated
public class RollbackToSnapshotProcedure
implements Provider<Procedure>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.longProperty;

/**
 * Provides the {@link TableProcedureMetadata} describing the {@code ROLLBACK_TO_SNAPSHOT}
 * table procedure: a coordinator-only procedure that accepts a single long-valued
 * {@code snapshot_id} property.
 */
public class RollbackToSnapshotTableProcedure
        implements Provider<TableProcedureMetadata>
{
    /**
     * Creates the procedure metadata.
     *
     * @return metadata named after {@code ROLLBACK_TO_SNAPSHOT} with the {@code snapshot_id} property
     */
    @Override
    public TableProcedureMetadata get()
    {
        // The only argument of the procedure; declared with a null default value
        PropertyMetadata<?> snapshotIdProperty = longProperty(
                "snapshot_id",
                "Snapshot ID",
                null,
                false);

        return new TableProcedureMetadata(
                ROLLBACK_TO_SNAPSHOT.name(),
                coordinatorOnly(),
                ImmutableList.<PropertyMetadata<?>>of(snapshotIdProperty));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,13 @@ public void testTableComments()

@Test
public void testRollbackSnapshot()
{
    // Run the same rollback scenario once per supported statement form:
    // table procedure with a positional argument, table procedure with a named argument,
    // and the CALL-based procedure.
    String[] rollbackStatementFormats = {
            "ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(%s)",
            "ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(snapshot_id => %s)",
            "CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)",
    };
    for (String rollbackStatementFormat : rollbackStatementFormats) {
        testRollbackSnapshot(rollbackStatementFormat);
    }
}

private void testRollbackSnapshot(String rollbackToSnapshotFormat)
{
assertUpdate("CREATE TABLE test_rollback (col0 INTEGER, col1 BIGINT)");
long afterCreateTableId = getCurrentSnapshotId("test_rollback");
Expand All @@ -1685,16 +1692,16 @@ public void testRollbackSnapshot()
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

// Check that rollback_to_snapshot can be executed also when it does not do any changes
assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId));
assertUpdate(format(rollbackToSnapshotFormat, afterFirstInsertId));
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1);
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))");

assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId));
assertUpdate(format(rollbackToSnapshotFormat, afterFirstInsertId));
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterCreateTableId));
assertUpdate(format(rollbackToSnapshotFormat, afterCreateTableId));
assertThat((long) computeActual("SELECT COUNT(*) FROM test_rollback").getOnlyValue()).isEqualTo(0);

assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1);
Expand All @@ -1703,7 +1710,7 @@ public void testRollbackSnapshot()
// extra insert which should be dropped on rollback
assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1);

assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterSecondInsertId));
assertUpdate(format("ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(%s)", afterSecondInsertId));
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (789, CAST(987 AS BIGINT))");

assertUpdate("DROP TABLE test_rollback");
Expand Down Expand Up @@ -7657,7 +7664,7 @@ public void testCorruptedTableLocation()
assertQueryFails("TRUNCATE TABLE " + tableName, "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("COMMENT ON TABLE " + tableName + " IS NULL", "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("COMMENT ON COLUMN " + tableName + ".foo IS NULL", "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("CALL iceberg.system.rollback_to_snapshot(CURRENT_SCHEMA, '" + tableName + "', 8954597067493422955)", "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("ALTER TABLE " + tableName + " EXECUTE rollback_to_snapshot(8954597067493422955)", "Metadata not found in metadata location for table " + schemaTableName);

// Avoid failing metadata queries
assertQuery("SHOW TABLES LIKE 'test_corrupted_table_location_%' ESCAPE '\\'", "VALUES '" + tableName + "'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public void testMaterializedViewOnTableRolledBack()

// Base MV on a snapshot "in the future"
assertUpdate("REFRESH MATERIALIZED VIEW mv_on_rolled_back_the_mv", 1);
assertUpdate(format("CALL system.rollback_to_snapshot(CURRENT_SCHEMA, 'mv_on_rolled_back_base_table', %s)", firstSnapshot));
assertUpdate(format("ALTER TABLE mv_on_rolled_back_base_table EXECUTE rollback_to_snapshot(%s)", firstSnapshot));

// View still can be queried
assertThat(query("TABLE mv_on_rolled_back_the_mv"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ public void testAnalyzeAndRollbackToSnapshot()
(null, null, null, null, 26, null, null)
""");

assertUpdate(format("CALL system.rollback_to_snapshot('%s', '%s', %s)", schema, tableName, createSnapshot));
assertUpdate(format("ALTER TABLE %s.%s EXECUTE rollback_to_snapshot(%s)", schema, tableName, createSnapshot));
// NDV information still present after rollback_to_snapshot
assertQuery(
"SHOW STATS FOR " + tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public void testRollbackToSnapshot()
Thread.sleep(1);
onTrino().executeQuery(format("INSERT INTO %s VALUES 2", tableName));
long snapshotId = getSecondOldestTableSnapshot(tableName);
onTrino().executeQuery(format("call system.rollback_to_snapshot('default', '%s', %d)", tableName, snapshotId));
onTrino().executeQuery(format("ALTER TABLE %s EXECUTE rollback_to_snapshot(%d)", tableName, snapshotId));
assertThat(onTrino().executeQuery(format("SELECT * FROM %s", tableName)))
.containsOnly(row(1));
onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName));
Expand Down

0 comments on commit f07e642

Please sign in to comment.