Skip to content

Commit f42ffcd

Browse files
Add additional metrics for parked persistent subscription messages (ported from #5062) (#5068) (#5081)
* Add additional metrics for parked persistent subscription messages:
  - kurrentdb_persistent_sub_park_message_requests
  - kurrentdb_persistent_sub_parked_message_replays

# Conflicts: in using statements only

Co-authored-by: Shaan Nobee <[email protected]>
1 parent f403c92 commit f42ffcd

12 files changed

+228 -28 lines changed

src/EventStore.Core.Tests/Services/PersistentSubscription/PersistentSubscriptionMessageParkerTests.cs

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -215,8 +215,10 @@ protected override void Given() {
215215

216216
[Test]
217217
public async Task should_have_one_parked_message() {
218-
_messageParker.BeginParkMessage(CreateResolvedEvent(0, 0), "testing", (_, __) => {
218+
_messageParker.BeginParkMessage(CreateResolvedEvent(0, 0), "testing", ParkReason.ClientNak, (_, __) => {
219219
Assert.AreEqual(1, _messageParker.ParkedMessageCount);
220+
Assert.AreEqual(1, _messageParker.ParkedDueToClientNak);
221+
Assert.AreEqual(0, _messageParker.ParkedDueToMaxRetries);
220222
Assert.AreEqual(EventTimeStamps[0], _messageParker.GetOldestParkedMessage);
221223
_done.TrySetResult(true);
222224
});
@@ -239,8 +241,8 @@ protected override void Given() {
239241

240242
_parked = new TaskCompletionSource<bool>();
241243
_messageParker = new PersistentSubscriptionMessageParker(_streamId, _ioDispatcher);
242-
_messageParker.BeginParkMessage(CreateResolvedEvent(0, 0), "testing", (_, __) => {
243-
_messageParker.BeginParkMessage(CreateResolvedEvent(1, 100), "testing", (_, __) => {
244+
_messageParker.BeginParkMessage(CreateResolvedEvent(0, 0), "testing", ParkReason.MaxRetries, (_, __) => {
245+
_messageParker.BeginParkMessage(CreateResolvedEvent(1, 100), "testing", ParkReason.MaxRetries, (_, __) => {
244246
_parked.SetResult(true);
245247
});
246248
});
@@ -251,6 +253,8 @@ public async Task should_have_no_parked_messages() {
251253
await _parked.Task;
252254
_messageParker.BeginMarkParkedMessagesReprocessed(2, null, true, () => {
253255
Assert.Zero(_messageParker.ParkedMessageCount);
256+
Assert.AreEqual(0, _messageParker.ParkedDueToClientNak);
257+
Assert.AreEqual(2, _messageParker.ParkedDueToMaxRetries);
254258
Assert.Null(_messageParker.GetOldestParkedMessage);
255259
_done.TrySetResult(true);
256260
});
@@ -273,8 +277,8 @@ protected override void Given() {
273277

274278
_replayParked = new TaskCompletionSource<bool>();
275279
_messageParker = new PersistentSubscriptionMessageParker(_streamId, _ioDispatcher);
276-
_messageParker.BeginParkMessage(CreateResolvedEvent(0, 0), "testing", (_, __) => {
277-
_messageParker.BeginParkMessage(CreateResolvedEvent(1, 100), "testing", (_, __) => {
280+
_messageParker.BeginParkMessage(CreateResolvedEvent(0, 0), "testing", ParkReason.ClientNak, (_, __) => {
281+
_messageParker.BeginParkMessage(CreateResolvedEvent(1, 100), "testing", ParkReason.ClientNak, (_, __) => {
278282
_messageParker.BeginMarkParkedMessagesReprocessed(2, null, true, () => {
279283
_replayParked.SetResult(true);
280284
});
@@ -286,8 +290,10 @@ protected override void Given() {
286290
public async Task should_have_one_parked_message() {
287291
await _replayParked.Task;
288292
_timeProvider.AddToUtcTime(new TimeSpan(0, 0, 1, 0));
289-
_messageParker.BeginParkMessage(CreateResolvedEvent(2, 200), "testing", (_, __) => {
293+
_messageParker.BeginParkMessage(CreateResolvedEvent(2, 200), "testing", ParkReason.MaxRetries, (_, __) => {
290294
Assert.AreEqual(1, _messageParker.ParkedMessageCount);
295+
Assert.AreEqual(2, _messageParker.ParkedDueToClientNak);
296+
Assert.AreEqual(1, _messageParker.ParkedDueToMaxRetries);
291297
Assert.AreEqual(EventTimeStamps[2], _messageParker.GetOldestParkedMessage);
292298
_done.TrySetResult(true);
293299
});

src/EventStore.Core.Tests/Services/PersistentSubscription/PersistentSubscriptionTests.cs

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1511,7 +1511,7 @@ public void
15111511
//park an event (this can be done earlier too)
15121512
var parkedEventId = Guid.NewGuid();
15131513
var parkedEvent = Helper.BuildFakeEvent(parkedEventId, "type", "$persistentsubscription-streamName::groupName-parked", 15, 15, 15);
1514-
messageParker.BeginParkMessage(parkedEvent, "parked", (ev, res) => { });
1514+
messageParker.BeginParkMessage(parkedEvent, "parked", ParkReason.None, (ev, res) => { });
15151515

15161516
//retry parked events (this sets correct _state flag so that we can call HandleParkedReadCompleted below)
15171517
sub.RetryParkedMessages(null);
@@ -2335,7 +2335,7 @@ public void retrying_parked_messages_without_stop_at_replays_all_parkedEvents()
23352335
var parkedEvents = Enumerable.Range(0, 19)
23362336
.Select(v => Helper.BuildFakeEvent(Guid.NewGuid(), "type", "$persistentsubscription-streamName::groupName-parked", v, v, v)).ToArray();
23372337
foreach (var parkedEvent in parkedEvents) {
2338-
messageParker.BeginParkMessage(parkedEvent, "parked", (ev, res) => { });
2338+
messageParker.BeginParkMessage(parkedEvent, "parked", ParkReason.None, (ev, res) => { });
23392339
}
23402340

23412341
sub.RetryParkedMessages(null);
@@ -2381,7 +2381,7 @@ public void retrying_parked_messages_with_stop_at_replays_parkedEvents_until_tha
23812381
var parkedEvents = Enumerable.Range(0, 19)
23822382
.Select(v => Helper.BuildFakeEvent(Guid.NewGuid(), "type", "$persistentsubscription-streamName::groupName-parked", v, v, v)).ToArray();
23832383
foreach (var parkedEvent in parkedEvents) {
2384-
messageParker.BeginParkMessage(parkedEvent, "parked", (ev, res) => { });
2384+
messageParker.BeginParkMessage(parkedEvent, "parked", ParkReason.None, (ev, res) => { });
23852385
}
23862386

23872387
var stopAt = 7L;
@@ -2593,7 +2593,7 @@ public void ParkMessageCompleted(int idx, OperationResult result) {
25932593
_parkMessageCompleted?.Invoke(ParkedEvents[idx], result);
25942594
}
25952595

2596-
public void BeginParkMessage(ResolvedEvent ev, string reason,
2596+
public void BeginParkMessage(ResolvedEvent ev, string reason, ParkReason parkReason,
25972597
Action<ResolvedEvent, OperationResult> completed) {
25982598
ParkedEvents.Add(ev);
25992599
_lastParkedEventNumber = ev.OriginalEventNumber;
@@ -2629,6 +2629,9 @@ public void BeginLoadStats(Action completed) {
26292629
}
26302630

26312631
public DateTime? GetOldestParkedMessage { get; }
2632+
public long ParkedDueToClientNak { get; }
2633+
public long ParkedDueToMaxRetries { get; }
2634+
public long ParkedMessageReplays { get; }
26322635
}
26332636

26342637

src/EventStore.Core.XUnit.Tests/Metrics/MetricsEndpointTests.cs

Lines changed: 77 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,16 @@
11
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

4+
using System;
45
using System.Collections.Generic;
56
using System.Net;
67
using System.Threading.Tasks;
78
using EventStore.Common.Configuration;
9+
using EventStore.Core.Bus;
810
using EventStore.Core.Configuration.Sources;
11+
using EventStore.Core.Messages;
12+
using EventStore.Core.Messaging;
13+
using EventStore.Core.Services.UserManagement;
914
using EventStore.Core.Tests;
1015
using EventStore.Core.Tests.Helpers;
1116
using Microsoft.Extensions.Configuration;
@@ -29,6 +34,39 @@ public async Task can_produce_legacy_metrics() {
2934
Assert.Contains(expected, content);
3035
}
3136

37+
private async static Task CreatePersistentSubscription(IPublisher publisher) {
38+
var tcs = new TaskCompletionSource();
39+
publisher.Publish(new ClientMessage.CreatePersistentSubscriptionToStream(
40+
internalCorrId: Guid.NewGuid(),
41+
correlationId: Guid.NewGuid(),
42+
envelope: new CallbackEnvelope(msg => {
43+
var completed = msg as ClientMessage.CreatePersistentSubscriptionToStreamCompleted;
44+
Assert.NotNull(completed);
45+
Assert.Equal(ClientMessage.CreatePersistentSubscriptionToStreamCompleted.
46+
CreatePersistentSubscriptionToStreamResult.Success, completed.Result);
47+
tcs.SetResult();
48+
}),
49+
eventStreamId: "stream",
50+
groupName: "group",
51+
resolveLinkTos: false,
52+
startFrom: 0,
53+
messageTimeoutMilliseconds: 1000,
54+
recordStatistics: false,
55+
maxRetryCount: 10,
56+
bufferSize: 100,
57+
liveBufferSize: 10,
58+
readbatchSize: 10,
59+
checkPointAfterMilliseconds: 1000,
60+
minCheckPointCount: 10,
61+
maxCheckPointCount: 10,
62+
maxSubscriberCount: 10,
63+
namedConsumerStrategy: "RoundRobin",
64+
user: SystemAccounts.System));
65+
66+
await tcs.Task;
67+
await Task.Delay(TimeSpan.FromSeconds(1));
68+
}
69+
3270
async Task<string> Query(bool legacy) {
3371
var configuration = new ConfigurationBuilder()
3472
.AddSection($"{KurrentConfigurationKeys.Prefix}:Metrics", x => x
@@ -44,6 +82,9 @@ async Task<string> Query(bool legacy) {
4482
.Build();
4583
await using var sut = new MiniNode<LogFormat.V2, string>(Fixture.Directory, configuration: configuration);
4684
await sut.Start();
85+
86+
await CreatePersistentSubscription(sut.Node.MainQueue);
87+
4788
sut.HttpClient.DefaultRequestHeaders.Add(
4889
"Accept",
4990
"application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1");
@@ -96,6 +137,15 @@ async Task<string> Query(bool legacy) {
96137
"# TYPE kurrentdb_sys_mem_bytes gauge",
97138
"# TYPE kurrentdb_writer_flush_duration_max_seconds gauge",
98139
"# TYPE kurrentdb_writer_flush_size_max gauge",
140+
"# TYPE kurrentdb_persistent_sub_connections gauge",
141+
"# TYPE kurrentdb_persistent_sub_parked_messages gauge",
142+
"# TYPE kurrentdb_persistent_sub_in_flight_messages gauge",
143+
"# TYPE kurrentdb_persistent_sub_oldest_parked_message_seconds gauge",
144+
"# TYPE kurrentdb_persistent_sub_last_known_event_number gauge",
145+
"# TYPE kurrentdb_persistent_sub_park_message_requests gauge",
146+
"# TYPE kurrentdb_persistent_sub_parked_message_replays gauge",
147+
"# TYPE kurrentdb_persistent_sub_checkpointed_event_number gauge",
148+
"# TYPE kurrentdb_persistent_sub_items_processed counter",
99149

100150
"# UNIT kurrentdb_cache_resources_entries entries",
101151
"# UNIT kurrentdb_disk_io_bytes bytes",
@@ -157,6 +207,15 @@ async Task<string> Query(bool legacy) {
157207
"kurrentdb_sys_mem_bytes{",
158208
"kurrentdb_writer_flush_duration_max_seconds{",
159209
"kurrentdb_writer_flush_size_max{",
210+
"kurrentdb_persistent_sub_connections{",
211+
"kurrentdb_persistent_sub_parked_messages{",
212+
"kurrentdb_persistent_sub_in_flight_messages{",
213+
"kurrentdb_persistent_sub_oldest_parked_message_seconds{",
214+
"kurrentdb_persistent_sub_last_known_event_number{",
215+
"kurrentdb_persistent_sub_park_message_requests{",
216+
"kurrentdb_persistent_sub_parked_message_replays{",
217+
"kurrentdb_persistent_sub_checkpointed_event_number{",
218+
"kurrentdb_persistent_sub_items_processed_total{",
160219
];
161220

162221
static IEnumerable<string> EventStoreMetrics => [
@@ -193,6 +252,15 @@ async Task<string> Query(bool legacy) {
193252
"# TYPE eventstore_sys_mem_bytes gauge",
194253
"# TYPE eventstore_writer_flush_duration_max_seconds gauge",
195254
"# TYPE eventstore_writer_flush_size_max gauge",
255+
"# TYPE eventstore_persistent_sub_connections gauge",
256+
"# TYPE eventstore_persistent_sub_parked_messages gauge",
257+
"# TYPE eventstore_persistent_sub_in_flight_messages gauge",
258+
"# TYPE eventstore_persistent_sub_oldest_parked_message_seconds gauge",
259+
"# TYPE eventstore_persistent_sub_last_known_event_number gauge",
260+
"# TYPE eventstore_persistent_sub_park_message_requests gauge",
261+
"# TYPE eventstore_persistent_sub_parked_message_replays gauge",
262+
"# TYPE eventstore_persistent_sub_checkpointed_event_number gauge",
263+
"# TYPE eventstore_persistent_sub_items_processed counter",
196264

197265
"eventstore_cache_hits_misses{",
198266
"eventstore_cache_resources_entries{",
@@ -229,5 +297,14 @@ async Task<string> Query(bool legacy) {
229297
"eventstore_sys_mem_bytes{",
230298
"eventstore_writer_flush_duration_max_seconds{",
231299
"eventstore_writer_flush_size_max{",
300+
"eventstore_persistent_sub_connections{",
301+
"eventstore_persistent_sub_parked_messages{",
302+
"eventstore_persistent_sub_in_flight_messages{",
303+
"eventstore_persistent_sub_oldest_parked_message_seconds{",
304+
"eventstore_persistent_sub_last_known_event_number{",
305+
"eventstore_persistent_sub_park_message_requests{",
306+
"eventstore_persistent_sub_parked_message_replays{",
307+
"eventstore_persistent_sub_checkpointed_event_number{",
308+
"eventstore_persistent_sub_items_processed{",
232309
];
233310
}

src/EventStore.Core.XUnit.Tests/Metrics/PersistentSubscriptionMetricsTests.cs

Lines changed: 61 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Diagnostics.Metrics;
67
using System.Linq;
78
using EventStore.Core.Messages;
@@ -36,6 +37,9 @@ public PersistentSubscriptionMetricsTests() {
3637
NamedConsumerStrategy = "Round Robin",
3738
OldestParkedMessage = 1007,
3839
OutstandingMessagesCount = 2,
40+
ParkedDueToClientNak = 2001,
41+
ParkedDueToMaxRetries = 2002,
42+
ParkedMessageReplays = 2003,
3943
ParkedMessageCount = 1003,
4044
ReadBatchSize = 20,
4145
ReadBufferCount = 0,
@@ -68,6 +72,9 @@ public PersistentSubscriptionMetricsTests() {
6872
NamedConsumerStrategy = "Round Robin",
6973
OldestParkedMessage = 1008,
7074
OutstandingMessagesCount = 2,
75+
ParkedDueToClientNak = 2004,
76+
ParkedDueToMaxRetries = 2005,
77+
ParkedMessageReplays = 2006,
7178
ParkedMessageCount = 1004,
7279
ReadBatchSize = 20,
7380
ReadBufferCount = 0,
@@ -101,6 +108,52 @@ public void ObserveParkedMessages() {
101108
AssertMeasurement("$all", "testGroup", 1004));
102109
}
103110

111+
[Fact]
112+
public void ObserveParkMessageRequests() {
113+
var measurements = _sut.ObserveParkMessageRequests();
114+
Assert.Collection(measurements,
115+
actual => {
116+
Assert.Equal(2001, actual.Value);
117+
Assert.Collection(
118+
actual.Tags.ToArray(),
119+
AssertTag("event_stream_id", "test"),
120+
AssertTag("group_name", "testGroup"),
121+
AssertTag("reason", "client-nak"));
122+
},
123+
actual => {
124+
Assert.Equal(2002, actual.Value);
125+
Assert.Collection(
126+
actual.Tags.ToArray(),
127+
AssertTag("event_stream_id", "test"),
128+
AssertTag("group_name", "testGroup"),
129+
AssertTag("reason", "max-retries"));
130+
},
131+
actual => {
132+
Assert.Equal(2004, actual.Value);
133+
Assert.Collection(
134+
actual.Tags.ToArray(),
135+
AssertTag("event_stream_id", "$all"),
136+
AssertTag("group_name", "testGroup"),
137+
AssertTag("reason", "client-nak"));
138+
},
139+
actual => {
140+
Assert.Equal(2005, actual.Value);
141+
Assert.Collection(
142+
actual.Tags.ToArray(),
143+
AssertTag("event_stream_id", "$all"),
144+
AssertTag("group_name", "testGroup"),
145+
AssertTag("reason", "max-retries"));
146+
});
147+
}
148+
149+
[Fact]
150+
public void ObserveParkedMessageReplays() {
151+
var measurements = _sut.ObserveParkedMessageReplays();
152+
Assert.Collection(measurements,
153+
AssertMeasurement("test", "testGroup", 2003),
154+
AssertMeasurement("$all", "testGroup", 2006));
155+
}
156+
104157
[Fact]
105158
public void ObserveInFlightMessages() {
106159
var measurements = _sut.ObserveInFlightMessages();
@@ -162,14 +215,13 @@ static Action<Measurement<long>> AssertMeasurement(
162215
Assert.Equal(expectedValue, actualMeasurement.Value);
163216
Assert.Collection(
164217
actualMeasurement.Tags.ToArray(),
165-
tag => {
166-
Assert.Equal("event_stream_id", tag.Key);
167-
Assert.Equal(sourceName, tag.Value);
168-
},
169-
tag => {
170-
Assert.Equal("group_name", tag.Key);
171-
Assert.Equal(groupName, tag.Value);
172-
}
173-
);
218+
AssertTag("event_stream_id", sourceName),
219+
AssertTag("group_name", groupName));
220+
};
221+
222+
static Action<KeyValuePair<string, object>> AssertTag(string key, object value) =>
223+
actualTag => {
224+
Assert.Equal(key, actualTag.Key);
225+
Assert.Equal(value, actualTag.Value);
174226
};
175227
}

src/EventStore.Core/Messages/MonitoringMessage.cs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -132,6 +132,9 @@ public class PersistentSubscriptionInfo {
132132
public string NamedConsumerStrategy { get; set; }
133133
public int MaxSubscriberCount { get; set; }
134134
public long ParkedMessageCount { get; set; }
135+
public long ParkedDueToClientNak { get; set; }
136+
public long ParkedDueToMaxRetries { get; set; }
137+
public long ParkedMessageReplays { get; set; }
135138
public long OldestParkedMessage { get; set; }
136139
}
137140

src/EventStore.Core/Metrics/PersistentSubscriptionTracker.cs

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,28 @@ public IEnumerable<Measurement<long>> ObserveParkedMessages() =>
2929
new("group_name", x.GroupName)
3030
]));
3131

32+
public IEnumerable<Measurement<long>> ObserveParkMessageRequests() =>
33+
_currentStats.SelectMany<MonitoringMessage.PersistentSubscriptionInfo, Measurement<long>>(
34+
x => [
35+
new Measurement<long>(x.ParkedDueToClientNak, [
36+
new("event_stream_id", x.EventSource),
37+
new("group_name", x.GroupName),
38+
new("reason", "client-nak"),
39+
]),
40+
new Measurement<long>(x.ParkedDueToMaxRetries, [
41+
new("event_stream_id", x.EventSource),
42+
new("group_name", x.GroupName),
43+
new("reason", "max-retries"),
44+
]),
45+
]);
46+
47+
public IEnumerable<Measurement<long>> ObserveParkedMessageReplays() =>
48+
_currentStats.Select(x =>
49+
new Measurement<long>(x.ParkedMessageReplays, [
50+
new("event_stream_id", x.EventSource),
51+
new("group_name", x.GroupName)
52+
]));
53+
3254
public IEnumerable<Measurement<long>> ObserveInFlightMessages() =>
3355
_currentStats.Select(x =>
3456
new Measurement<long>(x.TotalInFlightMessages, [

src/EventStore.Core/MetricsBootstrapper.cs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -179,6 +179,8 @@ public static void Bootstrap(
179179
// these only go up, but are not strictly counters; should not have `_total` appended
180180
coreMeter.CreateObservableUpDownCounter($"{serviceName}-persistent-sub-last-known-event-number", tracker.ObserveLastKnownEvent);
181181
coreMeter.CreateObservableUpDownCounter($"{serviceName}-persistent-sub-last-known-event-commit-position", tracker.ObserveLastKnownEventCommitPosition);
182+
coreMeter.CreateObservableUpDownCounter($"{serviceName}-persistent-sub-park-message-requests", tracker.ObserveParkMessageRequests);
183+
coreMeter.CreateObservableUpDownCounter($"{serviceName}-persistent-sub-parked-message-replays", tracker.ObserveParkedMessageReplays);
182184
coreMeter.CreateObservableUpDownCounter($"{serviceName}-persistent-sub-checkpointed-event-number", tracker.ObserveLastCheckpointedEvent);
183185
coreMeter.CreateObservableUpDownCounter($"{serviceName}-persistent-sub-checkpointed-event-commit-position", tracker.ObserveLastCheckpointedEventCommitPosition);
184186

0 commit comments

Comments
 (0)