25
25
import org .apache .iceberg .AppendFiles ;
26
26
import org .apache .iceberg .BaseTable ;
27
27
import org .apache .iceberg .BaseTransaction ;
28
+ import org .apache .iceberg .DataTableScan ;
28
29
import org .apache .iceberg .DeleteFiles ;
29
30
import org .apache .iceberg .ExpireSnapshots ;
30
31
import org .apache .iceberg .HasTableOperations ;
37
38
import org .apache .iceberg .RewriteFiles ;
38
39
import org .apache .iceberg .RewriteManifests ;
39
40
import org .apache .iceberg .RowDelta ;
41
+ import org .apache .iceberg .SnapshotRef ;
40
42
import org .apache .iceberg .Table ;
41
43
import org .apache .iceberg .TableMetadata ;
42
44
import org .apache .iceberg .TableMetadataDiffAccess ;
43
45
import org .apache .iceberg .TableOperations ;
46
+ import org .apache .iceberg .TableScan ;
44
47
import org .apache .iceberg .Transaction ;
45
48
import org .apache .iceberg .Transactions ;
46
49
import org .apache .iceberg .UpdateLocation ;
59
62
import org .apache .iceberg .relocated .com .google .common .collect .Maps ;
60
63
import org .apache .iceberg .rest .requests .UpdateTableRequest ;
61
64
import org .apache .iceberg .util .Tasks ;
65
+ import org .immutables .value .Value ;
62
66
63
67
public class RESTCatalogTransaction implements CatalogTransaction {
64
68
private final Map <TableIdentifier , Transaction > txByTable ;
65
- private final Map <TableIdentifier , TableMetadata > initiallyReadTableMetadata ;
69
+ private final Map <TableRef , TableMetadata > initiallyReadTableMetadataByRef ;
66
70
private final Map <TableIdentifier , Table > initiallyReadTables ;
67
71
private final IsolationLevel isolationLevel ;
68
72
private final Catalog origin ;
@@ -83,17 +87,17 @@ public RESTCatalogTransaction(
83
87
this .isolationLevel = isolationLevel ;
84
88
this .sessionCatalog = sessionCatalog ;
85
89
this .context = context ;
86
- this .txByTable = Maps .newConcurrentMap ();
87
- this .initiallyReadTableMetadata = Maps .newConcurrentMap ();
88
- this .initiallyReadTables = Maps .newConcurrentMap ();
90
+ this .txByTable = Maps .newHashMap ();
91
+ this .initiallyReadTableMetadataByRef = Maps .newHashMap ();
92
+ this .initiallyReadTables = Maps .newHashMap ();
89
93
}
90
94
91
95
@ Override
92
96
public void commitTransaction () {
93
97
Preconditions .checkState (!hasCommitted , "Transaction has already committed changes" );
94
98
95
99
try {
96
- Map <TableIdentifier , List <PendingUpdate >> pendingUpdatesByTable = Maps .newConcurrentMap ();
100
+ Map <TableIdentifier , List <PendingUpdate >> pendingUpdatesByTable = Maps .newHashMap ();
97
101
txByTable .forEach ((key , value ) -> pendingUpdatesByTable .put (key , value .pendingUpdates ()));
98
102
Map <TableIdentifier , UpdateTableRequest > updatesByTable = Maps .newHashMap ();
99
103
@@ -153,33 +157,48 @@ public void commitTransaction() {
153
157
* happens due to a transaction taking action based on an outdated premise (a fact that was true
154
158
* when a table was initially loaded but then changed due to a concurrent update to the table
155
159
* while this TX was in-progress). When this TX wants to commit, the original premise might not
156
- * hold anymore, thus we need to check whether {@link TableMetadata} changed after it was
157
- * initially read inside this TX.
160
+ * hold anymore, thus we need to check whether the {@link org.apache.iceberg.Snapshot} a branch
161
+ * was pointing to changed after it was initially read inside this TX. If no information a
162
+ * branch's snapshot is available, we check whether {@link TableMetadata} changed after it was
163
+ * initially read.
158
164
*/
159
165
private void validateSerializableIsolation () {
160
- for (TableIdentifier readTable : initiallyReadTableMetadata .keySet ()) {
166
+ for (TableRef readTable : initiallyReadTableMetadataByRef .keySet ()) {
161
167
// we need to check all read tables to determine whether they changed outside the catalog
162
- // TX after we initially read them
168
+ // TX after we initially read them on a particular branch
163
169
if (IsolationLevel .SERIALIZABLE == isolationLevel ) {
164
- TableMetadata currentTableMetadata =
165
- ((BaseTable ) origin .loadTable (readTable )).operations ().current ();
166
-
167
- if (!currentTableMetadata
168
- .metadataFileLocation ()
169
- .equals (initiallyReadTableMetadata .get (readTable ).metadataFileLocation ())) {
170
+ BaseTable table = (BaseTable ) origin .loadTable (readTable .identifier ());
171
+ SnapshotRef snapshotRef = table .operations ().current ().ref (readTable .ref ());
172
+ SnapshotRef snapshotRefInsideTx =
173
+ initiallyReadTableMetadataByRef .get (readTable ).ref (readTable .ref ());
174
+
175
+ if (null != snapshotRef
176
+ && null != snapshotRefInsideTx
177
+ && snapshotRef .snapshotId () != snapshotRefInsideTx .snapshotId ()) {
170
178
throw new ValidationException (
171
- "%s isolation violation: Found table metadata updates to table '%s' after it was read" ,
172
- isolationLevel (), readTable );
179
+ "%s isolation violation: Found table metadata updates to table '%s' after it was read on branch '%s'" ,
180
+ isolationLevel (), readTable .identifier ().toString (), readTable .ref ());
181
+ }
182
+
183
+ if (null == snapshotRef || null == snapshotRefInsideTx ) {
184
+ TableMetadata currentTableMetadata = table .operations ().current ();
185
+
186
+ if (!currentTableMetadata
187
+ .metadataFileLocation ()
188
+ .equals (initiallyReadTableMetadataByRef .get (readTable ).metadataFileLocation ())) {
189
+ throw new ValidationException (
190
+ "%s isolation violation: Found table metadata updates to table '%s' after it was read" ,
191
+ isolationLevel (), readTable .identifier ());
192
+ }
173
193
}
174
194
}
175
195
}
176
196
}
177
197
178
- @ Override
179
- public void rollback () {
198
+ private void rollback () {
180
199
Tasks .foreach (txByTable .values ()).run (Transaction ::rollback );
181
200
txByTable .clear ();
182
- initiallyReadTableMetadata .clear ();
201
+ initiallyReadTableMetadataByRef .clear ();
183
202
initiallyReadTables .clear ();
184
203
}
185
204
@@ -246,19 +265,18 @@ public Table loadTable(TableIdentifier identifier) {
246
265
// we need to remember the very first version of table metadata that
247
266
// we read
248
267
if (IsolationLevel .SERIALIZABLE == isolationLevel ()) {
249
- initiallyReadTableMetadata .computeIfAbsent (
250
- identifier ,
251
- ident -> ((BaseTable ) loadTable ).operations ().current ());
268
+ initiallyReadTableMetadataByRef .computeIfAbsent (
269
+ ImmutableTableRef .builder ()
270
+ .identifier (identifier )
271
+ .ref (SnapshotRef .MAIN_BRANCH )
272
+ .build (),
273
+ ident -> opsFromTable (loadTable ).current ());
252
274
}
253
275
254
276
return loadTable ;
255
277
}));
256
278
257
- TableOperations tableOps =
258
- table instanceof BaseTransaction .TransactionTable
259
- ? ((BaseTransaction .TransactionTable ) table ).operations ()
260
- : ((BaseTable ) table ).operations ();
261
- return new TransactionalTable (table , tableOps );
279
+ return new TransactionalTable (table , opsFromTable (table ));
262
280
}
263
281
264
282
@ Override
@@ -277,6 +295,12 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
277
295
}
278
296
}
279
297
298
+ private static TableOperations opsFromTable (Table table ) {
299
+ return table instanceof BaseTransaction .TransactionTable
300
+ ? ((BaseTransaction .TransactionTable ) table ).operations ()
301
+ : ((BaseTable ) table ).operations ();
302
+ }
303
+
280
304
private class TransactionalTable extends BaseTable {
281
305
private final Table table ;
282
306
@@ -285,6 +309,16 @@ private TransactionalTable(Table table, TableOperations ops) {
285
309
this .table = table ;
286
310
}
287
311
312
+ @ Override
313
+ public TableScan newScan () {
314
+ TableScan tableScan = super .newScan ();
315
+ if (tableScan instanceof DataTableScan ) {
316
+ return new TransactionalTableScan ((DataTableScan ) tableScan );
317
+ }
318
+
319
+ return tableScan ;
320
+ }
321
+
288
322
@ Override
289
323
public UpdateSchema updateSchema () {
290
324
return txForTable (table ).updateSchema ();
@@ -365,4 +399,39 @@ public ManageSnapshots manageSnapshots() {
365
399
return txForTable (table ).manageSnapshots ();
366
400
}
367
401
}
402
+
403
+ private class TransactionalTableScan extends DataTableScan {
404
+ protected TransactionalTableScan (DataTableScan delegate ) {
405
+ super (delegate .table (), delegate .schema (), delegate .context ());
406
+ }
407
+
408
+ @ Override
409
+ public TableScan useRef (String name ) {
410
+ DataTableScan tableScan = (DataTableScan ) super .useRef (name );
411
+
412
+ if (IsolationLevel .SERIALIZABLE == isolationLevel ()) {
413
+ // store which version of the table on the given branch we read the first time
414
+ initiallyReadTableMetadataByRef .computeIfAbsent (
415
+ ImmutableTableRef .builder ()
416
+ .identifier (identifierWithoutCatalog (table ().name ()))
417
+ .ref (name )
418
+ .build (),
419
+ ident -> opsFromTable (table ()).current ());
420
+ }
421
+
422
+ return tableScan ;
423
+ }
424
+ }
425
+
426
+ @ Value .Immutable
427
+ interface TableRef {
428
+ TableIdentifier identifier ();
429
+
430
+ String ref ();
431
+
432
+ @ Value .Lazy
433
+ default String name () {
434
+ return identifier ().toString () + "@" + ref ();
435
+ }
436
+ }
368
437
}
0 commit comments