Skip to content

Commit e066ac7

Browse files
Add support to query Iceberg table by branch/tag name
1 parent 0d681d3 commit e066ac7

File tree

4 files changed

+71
-10
lines changed

4 files changed

+71
-10
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1340,7 +1340,7 @@ Time Travel
13401340
Iceberg and Presto Iceberg connector support time travel via table snapshots
13411341
identified by unique snapshot IDs. The snapshot IDs are stored in the ``$snapshots``
13421342
metadata table. You can roll back the state of a table to a previous snapshot ID.
1343-
It also supports time travel query using VERSION (SYSTEM_VERSION) and TIMESTAMP (SYSTEM_TIME) options.
1343+
It also supports time travel queries using the SYSTEM_VERSION (VERSION) and SYSTEM_TIME (TIMESTAMP) options.
13441344

13451345
Example Queries
13461346
^^^^^^^^^^^^^^^
@@ -1522,6 +1522,40 @@ In the following query, the expression CURRENT_TIMESTAMP returns the current tim
15221522
10 | united states | 1 | comment
15231523
(1 row)
15241524

1525+
Querying branches and tags
1526+
^^^^^^^^^^^^^^^^^^^^^^^^^^^
1527+
1528+
Iceberg supports branches and tags, which are named references to snapshots.
1529+
1530+
Query an Iceberg table by specifying the branch name:
1531+
1532+
.. code-block:: sql
1533+
1534+
SELECT * FROM nation FOR SYSTEM_VERSION AS OF 'testBranch';
1535+
1536+
.. code-block:: text
1537+
1538+
nationkey | name | regionkey | comment
1539+
-----------+---------------+-----------+---------
1540+
10 | united states | 1 | comment
1541+
20 | canada | 2 | comment
1542+
30 | mexico | 3 | comment
1543+
(3 rows)
1544+
1545+
Query an Iceberg table by specifying the tag name:
1546+
1547+
.. code-block:: sql
1548+
1549+
SELECT * FROM nation FOR SYSTEM_VERSION AS OF 'testTag';
1550+
1551+
.. code-block:: text
1552+
1553+
nationkey | name | regionkey | comment
1554+
-----------+---------------+-----------+---------
1555+
10 | united states | 1 | comment
1556+
20 | canada | 2 | comment
1557+
(2 rows)
1558+
15251559
Type mapping
15261560
------------
15271561

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.facebook.presto.common.type.SqlTimestampWithTimeZone;
2222
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
2323
import com.facebook.presto.common.type.TypeManager;
24+
import com.facebook.presto.common.type.VarcharType;
2425
import com.facebook.presto.hive.HivePartition;
2526
import com.facebook.presto.hive.HiveWrittenPartitions;
2627
import com.facebook.presto.hive.NodeVersion;
@@ -978,8 +979,9 @@ private static long getSnapshotIdForTableVersion(Table table, ConnectorTableVers
978979
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version expression type: " + tableVersion.getVersionExpressionType());
979980
}
980981
if (tableVersion.getVersionType() == VersionType.VERSION) {
982+
long snapshotId;
981983
if (tableVersion.getVersionExpressionType() instanceof BigintType) {
982-
long snapshotId = (long) tableVersion.getTableVersion();
984+
snapshotId = (long) tableVersion.getTableVersion();
983985
if (table.snapshot(snapshotId) == null) {
984986
throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Iceberg snapshot ID does not exists: " + snapshotId);
985987
}
@@ -990,6 +992,13 @@ private static long getSnapshotIdForTableVersion(Table table, ConnectorTableVers
990992
return getSnapshotIdTimeOperator(table, table.snapshot(snapshotId).timestampMillis(), VersionOperator.LESS_THAN);
991993
}
992994
}
995+
else if (tableVersion.getVersionExpressionType() instanceof VarcharType) {
996+
String branchOrTagName = ((Slice) tableVersion.getTableVersion()).toStringUtf8();
997+
if (!table.refs().containsKey(branchOrTagName)) {
998+
throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Could not find Iceberg table branch or tag: " + branchOrTagName);
999+
}
1000+
return table.refs().get(branchOrTagName).snapshotId();
1001+
}
9931002
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version expression type: " + tableVersion.getVersionExpressionType());
9941003
}
9951004
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version type: " + tableVersion.getVersionType());

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,20 +1539,25 @@ public void testDecimal(boolean decimalVectorReaderEnabled)
15391539
@Test
15401540
public void testRefsTable()
15411541
{
1542-
assertUpdate("CREATE TABLE test_table_references (id BIGINT)");
1543-
assertUpdate("INSERT INTO test_table_references VALUES (0), (1), (2)", 3);
1542+
assertUpdate("CREATE TABLE test_table_references (id1 BIGINT, id2 BIGINT)");
1543+
assertUpdate("INSERT INTO test_table_references VALUES (0, 00), (1, 10), (2, 20)", 3);
15441544

15451545
Table icebergTable = loadTable("test_table_references");
15461546
icebergTable.manageSnapshots().createBranch("testBranch").commit();
15471547

1548-
assertUpdate("INSERT INTO test_table_references VALUES (0), (1), (2)", 3);
1548+
assertUpdate("INSERT INTO test_table_references VALUES (3, 30), (4, 40), (5, 50)", 3);
15491549

15501550
assertEquals(icebergTable.refs().size(), 2);
15511551
icebergTable.manageSnapshots().createTag("testTag", icebergTable.currentSnapshot().snapshotId()).commit();
15521552

15531553
assertEquals(icebergTable.refs().size(), 3);
1554+
assertUpdate("INSERT INTO test_table_references VALUES (6, 60), (7, 70), (8, 80)", 3);
15541555
assertQuery("SELECT count(*) FROM \"test_table_references$refs\"", "VALUES 3");
15551556

1557+
assertQuery("SELECT count(*) FROM test_table_references FOR SYSTEM_VERSION AS OF 'testBranch'", "VALUES 3");
1558+
assertQuery("SELECT count(*) FROM test_table_references FOR SYSTEM_VERSION AS OF 'testTag'", "VALUES 6");
1559+
assertQuery("SELECT count(*) FROM test_table_references FOR SYSTEM_VERSION AS OF 'main'", "VALUES 9");
1560+
15561561
assertQuery("SELECT * from \"test_table_references$refs\" where name = 'testBranch' and type = 'BRANCH'",
15571562
format("VALUES('%s', '%s', %s, %s, %s, %s)",
15581563
"testBranch",
@@ -1570,6 +1575,17 @@ public void testRefsTable()
15701575
icebergTable.refs().get("testTag").maxRefAgeMs(),
15711576
icebergTable.refs().get("testTag").minSnapshotsToKeep(),
15721577
icebergTable.refs().get("testTag").maxSnapshotAgeMs()));
1578+
1579+
// test branch & tag access when schema is changed
1580+
assertUpdate("ALTER TABLE test_table_references DROP COLUMN id2");
1581+
assertUpdate("ALTER TABLE test_table_references ADD COLUMN id2_new BIGINT");
1582+
1583+
// since current table schema is changed from col id2 to id2_new
1584+
assertQuery("SELECT * FROM test_table_references where id1=1", "VALUES(1, NULL)");
1585+
assertQuery("SELECT * FROM test_table_references FOR SYSTEM_VERSION AS OF 'testBranch' where id1=1", "VALUES(1, NULL)");
1586+
// Currently Presto returns current table schema for any previous snapshot access https://github.com/prestodb/presto/issues/23553
1587+
// otherwise querying a tag uses the snapshot's schema https://iceberg.apache.org/docs/nightly/branching/#schema-selection-with-branches-and-tags
1588+
assertQuery("SELECT * FROM test_table_references FOR SYSTEM_VERSION AS OF 'testTag' where id1=1", "VALUES(1, NULL)");
15731589
}
15741590

15751591
@Test

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableVersion.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ public void testTableVersionMisc()
187187
{
188188
// Alias cases - SYSTEM_TIME and SYSTEM_VERSION
189189
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF " + tab1VersionId1 + " ORDER BY 1", "VALUES 'aaa'");
190+
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF 'main'", "VALUES 3");
191+
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF 'main'" + " ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
190192
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_TIME AS OF TIMESTAMP " + "'" + tab1Timestamp1 + "'" + " ORDER BY 1", "VALUES 'aaa'");
191193
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
192194
assertQuery("SELECT SUM(id) FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF " + tab1VersionId2, "VALUES 3");
@@ -272,10 +274,10 @@ public void testTableVersionMisc()
272274
@Test
273275
public void testTableVersionErrors()
274276
{
275-
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is BIGINT");
276-
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF 'bad'", ".* Type varchar\\(3\\) is invalid. Supported table version AS OF/BEFORE expression type is BIGINT");
277-
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF CURRENT_DATE", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is BIGINT");
278-
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF CURRENT_TIMESTAMP", ".* Type timestamp with time zone is invalid. Supported table version AS OF/BEFORE expression type is BIGINT");
277+
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is BIGINT or VARCHAR");
278+
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF 'bad'", "Could not find Iceberg table branch or tag: bad");
279+
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF CURRENT_DATE", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is BIGINT or VARCHAR");
280+
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF CURRENT_TIMESTAMP", ".* Type timestamp with time zone is invalid. Supported table version AS OF/BEFORE expression type is BIGINT or VARCHAR");
279281
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF id", ".* cannot be resolved");
280282
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF (SELECT 10000000)", ".* Constant expression cannot contain a subquery");
281283
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF NULL", "Table version AS OF/BEFORE expression cannot be NULL for .*");
@@ -296,7 +298,7 @@ public void testTableVersionErrors()
296298

297299
assertQueryFails("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId1 + " ORDER BY 1", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab1\"");
298300
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
299-
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION BEFORE 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is BIGINT");
301+
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION BEFORE 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is BIGINT or VARCHAR");
300302
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION BEFORE " + tab2VersionId1 + " - " + tab2VersionId1, "Iceberg snapshot ID does not exists: 0");
301303
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE 'bad'", ".* Type varchar\\(3\\) is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
302304
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE NULL", "Table version AS OF/BEFORE expression cannot be NULL for .*");

0 commit comments

Comments
 (0)