Skip to content

Commit 70e61d9

Browse files
committed
API,Core: Multi-Table transactions API and support for REST
1 parent 2a06bb5 commit 70e61d9

22 files changed

+2299
-12
lines changed

api/src/main/java/org/apache/iceberg/Transaction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg;
2020

21+
import java.util.List;
2122
import org.apache.iceberg.exceptions.CommitFailedException;
2223
import org.apache.iceberg.exceptions.ValidationException;
2324

@@ -172,4 +173,14 @@ default ManageSnapshots manageSnapshots() {
172173
* @throws CommitFailedException If the updates cannot be committed due to conflicts.
173174
*/
174175
void commitTransaction();
176+
177+
/** Rolls back any pending changes. */
178+
default void rollback() {
179+
throw new UnsupportedOperationException("Rollback not supported");
180+
}
181+
182+
/** Provides access to the pending changes that are about to be committed. */
183+
default List<PendingUpdate> pendingUpdates() {
184+
throw new UnsupportedOperationException("Pending Updates not supported");
185+
}
175186
}

core/src/main/java/org/apache/iceberg/BaseScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ protected Schema tableSchema() {
9999
return schema;
100100
}
101101

102-
protected TableScanContext context() {
102+
public TableScanContext context() {
103103
return context;
104104
}
105105

core/src/main/java/org/apache/iceberg/BaseTransaction.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.iceberg.metrics.MetricsReporter;
4444
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
4545
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
46+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
4647
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4748
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
4849
import org.apache.iceberg.util.PropertyUtil;
@@ -107,6 +108,10 @@ public String tableName() {
107108
}
108109

109110
public TableMetadata startMetadata() {
111+
return base;
112+
}
113+
114+
public TableMetadata currentMetadata() {
110115
return current;
111116
}
112117

@@ -426,10 +431,10 @@ private void commitSimpleTransaction() {
426431
throw e;
427432

428433
} catch (PendingUpdateFailedException e) {
429-
cleanUpOnCommitFailure();
434+
rollback();
430435
throw e.wrapped();
431436
} catch (RuntimeException e) {
432-
cleanUpOnCommitFailure();
437+
rollback();
433438
throw e;
434439
}
435440

@@ -468,7 +473,8 @@ private void commitSimpleTransaction() {
468473
}
469474
}
470475

471-
private void cleanUpOnCommitFailure() {
476+
@Override
477+
public void rollback() {
472478
// the commit failed and no files were committed. clean up each update.
473479
Tasks.foreach(updates)
474480
.suppressFailureWhenFinished()
@@ -486,6 +492,11 @@ private void cleanUpOnCommitFailure() {
486492
.run(ops.io()::deleteFile);
487493
}
488494

495+
@Override
496+
public List<PendingUpdate> pendingUpdates() {
497+
return ImmutableList.copyOf(updates);
498+
}
499+
489500
private void applyUpdates(TableOperations underlyingOps) {
490501
if (base != underlyingOps.refresh()) {
491502
// use the refreshed metadata

core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.iceberg;
2020

21+
import java.util.List;
22+
2123
class CommitCallbackTransaction implements Transaction {
2224
static Transaction addCallback(Transaction txn, Runnable callback) {
2325
return new CommitCallbackTransaction(txn, callback);
@@ -121,4 +123,14 @@ public void commitTransaction() {
121123
wrapped.commitTransaction();
122124
callback.run();
123125
}
126+
127+
@Override
128+
public void rollback() {
129+
wrapped.rollback();
130+
}
131+
132+
@Override
133+
public List<PendingUpdate> pendingUpdates() {
134+
return wrapped.pendingUpdates();
135+
}
124136
}

core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
import org.apache.iceberg.Schema;
2929
import org.apache.iceberg.Table;
3030
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
31+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3132
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3233
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
34+
import org.apache.iceberg.rest.RESTCatalogTransaction;
35+
import org.apache.iceberg.rest.RESTSessionCatalog;
3336

3437
public abstract class BaseSessionCatalog implements SessionCatalog {
3538
private final Cache<String, Catalog> catalogs =
@@ -62,7 +65,13 @@ public <T> T withContext(SessionContext context, Function<Catalog, T> task) {
6265
return task.apply(asCatalog(context));
6366
}
6467

65-
public class AsCatalog implements Catalog, SupportsNamespaces {
68+
public void commitTransaction(SessionContext context, List<TableCommit> commits) {
69+
throw new UnsupportedOperationException(
70+
"commitTransaction is not supported by catalog " + name());
71+
}
72+
73+
public class AsCatalog implements Catalog, SupportsNamespaces, SupportsCatalogTransactions {
74+
6675
private final SessionContext context;
6776

6877
private AsCatalog(SessionContext context) {
@@ -159,5 +168,19 @@ public boolean removeProperties(Namespace namespace, Set<String> removals) {
159168
public boolean namespaceExists(Namespace namespace) {
160169
return BaseSessionCatalog.this.namespaceExists(context, namespace);
161170
}
171+
172+
@Override
173+
public CatalogTransaction createTransaction(CatalogTransaction.IsolationLevel isolationLevel) {
174+
Preconditions.checkState(
175+
BaseSessionCatalog.this instanceof RESTSessionCatalog,
176+
"Only RESTSessionCatalog currently supports CatalogTransactions");
177+
178+
return new RESTCatalogTransaction(
179+
this, (RESTSessionCatalog) BaseSessionCatalog.this, context, isolationLevel);
180+
}
181+
182+
public void commitTransaction(List<TableCommit> commits) {
183+
BaseSessionCatalog.this.commitTransaction(context, commits);
184+
}
162185
}
163186
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.catalog;
20+
21+
public interface CatalogTransaction {
22+
23+
enum IsolationLevel {
24+
25+
/**
26+
* All reads that are being made will see the last committed values that existed when the table
27+
* was loaded first inside the catalog transaction. Subsequent changes to a table that happened
28+
* outside the catalog transaction after the table was read will not be seen to prevent <b>read
29+
* skew</b> (reading a table multiple times within the catalog transaction should always return
30+
* the same results). <br>
31+
* <br>
32+
* Will successfully commit only if the values updated by the transaction do not conflict with
33+
* other concurrent updates. <br>
34+
* <br>
35+
*
36+
* <p>Note that under SNAPSHOT isolation a <b>write skew anomaly</b> is acceptable and
37+
* permitted. In a <b>write skew anomaly</b>, two transactions (T1 and T2) concurrently read an
38+
* overlapping data set (e.g. values V1 and V2), concurrently make disjoint updates (e.g. T1
39+
* updates V1, T2 updates V2), and finally concurrently commit, neither having seen the update
40+
* performed by the other.
41+
*/
42+
SNAPSHOT,
43+
44+
/**
45+
* All reads that are being made will see the last committed values that existed when the table
46+
* was loaded first inside the catalog transaction. Subsequent changes to a table that happened
47+
* outside the catalog transaction after the table was read will not be seen to prevent <b>read
48+
* skew</b>.<br>
49+
* <br>
50+
* All tables participating in the transaction must be in the same state when committing
51+
* compared to when the table was loaded first within the catalog transaction.<br>
52+
* <br>
53+
*
54+
* <p>Note that a <b>write skew anomaly</b> is not possible under SERIALIZABLE isolation, where
55+
* two transactions (T1 and T2) concurrently read an overlapping data set (e.g. values V1 and
56+
* V2), concurrently make disjoint updates (e.g. T1 updates V1, T2 updates V2). This is because
57+
* under SERIALIZABLE isolation either T1 or T2 would have to occur first and be visible to the
58+
* other transaction.
59+
*/
60+
SERIALIZABLE;
61+
}
62+
63+
/**
64+
* Performs an atomic commit of all the pending changes across multiple tables. Engine-specific
65+
* implementations must ensure that all pending changes are applied atomically.
66+
*/
67+
void commitTransaction();
68+
69+
/**
70+
* Returns this catalog transaction as a {@link Catalog} API so that any actions that are called
71+
* through this API are participating in this catalog transaction.
72+
*
73+
* @return This catalog transaction as a {@link Catalog} API. Any actions that are called through
74+
* this API are participating in this catalog transaction.
75+
*/
76+
Catalog asCatalog();
77+
78+
/**
79+
* Returns the current {@link IsolationLevel} for this transaction.
80+
*
81+
* @return The {@link IsolationLevel} for this transaction.
82+
*/
83+
IsolationLevel isolationLevel();
84+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.catalog;
20+
21+
import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel;
22+
23+
public interface SupportsCatalogTransactions {
24+
25+
/**
26+
* Create a new {@link CatalogTransaction} with the given {@link IsolationLevel}.
27+
*
28+
* @param isolationLevel The isolation level to use.
29+
* @return A new {@link CatalogTransaction}.
30+
*/
31+
CatalogTransaction createTransaction(IsolationLevel isolationLevel);
32+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.catalog;
20+
21+
import java.util.List;
22+
import org.apache.iceberg.MetadataUpdate;
23+
import org.apache.iceberg.TableMetadata;
24+
import org.immutables.value.Value;
25+
26+
@Value.Immutable
27+
public interface TableCommit {
28+
TableIdentifier identifier();
29+
30+
TableMetadata base();
31+
32+
List<MetadataUpdate> changes();
33+
}

core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.iceberg.TableMetadata;
4141
import org.apache.iceberg.TableOperations;
4242
import org.apache.iceberg.Transaction;
43+
import org.apache.iceberg.Transactions;
4344
import org.apache.iceberg.catalog.Catalog;
4445
import org.apache.iceberg.catalog.Namespace;
4546
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -49,8 +50,10 @@
4950
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
5051
import org.apache.iceberg.exceptions.NoSuchTableException;
5152
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
53+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
5254
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
5355
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
56+
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
5457
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
5558
import org.apache.iceberg.rest.requests.CreateTableRequest;
5659
import org.apache.iceberg.rest.requests.RenameTableRequest;
@@ -361,4 +364,37 @@ private static TableMetadata commit(TableOperations ops, UpdateTableRequest requ
361364

362365
return ops.current();
363366
}
367+
368+
/**
369+
* This is a very simplistic approach that only validates the requirements for each table and does
370+
* not do any other conflict detection. Therefore, it does not guarantee true transactional
371+
* atomicity, which is left to the implementation details of a REST server.
372+
*/
373+
public static void commitTransaction(Catalog catalog, CommitTransactionRequest request) {
374+
List<Transaction> transactions = Lists.newArrayList();
375+
376+
for (CommitTransactionRequest.CommitTableRequest tableChange : request.tableChanges()) {
377+
Table table = catalog.loadTable(tableChange.identifier());
378+
if (table instanceof BaseTable) {
379+
UpdateTableRequest updateTableRequest =
380+
new UpdateTableRequest(tableChange.requirements(), tableChange.updates());
381+
382+
Transaction transaction =
383+
Transactions.newTransaction(
384+
tableChange.identifier().toString(), ((BaseTable) table).operations());
385+
transactions.add(transaction);
386+
387+
BaseTransaction.TransactionTable txTable =
388+
(BaseTransaction.TransactionTable) transaction.table();
389+
390+
// this performs validations and makes temporary commits that are in-memory
391+
commit(txTable.operations(), updateTableRequest);
392+
} else {
393+
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
394+
}
395+
}
396+
397+
// only commit if validations passed previously
398+
transactions.forEach(Transaction::commitTransaction);
399+
}
364400
}

0 commit comments

Comments
 (0)