Skip to content

Commit a0e9758

Browse files
Support procedure set_current_snapshot for Iceberg
1 parent 8f8f690 commit a0e9758

File tree

4 files changed

+313
-0
lines changed

4 files changed

+313
-0
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,36 @@ Example::
826826

827827
CALL iceberg.system.rollback_to_timestamp('schema_name', 'table_name', TIMESTAMP '1995-04-26 00:00:00.000');
828828

829+
Set Current Snapshot
830+
^^^^^^^^^^^^^^^^^^^^
831+
832+
This procedure sets the current snapshot of a table to the snapshot identified by ``snapshot_id`` or by ``ref``.
833+
Specify exactly one of ``snapshot_id`` or ``ref``; the call fails if both or neither are provided.
834+
835+
The following arguments are available:
836+
837+
===================== ========== =============== =======================================================================
Argument Name         required   type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update

``table_name``        ✔️         string          Name of the table to update

``snapshot_id``                  long            Snapshot ID to set as current

``ref``                          string          Snapshot Reference (branch or tag) to set as current
===================== ========== =============== =======================================================================
848+
849+
Examples:
850+
851+
* Set current table snapshot ID for the given table to 10000 ::
852+
853+
CALL iceberg.system.set_current_snapshot('schema_name', 'table_name', 10000);
854+
855+
* Set the current snapshot of the given table to the snapshot referenced by ``branch1`` ::
856+
857+
CALL iceberg.system.set_current_snapshot(schema => 'schema_name', table_name => 'table_name', ref => 'branch1');
858+
829859
Expire Snapshots
830860
^^^^^^^^^^^^^^^^
831861

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
4545
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
4646
import com.facebook.presto.iceberg.procedure.RollbackToTimestampProcedure;
47+
import com.facebook.presto.iceberg.procedure.SetCurrentSnapshotProcedure;
4748
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
4849
import com.facebook.presto.orc.CachingStripeMetadataSource;
4950
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
@@ -158,6 +159,7 @@ public void setup(Binder binder)
158159
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
159160
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
160161
procedures.addBinding().toProvider(RemoveOrphanFiles.class).in(Scopes.SINGLETON);
162+
procedures.addBinding().toProvider(SetCurrentSnapshotProcedure.class).in(Scopes.SINGLETON);
161163

162164
// for orc
163165
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg.procedure;
15+
16+
import com.facebook.presto.iceberg.IcebergMetadataFactory;
17+
import com.facebook.presto.spi.ConnectorSession;
18+
import com.facebook.presto.spi.SchemaTableName;
19+
import com.facebook.presto.spi.connector.ConnectorMetadata;
20+
import com.facebook.presto.spi.procedure.Procedure;
21+
import com.google.common.collect.ImmutableList;
22+
import org.apache.iceberg.SnapshotRef;
23+
import org.apache.iceberg.Table;
24+
25+
import javax.inject.Inject;
26+
import javax.inject.Provider;
27+
28+
import java.lang.invoke.MethodHandle;
29+
30+
import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
31+
import static com.facebook.presto.common.type.StandardTypes.BIGINT;
32+
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
33+
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
34+
import static com.google.common.base.Preconditions.checkState;
35+
import static java.util.Objects.requireNonNull;
36+
37+
public class SetCurrentSnapshotProcedure
38+
implements Provider<Procedure>
39+
{
40+
private static final MethodHandle SET_CURRENT_SNAPSHOT = methodHandle(
41+
SetCurrentSnapshotProcedure.class,
42+
"setCurrentSnapshot",
43+
ConnectorSession.class,
44+
String.class,
45+
String.class,
46+
Long.class,
47+
String.class);
48+
49+
private final IcebergMetadataFactory metadataFactory;
50+
51+
@Inject
52+
public SetCurrentSnapshotProcedure(IcebergMetadataFactory metadataFactory)
53+
{
54+
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
55+
}
56+
57+
@Override
58+
public Procedure get()
59+
{
60+
return new Procedure(
61+
"system",
62+
"set_current_snapshot",
63+
ImmutableList.of(
64+
new Procedure.Argument("schema", VARCHAR),
65+
new Procedure.Argument("table_name", VARCHAR),
66+
new Procedure.Argument("snapshot_id", BIGINT, false, null),
67+
new Procedure.Argument("ref", VARCHAR, false, null)),
68+
SET_CURRENT_SNAPSHOT.bindTo(this));
69+
}
70+
71+
public void setCurrentSnapshot(ConnectorSession clientSession, String schema, String table, Long snapshotId, String reference)
72+
{
73+
checkState((snapshotId != null && reference == null) || (snapshotId == null && reference != null),
74+
"Either snapshot_id or reference must be provided, not both");
75+
SchemaTableName schemaTableName = new SchemaTableName(schema, table);
76+
ConnectorMetadata metadata = metadataFactory.create();
77+
Table icebergTable = getIcebergTable(metadata, clientSession, schemaTableName);
78+
long targetSnapshotId = snapshotId != null ? snapshotId : getSnapshotIdFromReference(icebergTable, reference);
79+
icebergTable.manageSnapshots().setCurrentSnapshot(targetSnapshotId).commit();
80+
}
81+
82+
private long getSnapshotIdFromReference(Table table, String refName)
83+
{
84+
SnapshotRef ref = table.refs().get(refName);
85+
checkState(ref != null, "Cannot find matching snapshot ID for ref " + refName);
86+
return ref.snapshotId();
87+
}
88+
}
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg.procedure;
15+
16+
import com.facebook.presto.iceberg.IcebergConfig;
17+
import com.facebook.presto.iceberg.IcebergQueryRunner;
18+
import com.facebook.presto.testing.QueryRunner;
19+
import com.facebook.presto.tests.AbstractTestQueryFramework;
20+
import com.google.common.collect.ImmutableMap;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.iceberg.CatalogUtil;
23+
import org.apache.iceberg.Table;
24+
import org.apache.iceberg.catalog.Catalog;
25+
import org.apache.iceberg.catalog.TableIdentifier;
26+
import org.apache.iceberg.hadoop.HadoopCatalog;
27+
import org.testng.annotations.Test;
28+
29+
import java.io.File;
30+
import java.nio.file.Path;
31+
import java.util.Map;
32+
33+
import static com.facebook.presto.iceberg.CatalogType.HADOOP;
34+
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
35+
import static java.lang.String.format;
36+
import static java.util.regex.Pattern.quote;
37+
38+
public class TestSetCurrentSnapshotProcedure
39+
extends AbstractTestQueryFramework
40+
{
41+
public static final String ICEBERG_CATALOG = "iceberg";
42+
public static final String TEST_SCHEMA = "tpch";
43+
44+
@Override
45+
protected QueryRunner createQueryRunner()
46+
throws Exception
47+
{
48+
return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), HADOOP, ImmutableMap.of());
49+
}
50+
51+
public void createTable(String tableName)
52+
{
53+
assertUpdate("CREATE TABLE IF NOT EXISTS " + tableName + " (id integer, value VARCHAR)");
54+
}
55+
56+
public void dropTable(String tableName)
57+
{
58+
assertQuerySucceeds("DROP TABLE IF EXISTS " + TEST_SCHEMA + "." + tableName);
59+
}
60+
61+
@Test
62+
public void testSetCurrentSnapshotUsingPositionalArgs()
63+
{
64+
String tableName = "test_current_snapshot_table";
65+
createTable(tableName);
66+
try {
67+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
68+
Table table = loadTable(tableName);
69+
70+
long snapShotIdv1 = table.currentSnapshot().snapshotId();
71+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a')");
72+
73+
assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);
74+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b')");
75+
76+
assertUpdate(format("CALL system.set_current_snapshot('%s', '%s', %d)", TEST_SCHEMA, tableName, snapShotIdv1));
77+
// now current table will have only 1 row same as snapShotIdv1
78+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a')");
79+
}
80+
finally {
81+
dropTable(tableName);
82+
}
83+
}
84+
85+
@Test
86+
public void testSetCurrentSnapshotUsingNamedArgs()
87+
{
88+
String tableName = "test_named_arg_table";
89+
createTable(tableName);
90+
try {
91+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
92+
Table table = loadTable(tableName);
93+
94+
long snapShotIdv1 = table.currentSnapshot().snapshotId();
95+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a')");
96+
97+
assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);
98+
table.refresh();
99+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b')");
100+
101+
assertUpdate(format("CALL system.set_current_snapshot(snapshot_id => %d, table_name => '%s', schema => '%s')",
102+
snapShotIdv1, tableName, TEST_SCHEMA));
103+
// now current table will have only 1 row same as snapShotIdv1
104+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a')");
105+
}
106+
finally {
107+
dropTable(tableName);
108+
}
109+
}
110+
111+
@Test
112+
public void testSetCurrentSnapshotToInvalidSnapshot()
113+
{
114+
String tableName = "invalid_test_table";
115+
createTable(tableName);
116+
try {
117+
assertQueryFails(format("CALL system.set_current_snapshot(snapshot_id => %d, table_name => '%s', schema => '%s')",
118+
-1L, tableName, TEST_SCHEMA),
119+
"Cannot roll back to unknown snapshot id: -1");
120+
assertQueryFails(format("CALL system.set_current_snapshot('%s', '%s', %d)", TEST_SCHEMA, tableName, -1L),
121+
"Cannot roll back to unknown snapshot id: -1");
122+
}
123+
finally {
124+
dropTable(tableName);
125+
}
126+
}
127+
128+
@Test
129+
public void testInvalidRollbackToSnapshotCases()
130+
{
131+
assertQueryFails(format("CALL system.set_current_snapshot(schema => '%s', table_name => '%s', %d)",
132+
TEST_SCHEMA, "tableName", 1),
133+
"line 1:1: Named and positional arguments cannot be mixed");
134+
assertQueryFails("CALL custom.set_current_snapshot('test_schema', 'test_table', 1)",
135+
"Procedure not registered: custom.set_current_snapshot");
136+
assertQueryFails("CALL system.set_current_snapshot('', 'test_table', 1)",
137+
"schemaName is empty");
138+
assertQueryFails("CALL system.set_current_snapshot('test_schema', '', 1)",
139+
"tableName is empty");
140+
assertQueryFails("CALL system.set_current_snapshot('test_schema', 'test_table')",
141+
"Either snapshot_id or reference must be provided, not both");
142+
assertQueryFails("CALL system.set_current_snapshot('test_schema', 'test_table', 1, 'branch1')",
143+
"Either snapshot_id or reference must be provided, not both");
144+
assertQueryFails("CALL system.set_current_snapshot('schema_name', 'table_name', 'branch1')",
145+
quote("line 1:63: Cannot cast type varchar(7) to bigint"));
146+
}
147+
148+
@Test
149+
public void testSetCurrentSnapshotToRef()
150+
{
151+
String tableName = "test_set_snapshot_ref_table";
152+
createTable(tableName);
153+
try {
154+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
155+
Table table = loadTable(tableName);
156+
157+
long snapShotIdv1 = table.currentSnapshot().snapshotId();
158+
table.manageSnapshots().createBranch("ref1", snapShotIdv1).commit();
159+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a')");
160+
161+
assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);
162+
table.refresh();
163+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b')");
164+
165+
assertUpdate(format("CALL system.set_current_snapshot(ref => '%s', table_name => '%s', schema => '%s')",
166+
"ref1", tableName, TEST_SCHEMA));
167+
// now current table will have only 1 row same as ref1
168+
assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a')");
169+
}
170+
finally {
171+
dropTable(tableName);
172+
}
173+
}
174+
175+
private Table loadTable(String tableName)
176+
{
177+
Catalog catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), ICEBERG_CATALOG, getProperties(), new Configuration());
178+
return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName));
179+
}
180+
181+
private Map<String, String> getProperties()
182+
{
183+
File metastoreDir = getCatalogDirectory();
184+
return ImmutableMap.of("warehouse", metastoreDir.toString());
185+
}
186+
187+
private File getCatalogDirectory()
188+
{
189+
Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
190+
Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false);
191+
return catalogDirectory.toFile();
192+
}
193+
}

0 commit comments

Comments
 (0)