Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f82d780

Browse files
Authored: Jun 19, 2025
HIVE-28976: Enhance Commit message in notification_log (#5841)
to correctly filter events during incremental replication.

Details:
* Added write ID and database name into commit & abort messages.
* Updated the filter in CommitTxnHandler to utilise them during the replication dump process.

Testing:
* Added test cases.
* Tested on a live cluster.
1 parent 1cc9fd4 commit f82d780

File tree

17 files changed

+405
-48
lines changed

17 files changed

+405
-48
lines changed
 

‎hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGen
668668
return;
669669
}
670670
CommitTxnMessage msg =
671-
MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId());
671+
MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId(), commitTxnEvent.getDatabases(), commitTxnEvent.getWriteId());
672672

673673
NotificationEvent event =
674674
new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(),
@@ -688,7 +688,7 @@ public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenera
688688
return;
689689
}
690690
AbortTxnMessage msg =
691-
MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId(), abortTxnEvent.getDbsUpdated());
691+
MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId(), abortTxnEvent.getDbsUpdated(), abortTxnEvent.getWriteId());
692692
NotificationEvent event =
693693
new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(),
694694
msgEncoder.getSerializer().serialize(msg));

‎itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationFilterTransactions.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ public void setup() throws Throwable {
272272
PrimaryEventListenerTestImpl.reset();
273273
ReplicaEventListenerTestImpl.reset();
274274

275-
// Each test always has 8 openTxns, 6 commitTxn, and 2 abortTxns.
275+
// Each test always has 9 openTxns, 7 commitTxn, and 2 abortTxns.
276276
// Note that this is the number that was done on the primary,
277277
// and some are done on non-replicated database.
278-
expected = new EventCount(8, 6, 2);
278+
expected = new EventCount(9, 7, 2);
279279
}
280280

281281
static void updateTxnMapping(Map<Long, Long> map) throws Exception {
@@ -323,8 +323,11 @@ private void prepareBootstrapData() throws Throwable {
323323
.run("insert into t999 values (99908)")
324324
.run("insert into t999 values (99909)")
325325
.run("insert into t999 values (99910)")
326-
.run("drop table t999");
327-
txnOffset = 10;
326+
.run("drop table t999")
327+
.run("create table t10 (id int) clustered by(id) into 3 buckets stored as orc " +
328+
"tblproperties (\"transactional\"=\"true\")")
329+
.run("insert into t10 values (10)");
330+
txnOffset = 11;
328331

329332
// primaryDbName is replicated, t2 and t2 are ACID tables with initial data.
330333
// t3 is an ACID table with 2 initial rows, later t3 will be locked to force aborted transaction.
@@ -400,7 +403,8 @@ private void prepareIncrementalData() throws Throwable {
400403
primary.run("use " + primaryDbName)
401404
.run("insert into t1 values (2), (3)")
402405
.run("insert into t2 partition(country='india') values ('chennai')")
403-
.run("insert into t2 partition(country='india') values ('pune')");
406+
.run("insert into t2 partition(country='india') values ('pune')")
407+
.run("truncate table t10");
404408
prepareAbortTxn(primaryDbName, 222);
405409
primary.run("use " + otherDbName)
406410
.run("insert into t1 values (200), (300)")
@@ -481,14 +485,14 @@ private void assertTxnOptimization(boolean optimizationOn, WarehouseInstance.Tup
481485

482486
// Assert the number of Txn events that occurred on the replica.
483487
// When optimization is on, filtered has the number of Txn events that are expected to have been filtered.
484-
// When optimization is off, filtered should be all all 0s.
488+
// When optimization is off, filtered should be all 0s.
485489
Assert.assertEquals(expected.getCountOpenTxn() - filtered.getCountOpenTxn(), ReplicaEventListenerTestImpl.getCountOpenTxn());
486490
Assert.assertEquals(expected.getCountCommitTxn() - filtered.getCountCommitTxn(), ReplicaEventListenerTestImpl.getCountCommitTxn());
487491
Assert.assertEquals(expected.getCountAbortTxn() - filtered.getCountAbortTxn(), ReplicaEventListenerTestImpl.getCountAbortTxn());
488492

489493
// Assert the number of Txn event files found.
490494
// When optimization is on, filtered has the number of Txn events that are expected to have been filtered.
491-
// When optimization is off, filtered should be all all 0s.
495+
// When optimization is off, filtered should be all 0s.
492496
// Note that when optimization is on, there should never be optnTxn events.
493497
Assert.assertEquals(optimizationOn ? 0 : expected.getCountOpenTxn(), openTxns.size());
494498
Assert.assertEquals(expected.getCountCommitTxn() - filtered.getCountCommitTxn(), commitTxns.size());
@@ -501,5 +505,9 @@ private void assertTxnOptimization(boolean optimizationOn, WarehouseInstance.Tup
501505
for (Map.Entry<Long, Long> mapping : replicaTxnMapping.entrySet()) {
502506
Assert.assertEquals(mapping.getKey().longValue() - txnOffset, mapping.getValue().longValue());
503507
}
508+
Map<Long, Long> postReplicationReplTxnMap = new HashMap<>();
509+
// In both the cases, the post replication REPL_TXN_MAP should be empty.
510+
TestReplicationFilterTransactions.updateTxnMapping(postReplicationReplTxnMap);
511+
Assert.assertEquals(0, postReplicationReplTxnMap.size());
504512
}
505513
}

‎ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.ArrayList;
3232
import java.util.List;
33+
import java.util.stream.Collectors;
3334

3435
class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> {
3536

@@ -50,8 +51,12 @@ public void handle(Context withinContext) throws Exception {
5051

5152
if (ReplUtils.filterTransactionOperations(withinContext.hiveConf)) {
5253
String contextDbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName());
53-
JSONAbortTxnMessage abortMsg = (JSONAbortTxnMessage)eventMessage;
54-
if ((abortMsg.getDbsUpdated() == null) || !abortMsg.getDbsUpdated().contains(contextDbName)) {
54+
List<Long> writeIds = eventMessage.getWriteIds();
55+
List<String> dbsUpdated = eventMessage.getDbsUpdated()
56+
.stream()
57+
.map(StringUtils::normalizeIdentifier)
58+
.collect(Collectors.toList());
59+
if ((writeIds == null || writeIds.isEmpty() || !dbsUpdated.contains(contextDbName))) {
5560
LOG.info("Filter out #{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON);
5661
return;
5762
}

‎ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
2828
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
2929
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
30+
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
3031
import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
3132
import org.apache.hadoop.hive.metastore.utils.StringUtils;
3233
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -43,7 +44,9 @@
4344
import java.io.File;
4445
import java.io.IOException;
4546
import java.util.ArrayList;
47+
import java.util.Collections;
4648
import java.util.List;
49+
import java.util.Optional;
4750
import java.util.stream.Collectors;
4851

4952
class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
@@ -173,20 +176,48 @@ public void handle(Context withinContext) throws Exception {
173176
}
174177

175178
List<WriteEventInfo> writeEventInfoList = null;
179+
List<WriteEventInfo> allWriteEventInfoExceptMV = null;
176180
if (replicatingAcidEvents) {
177181
writeEventInfoList = getAllWriteEventInfo(withinContext);
178182

179-
if (ReplUtils.filterTransactionOperations(withinContext.hiveConf)
180-
&& (writeEventInfoList == null || writeEventInfoList.size() == 0)) {
181-
// If optimizing transactions, no need to dump this one
182-
// if there were no write events.
183-
return;
183+
if (writeEventInfoList != null) {
184+
allWriteEventInfoExceptMV = getAllWriteEventInfoExceptMV(writeEventInfoList);
185+
}
186+
String dbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName());
187+
188+
if (ReplUtils.filterTransactionOperations(withinContext.hiveConf)) {
189+
List<Long> writeIds = eventMessage.getWriteIds();
190+
List<String> databases = Optional.ofNullable(eventMessage.getDatabases())
191+
.orElse(Collections.emptyList())
192+
.stream()
193+
.map(StringUtils::normalizeIdentifier)
194+
.collect(Collectors.toList());
195+
196+
// Truth Table
197+
// Operation | writeIds | writeEventInfoList | databases | allWriteEventInfoExceptMV | Output
198+
// Read | null | null | null | same as writeEventInfoList | Skip
199+
// Insert | not null | not null | not null | same | Dump
200+
// Truncate | not null | null | not null | same | Dump
201+
// Materialized view | not null | not null | not null | different | Skip
202+
203+
boolean shouldSkip = (writeIds == null || writeIds.isEmpty() || !databases.contains(dbName));
204+
if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
205+
shouldSkip = writeEventInfoList.size() != allWriteEventInfoExceptMV.size();
206+
}
207+
208+
if (shouldSkip) {
209+
// If optimizing transactions, no need to dump this one
210+
// if there were no write events.
211+
LOG.debug("skipping commit txn event for db: {}, writeIds: {}, writeEventInfoList: {}, databases: {}",
212+
dbName, writeIds, writeEventInfoList, databases);
213+
return;
214+
}
184215
}
185216
}
186217

187-
// Filtering out all write event i related to materialized view
218+
// Filtering out all write event info related to materialized view
188219
if (writeEventInfoList != null) {
189-
writeEventInfoList = getAllWriteEventInfoExceptMV(writeEventInfoList);
220+
writeEventInfoList = allWriteEventInfoExceptMV;
190221
}
191222
int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
192223
if (numEntry != 0) {

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,32 @@ public class AbortTxnEvent extends ListenerEvent {
3737
private final Long txnId;
3838
private final TxnType txnType;
3939
private final List<String> dbsUpdated;
40+
private final List<Long> writeId;
4041

4142
public AbortTxnEvent(Long transactionId, IHMSHandler handler) {
42-
this(transactionId, null, handler, null);
43+
this(transactionId, null, handler, null, null);
4344
}
4445

4546
public AbortTxnEvent(Long transactionId, TxnType txnType) {
46-
this(transactionId, txnType, null, null);
47+
this(transactionId, txnType, null, null, null);
4748
}
4849

4950
/**
5051
* @param transactionId Unique identification for the transaction that got rolledback.
5152
* @param txnType type of transaction
5253
* @param handler handler that is firing the event
5354
* @param dbsUpdated list of databases that had update events
55+
* @param writeId write id for transaction
5456
*/
55-
public AbortTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler, List<String> dbsUpdated) {
57+
public AbortTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler, List<String> dbsUpdated, List<Long> writeId) {
5658
super(true, handler);
5759
this.txnId = transactionId;
5860
this.txnType = txnType;
5961
this.dbsUpdated = new ArrayList<String>();
6062
if (dbsUpdated != null) {
61-
this.dbsUpdated.addAll(dbsUpdated);;
63+
this.dbsUpdated.addAll(dbsUpdated);
6264
}
65+
this.writeId = writeId == null ? new ArrayList<>() : writeId;
6366
}
6467

6568
/**
@@ -83,4 +86,11 @@ public TxnType getTxnType() {
8386
public List<String> getDbsUpdated() {
8487
return dbsUpdated;
8588
}
89+
90+
/**
91+
* @return List of write ids which are associated with abort txn
92+
*/
93+
public List<Long> getWriteId() {
94+
return writeId;
95+
}
8696
}

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.hadoop.hive.metastore.IHMSHandler;
2424
import org.apache.hadoop.hive.metastore.api.TxnType;
2525

26+
import java.util.List;
27+
2628
/**
2729
* CommitTxnEvent
2830
* Event generated for commit transaction operation
@@ -33,24 +35,30 @@ public class CommitTxnEvent extends ListenerEvent {
3335

3436
private final Long txnId;
3537
private final TxnType txnType;
38+
private final List<Long> writeId;
39+
private final List<String> databases;
3640

3741
public CommitTxnEvent(Long transactionId, IHMSHandler handler) {
38-
this(transactionId, null, handler);
42+
this(transactionId, null, handler, null, null);
3943
}
4044

4145
public CommitTxnEvent(Long transactionId, TxnType txnType) {
42-
this(transactionId, txnType, null);
46+
this(transactionId, txnType, null, null, null);
4347
}
4448

4549
/**
4650
* @param transactionId Unique identification for the transaction just got committed.
4751
* @param txnType type of transaction
4852
* @param handler handler that is firing the event
53+
* @param databases list of databases for which commit txn event is fired
54+
* @param writeId write id for transaction
4955
*/
50-
public CommitTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) {
56+
public CommitTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler, List<String> databases, List<Long> writeId) {
5157
super(true, handler);
5258
this.txnId = transactionId;
5359
this.txnType = txnType;
60+
this.writeId = writeId;
61+
this.databases = databases;
5462
}
5563

5664
/**
@@ -66,4 +74,18 @@ public Long getTxnId() {
6674
public TxnType getTxnType() {
6775
return txnType;
6876
}
77+
78+
/**
79+
* @return List of write ids for which commit txn event is fired
80+
*/
81+
public List<Long> getWriteId() {
82+
return writeId;
83+
}
84+
85+
/**
86+
* @return List of databases for which commit txn event is fired
87+
*/
88+
public List<String> getDatabases() {
89+
return databases;
90+
}
6991
}

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AbortTxnMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ protected AbortTxnMessage() {
3636
public abstract Long getTxnId();
3737

3838
public abstract List<String> getDbsUpdated();
39+
40+
public abstract List<Long> getWriteIds();
3941
}

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,12 @@ public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) {
303303
return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now());
304304
}
305305

306-
public CommitTxnMessage buildCommitTxnMessage(Long txnId) {
307-
return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
306+
public CommitTxnMessage buildCommitTxnMessage(Long txnId, List<String> databases, List<Long> writeIds) {
307+
return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now(), databases, writeIds);
308308
}
309309

310-
public AbortTxnMessage buildAbortTxnMessage(Long txnId, List<String> dbsUpdated) {
311-
return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now(), dbsUpdated);
310+
public AbortTxnMessage buildAbortTxnMessage(Long txnId, List<String> dbsUpdated, List<Long> writeIds) {
311+
return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now(), dbsUpdated, writeIds);
312312
}
313313

314314
public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAbortTxnMessage.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,22 @@ public class JSONAbortTxnMessage extends AbortTxnMessage {
4343

4444
@JsonProperty
4545
private List<String> dbsUpdated;
46+
@JsonProperty
47+
private List<Long> writeIds;
4648

4749
/**
4850
* Default constructor, needed for Jackson.
4951
*/
5052
public JSONAbortTxnMessage() {
5153
}
5254

53-
public JSONAbortTxnMessage(String server, String servicePrincipal, Long txnid, Long timestamp, List<String> dbsUpdated) {
55+
public JSONAbortTxnMessage(String server, String servicePrincipal, Long txnid, Long timestamp, List<String> dbsUpdated, List<Long> writeIds) {
5456
this.timestamp = timestamp;
5557
this.txnid = txnid;
5658
this.server = server;
5759
this.servicePrincipal = servicePrincipal;
5860
this.dbsUpdated = dbsUpdated;
61+
this.writeIds = writeIds;
5962
}
6063

6164
@Override
@@ -88,6 +91,11 @@ public List<String> getDbsUpdated() {
8891
return dbsUpdated;
8992
}
9093

94+
@Override
95+
public List<Long> getWriteIds() {
96+
return writeIds;
97+
}
98+
9199
@Override
92100
public String toString() {
93101
try {

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public JSONCommitTxnMessage(String server, String servicePrincipal, Long txnid,
7272
this.files = null;
7373
}
7474

75+
public JSONCommitTxnMessage(String server, String servicePrincipal, Long txnid, Long timestamp, List<String> databases, List<Long> writeIds) {
76+
this(server, servicePrincipal, txnid, timestamp);
77+
this.databases = databases;
78+
this.writeIds = writeIds;
79+
}
80+
7581
@Override
7682
public Long getTxnId() {
7783
return txnid;

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
8282
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
8383
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
84+
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
8485
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
8586
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
8687
import org.apache.hadoop.hive.metastore.metrics.Metrics;
@@ -90,6 +91,7 @@
9091
import org.apache.hadoop.hive.metastore.txn.entities.LockInfo;
9192
import org.apache.hadoop.hive.metastore.txn.entities.MetricsInfo;
9293
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
94+
import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails;
9395
import org.apache.hadoop.hive.metastore.txn.jdbc.commands.*;
9496
import org.apache.hadoop.hive.metastore.txn.jdbc.functions.*;
9597
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.*;
@@ -129,6 +131,7 @@
129131
import java.util.concurrent.TimeUnit;
130132
import java.util.concurrent.atomic.AtomicInteger;
131133
import java.util.function.BiPredicate;
134+
import java.util.stream.Collectors;
132135

133136
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
134137
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
@@ -513,23 +516,53 @@ public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaExcep
513516

514517
@Override
515518
public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
519+
List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
520+
if (transactionalListeners != null) {
521+
//Find the write details for this transaction.
522+
//Doing it here before the metadata tables are updated below.
523+
txnWriteDetails = getWriteIdsForTxnID(rqst.getTxnid());
524+
}
516525
TxnType txnType = new AbortTxnFunction(rqst).execute(jdbcResource);
517526
if (txnType != null) {
518527
if (transactionalListeners != null && (!rqst.isSetReplPolicy() || !TxnType.DEFAULT.equals(rqst.getTxn_type()))) {
519-
List<String> dbsUpdated = getTxnDbsUpdated(rqst.getTxnid());
520-
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.ABORT_TXN,
521-
new AbortTxnEvent(rqst.getTxnid(), txnType, null, dbsUpdated), jdbcResource.getConnection(), sqlGenerator);
528+
notifyCommitOrAbortEvent(rqst.getTxnid(),EventMessage.EventType.ABORT_TXN, txnType, jdbcResource.getConnection(), txnWriteDetails, transactionalListeners);
522529
}
523530
}
524531
}
525532

533+
public static void notifyCommitOrAbortEvent(long txnId, EventMessage.EventType eventType, TxnType txnType, Connection dbConn,
534+
List<TxnWriteDetails> txnWriteDetails, List<TransactionalMetaStoreEventListener> transactionalListeners) throws MetaException {
535+
List<Long> writeIds = txnWriteDetails.stream()
536+
.map(TxnWriteDetails::getWriteId)
537+
.collect(Collectors.toList());
538+
List<String> databases = txnWriteDetails.stream()
539+
.map(TxnWriteDetails::getDbName)
540+
.collect(Collectors.toList());
541+
ListenerEvent txnEvent;
542+
if (eventType.equals(EventMessage.EventType.ABORT_TXN)) {
543+
txnEvent = new AbortTxnEvent(txnId, txnType, null, databases, writeIds);
544+
} else {
545+
txnEvent = new CommitTxnEvent(txnId, txnType, null, databases, writeIds);
546+
}
547+
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
548+
eventType, txnEvent, dbConn, sqlGenerator);
549+
}
550+
551+
526552
@Override
527553
public void abortTxns(AbortTxnsRequest rqst) throws MetaException {
528554
List<Long> txnIds = rqst.getTxn_ids();
529555
TxnErrorMsg txnErrorMsg = TxnErrorMsg.NONE;
530556
if (rqst.isSetErrorCode()) {
531557
txnErrorMsg = TxnErrorMsg.getTxnErrorMsg(rqst.getErrorCode());
532558
}
559+
HashMap<Long, List<TxnWriteDetails>> txnWriteDetailsMap = new HashMap<>();
560+
if (transactionalListeners != null) {
561+
//Find the write details for this transaction.
562+
//Doing it here before the metadata tables are updated below.
563+
for(Long txnId : txnIds)
564+
txnWriteDetailsMap.put(txnId, getWriteIdsForTxnID(txnId));
565+
}
533566

534567
List<String> queries = new ArrayList<>();
535568
StringBuilder prefix =
@@ -562,10 +595,8 @@ public void abortTxns(AbortTxnsRequest rqst) throws MetaException {
562595

563596
if (transactionalListeners != null) {
564597
for (Long txnId : txnIds) {
565-
List<String> dbsUpdated = getTxnDbsUpdated(txnId);
566-
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
567-
EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId,
568-
nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), null, dbsUpdated), dbConn, sqlGenerator);
598+
notifyCommitOrAbortEvent(txnId,EventMessage.EventType.ABORT_TXN,
599+
nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), dbConn, txnWriteDetailsMap.get(txnId), transactionalListeners);
569600
}
570601
}
571602
} catch (SQLException e) {
@@ -1141,4 +1172,24 @@ private List<String> getTxnDbsUpdated(long txnId) throws MetaException {
11411172
}
11421173
}
11431174

1175+
/**
1176+
* Returns the databases and writeID updated by txnId.
1177+
* Queries TXN_TO_WRITE_ID using txnId.
1178+
*
1179+
* @param txnId Transaction ID for which write IDs are requested.
1180+
* @throws MetaException
1181+
*/
1182+
public List<TxnWriteDetails> getWriteIdsForTxnID(long txnId) throws MetaException {
1183+
try {
1184+
return sqlRetryHandler.executeWithRetry(
1185+
new SqlRetryCallProperties().withCallerId("GetWriteIdsForTxnIDHandler"),
1186+
() -> jdbcResource.execute(new GetWriteIdsForTxnIDHandler(txnId)));
1187+
} catch (MetaException e) {
1188+
throw e;
1189+
} catch (TException e) {
1190+
throw new MetaException(e.getMessage());
1191+
}
1192+
}
1193+
1194+
11441195
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hive.metastore.txn.entities;
19+
20+
/**
21+
* Class to hold transaction id, database name and write id.
22+
*/
23+
public class TxnWriteDetails {
24+
private final long txnId;
25+
private final String dbName;
26+
private final long writeId;
27+
28+
public TxnWriteDetails(long txnId, String dbName, long writeId) {
29+
this.txnId = txnId;
30+
this.dbName = dbName;
31+
this.writeId = writeId;
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return "TxnToWriteID{" +
37+
"txnId=" + txnId +
38+
", dbName='" + dbName + '\'' +
39+
", writeId=" + writeId +
40+
'}';
41+
}
42+
43+
public long getTxnId() {
44+
return txnId;
45+
}
46+
47+
public String getDbName() {
48+
return dbName;
49+
}
50+
51+
public long getWriteId() {
52+
return writeId;
53+
}
54+
}

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
4242
import org.apache.hadoop.hive.metastore.txn.TxnStore;
4343
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
44+
import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails;
4445
import org.apache.hadoop.hive.metastore.txn.jdbc.commands.DeleteReplTxnMapEntryCommand;
4546
import org.apache.hadoop.hive.metastore.txn.jdbc.commands.InsertCompletedTxnComponentsCommand;
4647
import org.apache.hadoop.hive.metastore.txn.jdbc.commands.RemoveTxnsFromMinHistoryLevelCommand;
@@ -49,6 +50,7 @@
4950
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetCompactionInfoHandler;
5051
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetHighWaterMarkHandler;
5152
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetOpenTxnTypeAndLockHandler;
53+
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler;
5254
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.TargetTxnIdListHandler;
5355
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
5456
import org.apache.hadoop.hive.metastore.txn.jdbc.RollbackException;
@@ -72,6 +74,7 @@
7274
import java.util.stream.Collectors;
7375
import java.util.stream.IntStream;
7476

77+
import static org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
7578
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
7679
import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
7780

@@ -95,6 +98,14 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
9598

9699
boolean isReplayedReplTxn = TxnType.REPL_CREATED.equals(rqst.getTxn_type());
97100
boolean isHiveReplTxn = rqst.isSetReplPolicy() && TxnType.DEFAULT.equals(rqst.getTxn_type());
101+
//Find the write details for this transaction.
102+
//Doing it here before the metadata tables are updated below.
103+
List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
104+
105+
if (!isHiveReplTxn) {
106+
txnWriteDetails = jdbcResource.execute(new GetWriteIdsForTxnIDHandler(rqst.getTxnid()));
107+
108+
}
98109
// Get the current TXN
99110
TransactionContext context = jdbcResource.getTransactionManager().getActiveTransaction();
100111
Long commitId = null;
@@ -262,7 +273,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
262273
}
263274

264275
if (!isHiveReplTxn) {
265-
createCommitNotificationEvent(jdbcResource, txnid , txnType);
276+
createCommitNotificationEvent(jdbcResource, txnid , txnType, txnWriteDetails);
266277
}
267278

268279
LOG.debug("Going to commit");
@@ -575,15 +586,16 @@ private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
575586

576587
/**
577588
* Create Notifiaction Events on txn commit
589+
*
578590
* @param txnid committed txn
579591
* @param txnType transaction type
592+
* @param txnWriteDetails write details of the transaction
580593
* @throws MetaException ex
581594
*/
582-
private void createCommitNotificationEvent(MultiDataSourceJdbcResource jdbcResource, long txnid, TxnType txnType)
595+
private void createCommitNotificationEvent(MultiDataSourceJdbcResource jdbcResource, long txnid, TxnType txnType, List<TxnWriteDetails> txnWriteDetails)
583596
throws MetaException {
584597
if (transactionalListeners != null) {
585-
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
586-
EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnType), jdbcResource.getConnection(), jdbcResource.getSqlGenerator());
598+
notifyCommitOrAbortEvent(txnid, EventMessage.EventType.COMMIT_TXN, txnType, jdbcResource.getConnection(), txnWriteDetails, transactionalListeners);
587599

588600
CompactionInfo compactionInfo = jdbcResource.execute(new GetCompactionInfoHandler(txnid, true));
589601
if (compactionInfo != null) {

‎standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
import org.apache.hadoop.hive.metastore.txn.TxnErrorMsg;
3030
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
3131
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
32+
import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails;
3233
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
3334
import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext;
3435
import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
3536
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetTxnDbsUpdatedHandler;
37+
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler;
3638
import org.slf4j.Logger;
3739
import org.slf4j.LoggerFactory;
3840
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
@@ -45,6 +47,7 @@
4547
import java.util.Set;
4648
import java.util.TreeSet;
4749

50+
import static org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
4851
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
4952

5053
public class PerformTimeoutsFunction implements TransactionalFunction<Void> {
@@ -129,11 +132,9 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) {
129132
LOG.info("Aborted the following transactions due to timeout: {}", batchToAbort);
130133
if (transactionalListeners != null) {
131134
for (Map.Entry<Long, TxnType> txnEntry : batchToAbort.entrySet()) {
132-
List<String> dbsUpdated = jdbcResource.execute(new GetTxnDbsUpdatedHandler(txnEntry.getKey()));
133-
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
134-
EventMessage.EventType.ABORT_TXN,
135-
new AbortTxnEvent(txnEntry.getKey(), txnEntry.getValue(), null, dbsUpdated),
136-
jdbcResource.getConnection(), jdbcResource.getSqlGenerator());
135+
List<TxnWriteDetails> txnWriteDetails = jdbcResource.execute(new GetWriteIdsForTxnIDHandler(txnEntry.getKey()));
136+
notifyCommitOrAbortEvent(txnEntry.getKey(), EventMessage.EventType.ABORT_TXN , txnEntry.getValue(),
137+
jdbcResource.getConnection(), txnWriteDetails, transactionalListeners);
137138
}
138139
LOG.debug("Added Notifications for the transactions that are aborted due to timeout: {}", batchToAbort);
139140
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hive.metastore.txn.jdbc.queries;
19+
20+
import org.apache.hadoop.hive.metastore.DatabaseProduct;
21+
import org.apache.hadoop.hive.metastore.api.MetaException;
22+
import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails;
23+
import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler;
24+
import org.springframework.dao.DataAccessException;
25+
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
26+
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
27+
28+
import java.sql.ResultSet;
29+
import java.sql.SQLException;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/**
34+
* Returns the databases and writeID updated by txnId.
35+
* Queries TXN_TO_WRITE_ID using txnId.
36+
*/
37+
public class GetWriteIdsForTxnIDHandler implements QueryHandler<List<TxnWriteDetails>> {
38+
39+
private final long txnId;
40+
41+
public GetWriteIdsForTxnIDHandler(long txnId) {
42+
this.txnId = txnId;
43+
}
44+
45+
@Override
46+
public String getParameterizedQueryString(DatabaseProduct databaseProduct) throws MetaException {
47+
return "SELECT DISTINCT \"T2W_DATABASE\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" = :txnId";
48+
}
49+
50+
@Override
51+
public SqlParameterSource getQueryParameters() {
52+
return new MapSqlParameterSource().addValue("txnId", txnId);
53+
}
54+
55+
@Override
56+
public List<TxnWriteDetails> extractData(ResultSet rs) throws SQLException, DataAccessException {
57+
List<TxnWriteDetails> dbsUpdated = new ArrayList<>();
58+
while (rs.next()) {
59+
TxnWriteDetails entry = new TxnWriteDetails(txnId, rs.getString(1), rs.getLong(2));
60+
dbsUpdated.add(entry);
61+
}
62+
return dbsUpdated;
63+
}
64+
}
65+

‎standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/events/TestAbortTxnEventDbsUpdated.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ public void testBackwardsCompatibility() {
4949
@Test
5050
public void testSerializeDeserialize() {
5151
List dbsUpdated = Arrays.asList("db1", "db22");
52-
AbortTxnEvent event = new AbortTxnEvent(999L, TxnType.DEFAULT, null, dbsUpdated);
52+
List writeIds = Arrays.asList(1L, 2L);
53+
AbortTxnEvent event = new AbortTxnEvent(999L, TxnType.DEFAULT, null, dbsUpdated, writeIds);
5354
AbortTxnMessage msg =
54-
MessageBuilder.getInstance().buildAbortTxnMessage(event.getTxnId(), event.getDbsUpdated());
55+
MessageBuilder.getInstance().buildAbortTxnMessage(event.getTxnId(), event.getDbsUpdated(), event.getWriteId());
5556
JSONMessageEncoder msgEncoder = new JSONMessageEncoder();
5657
String json = msgEncoder.getSerializer().serialize(msg);
5758

@@ -63,6 +64,12 @@ public void testSerializeDeserialize() {
6364
Assert.assertTrue(actual.remove("db1"));
6465
Assert.assertTrue(actual.remove("db22"));
6566
Assert.assertTrue(actual.isEmpty());
67+
68+
List actualWriteIds = abortTxnMsg.getWriteIds();
69+
Assert.assertTrue(actualWriteIds.remove(1L));
70+
Assert.assertTrue(actualWriteIds.remove(2L));
71+
Assert.assertTrue(actualWriteIds.isEmpty());
72+
6673
Assert.assertEquals(999L, abortTxnMsg.getTxnId().longValue());
6774
}
6875
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hive.metastore.events;
19+
20+
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
21+
import org.apache.hadoop.hive.metastore.api.TxnType;
22+
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
23+
import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
24+
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
25+
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
import org.junit.experimental.categories.Category;
29+
30+
import java.util.Arrays;
31+
import java.util.HashSet;
32+
import java.util.List;
33+
import java.util.Set;
34+
35+
@Category(MetastoreUnitTest.class)
36+
public class TestCommitTxnEventWithDbAndWriteId {
37+
@Test
38+
public void testBackwardsCompatibility() {
39+
final String json = "{\"txnid\":12787,\"timestamp\":1654116516,\"server\":\"\",\"servicePrincipal\":\"\"}";
40+
JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
41+
CommitTxnMessage commitTxnMsg = deserializer.getCommitTxnMessage(json);
42+
Assert.assertNull(commitTxnMsg.getDatabases());
43+
Assert.assertEquals(12787L, commitTxnMsg.getTxnId().longValue());
44+
}
45+
46+
@Test
47+
public void testSerializeDeserialize() {
48+
49+
List<String> databases = Arrays.asList("db1", "db22");
50+
List<Long> writeIds = Arrays.asList(1L, 2L);
51+
CommitTxnEvent event = new CommitTxnEvent(999L, TxnType.DEFAULT, null, databases, writeIds);
52+
CommitTxnMessage msg =
53+
MessageBuilder.getInstance().buildCommitTxnMessage(event.getTxnId(), event.getDatabases(), event.getWriteId());
54+
JSONMessageEncoder msgEncoder = new JSONMessageEncoder();
55+
String json = msgEncoder.getSerializer().serialize(msg);
56+
57+
JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
58+
CommitTxnMessage commitTxnMsg = deserializer.getCommitTxnMessage(json);
59+
Set<String> expected = new HashSet(databases);
60+
Assert.assertEquals(expected.size(), commitTxnMsg.getDatabases().size());
61+
List actual = commitTxnMsg.getDatabases();
62+
Assert.assertTrue(actual.remove("db1"));
63+
Assert.assertTrue(actual.remove("db22"));
64+
Assert.assertTrue(actual.isEmpty());
65+
66+
List actualWriteIds = commitTxnMsg.getWriteIds();
67+
Assert.assertTrue(actualWriteIds.remove(1L));
68+
Assert.assertTrue(actualWriteIds.remove(2L));
69+
Assert.assertTrue(actualWriteIds.isEmpty());
70+
71+
Assert.assertEquals(999L, commitTxnMsg.getTxnId().longValue());
72+
}
73+
74+
75+
}

0 commit comments

Comments
 (0)
Please sign in to comment.