Skip to content

Commit 1a6ff41

Browse files
authored
[STORM-8019] Fixing kafka topic level metrics computation (#8047)
* Fixing the way Kafka topic level metrics are computed
1 parent ab524ea commit 1a6ff41

File tree

8 files changed

+729
-235
lines changed

8 files changed

+729
-235
lines changed

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Set;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.stream.Collectors;
42+
4243
import org.apache.commons.lang.Validate;
4344
import org.apache.kafka.clients.admin.Admin;
4445
import org.apache.kafka.clients.consumer.Consumer;
@@ -154,7 +155,7 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
154155

155156
tupleListener.open(conf, context);
156157
this.kafkaOffsetMetricManager
157-
= new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, context);
158+
= new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, context);
158159

159160
LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
160161
}
@@ -183,7 +184,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
183184
previousAssignment = partitions;
184185

185186
LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
186-
kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
187+
kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
187188

188189
if (isAtLeastOnceProcessing()) {
189190
commitOffsetsForAckedTuples();
@@ -193,7 +194,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
193194
@Override
194195
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
195196
LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
196-
context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
197+
context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
197198

198199
initialize(partitions);
199200
tupleListener.onPartitionsReassigned(partitions);
@@ -221,7 +222,7 @@ private void initialize(Collection<TopicPartition> partitions) {
221222
final OffsetAndMetadata committedOffset = consumer.committed(newTp);
222223
final long fetchOffset = doSeek(newTp, committedOffset);
223224
LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
224-
fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
225+
fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
225226
if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
226227
offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
227228
}
@@ -234,13 +235,13 @@ private void initialize(Collection<TopicPartition> partitions) {
234235
*/
235236
private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
236237
LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]",
237-
newTp, firstPollOffsetStrategy, committedOffset);
238+
newTp, firstPollOffsetStrategy, committedOffset);
238239

239240
if (committedOffset != null) {
240241
// offset was previously committed for this consumer group and topic-partition, either by this or another topology.
241242
if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp,
242-
committedOffset,
243-
Collections.unmodifiableMap(offsetManagers))) {
243+
committedOffset,
244+
Collections.unmodifiableMap(offsetManagers))) {
244245
// Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
245246
consumer.seek(newTp, committedOffset.offset());
246247
} else {
@@ -281,7 +282,7 @@ public void nextTuple() {
281282
commitOffsetsForAckedTuples();
282283
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
283284
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
284-
createFetchedOffsetsMetadata(consumer.assignment());
285+
createFetchedOffsetsMetadata(consumer.assignment());
285286
consumer.commitAsync(offsetsToCommit, null);
286287
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
287288
}
@@ -336,7 +337,7 @@ private PollablePartitionsInfo getPollablePartitionsInfo() {
336337
pollablePartitions.add(tp);
337338
} else {
338339
LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
339-
numUncommittedOffsets, maxUncommittedOffsets);
340+
numUncommittedOffsets, maxUncommittedOffsets);
340341
}
341342
}
342343
}
@@ -345,7 +346,7 @@ private PollablePartitionsInfo getPollablePartitionsInfo() {
345346

346347
private boolean isWaitingToEmit() {
347348
return waitingToEmit.values().stream()
348-
.anyMatch(list -> !list.isEmpty());
349+
.anyMatch(list -> !list.isEmpty());
349350
}
350351

351352
private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
@@ -365,11 +366,11 @@ private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePar
365366
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
366367
final int numPolledRecords = consumerRecords.count();
367368
LOG.debug("Polled [{}] records from Kafka",
368-
numPolledRecords);
369+
numPolledRecords);
369370
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
370371
//Commit polled records immediately to ensure delivery is at-most-once.
371372
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
372-
createFetchedOffsetsMetadata(consumer.assignment());
373+
createFetchedOffsetsMetadata(consumer.assignment());
373374
consumer.commitSync(offsetsToCommit);
374375
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
375376
}
@@ -387,7 +388,7 @@ private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEa
387388
}
388389

389390
private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
390-
ConsumerRecords<K, V> consumerRecords) {
391+
ConsumerRecords<K, V> consumerRecords) {
391392
for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
392393
TopicPartition tp = entry.getKey();
393394
List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
@@ -529,7 +530,7 @@ private void commitOffsetsForAckedTuples() {
529530
* to the committed offset.
530531
*/
531532
LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
532-
position, committedOffset);
533+
position, committedOffset);
533534
consumer.seek(tp, committedOffset);
534535
}
535536
/**
@@ -539,8 +540,8 @@ private void commitOffsetsForAckedTuples() {
539540
if (waitingToEmitForTp != null) {
540541
//Discard the pending records that are already committed
541542
waitingToEmit.put(tp, waitingToEmitForTp.stream()
542-
.filter(record -> record.offset() >= committedOffset)
543-
.collect(Collectors.toCollection(LinkedList::new)));
543+
.filter(record -> record.offset() >= committedOffset)
544+
.collect(Collectors.toCollection(LinkedList::new)));
544545
}
545546

546547
final OffsetManager offsetManager = offsetManagers.get(tp);
@@ -573,11 +574,11 @@ public void ack(Object messageId) {
573574

574575
if (!emitted.contains(msgId)) {
575576
LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
576-
+ "came from a topic-partition that this consumer group instance is no longer tracking "
577-
+ "due to rebalance/partition reassignment. No action taken.", msgId);
577+
+ "came from a topic-partition that this consumer group instance is no longer tracking "
578+
+ "due to rebalance/partition reassignment. No action taken.", msgId);
578579
} else {
579580
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
580-
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
581+
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
581582
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
582583
emitted.remove(msgId);
583584
}
@@ -595,11 +596,11 @@ public void fail(Object messageId) {
595596
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
596597
if (!emitted.contains(msgId)) {
597598
LOG.debug("Received fail for tuple this spout is no longer tracking."
598-
+ " Partitions may have been reassigned. Ignoring message [{}]", msgId);
599+
+ " Partitions may have been reassigned. Ignoring message [{}]", msgId);
599600
return;
600601
}
601602
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
602-
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
603+
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
603604

604605
msgId.incrementNumFails();
605606

@@ -630,7 +631,7 @@ private void refreshAssignment() {
630631
List<TopicPartition> allPartitionsSorted = new ArrayList<>(allPartitions);
631632
Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
632633
Set<TopicPartition> assignedPartitions = kafkaSpoutConfig.getTopicPartitioner()
633-
.getPartitionsForThisTask(allPartitionsSorted, context);
634+
.getPartitionsForThisTask(allPartitionsSorted, context);
634635
boolean partitionChanged = topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener);
635636
if (partitionChanged && canRegisterMetrics()) {
636637
LOG.info("Partitions assignments has changed, updating metrics.");
@@ -683,9 +684,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
683684
@Override
684685
public String toString() {
685686
return "KafkaSpout{"
686-
+ "offsetManagers =" + offsetManagers
687-
+ ", emitted=" + emitted
688-
+ "}";
687+
+ "offsetManagers =" + offsetManagers
688+
+ ", emitted=" + emitted
689+
+ "}";
689690
}
690691

691692
@Override
@@ -718,8 +719,8 @@ private boolean isPrimitiveOrWrapper(Class<?> type) {
718719

719720
private boolean isWrapper(Class<?> type) {
720721
return type == Double.class || type == Float.class || type == Long.class
721-
|| type == Integer.class || type == Short.class || type == Character.class
722-
|| type == Byte.class || type == Boolean.class || type == String.class;
722+
|| type == Integer.class || type == Short.class || type == Character.class
723+
|| type == Byte.class || type == Boolean.class || type == String.class;
723724
}
724725

725726
private String getTopicsString() {
@@ -735,8 +736,8 @@ private static class PollablePartitionsInfo {
735736
PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
736737
this.pollablePartitions = pollablePartitions;
737738
this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream()
738-
.filter(entry -> pollablePartitions.contains(entry.getKey()))
739-
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
739+
.filter(entry -> pollablePartitions.contains(entry.getKey()))
740+
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
740741
}
741742

742743
public boolean shouldPoll() {

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
*
9+
* <p>
1010
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
11+
* <p>
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,14 +22,15 @@
2222
import java.util.Map;
2323
import java.util.Set;
2424
import java.util.function.Supplier;
25-
2625
import org.apache.kafka.clients.admin.Admin;
2726
import org.apache.kafka.common.TopicPartition;
2827
import org.apache.storm.kafka.spout.internal.OffsetManager;
2928
import org.apache.storm.task.TopologyContext;
3029
import org.slf4j.Logger;
3130
import org.slf4j.LoggerFactory;
3231

32+
33+
3334
/**
3435
* This class is used to manage both the partition and topic level offset metrics.
3536
*/
@@ -55,20 +56,22 @@ public KafkaOffsetMetricManager(Supplier<Map<TopicPartition, OffsetManager>> off
5556
}
5657

5758
public void registerMetricsForNewTopicPartitions(Set<TopicPartition> newAssignment) {
59+
5860
for (TopicPartition topicPartition : newAssignment) {
5961
if (!topicPartitionMetricsMap.containsKey(topicPartition)) {
6062
LOG.info("Registering metric for topicPartition: {}", topicPartition);
6163
// create topic level metrics for given topic if absent
6264
String topic = topicPartition.topic();
6365
KafkaOffsetTopicMetrics topicMetrics = topicMetricsMap.get(topic);
6466
if (topicMetrics == null) {
65-
topicMetrics = new KafkaOffsetTopicMetrics(topic);
67+
LOG.info("Registering metric for topic: {}", topic);
68+
topicMetrics = new KafkaOffsetTopicMetrics(topic, offsetManagerSupplier, adminSupplier, newAssignment);
6669
topicMetricsMap.put(topic, topicMetrics);
6770
topologyContext.registerMetricSet("kafkaOffset", topicMetrics);
6871
}
6972

7073
KafkaOffsetPartitionMetrics topicPartitionMetricSet
71-
= new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, adminSupplier, topicPartition, topicMetrics);
74+
= new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, adminSupplier, topicPartition);
7275
topicPartitionMetricsMap.put(topicPartition, topicPartitionMetricSet);
7376
topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet);
7477
}

0 commit comments

Comments
 (0)