Skip to content

Commit bc30009

Browse files
authored
Merge pull request #2824 from EventStore/prevent-checkpoint-at-same-position
Prevent a checkpoint from being emitted at same position twice
2 parents d3cec82 + ea61ac7 commit bc30009

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

src/EventStore.Projections.Core.Tests/Services/projection_subscription/when_handling_events_with_different_event_filters.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,8 @@ namespace EventStore.Projections.Core.Tests.Services.projection_subscription {
2121
[TestFixture("good-stream", "good-event-type", "good-stream", "bad-event-type", true, false)]
2222
[TestFixture("good-stream", "good-event-type", "good-stream", "bad-event-type", false, true)]
2323
[TestFixture("good-stream", "good-event-type", "good-stream", "bad-event-type", false, false)]
24-
[TestFixture("good-stream", "good-event-type", "bad-stream", "good-event-type", true, true)]
25-
[TestFixture("good-stream", "good-event-type", "bad-stream", "good-event-type", true, false)]
2624
[TestFixture("good-stream", "good-event-type", "bad-stream", "good-event-type", false, true)]
2725
[TestFixture("good-stream", "good-event-type", "bad-stream", "good-event-type", false, false)]
28-
[TestFixture("good-stream", "good-event-type", "bad-stream", "bad-event-type", true, true)]
29-
[TestFixture("good-stream", "good-event-type", "bad-stream", "bad-event-type", true, false)]
3026
[TestFixture("good-stream", "good-event-type", "bad-stream", "bad-event-type", false, true)]
3127
[TestFixture("good-stream", "good-event-type", "bad-stream", "bad-event-type", false, false)]
3228
public class

src/EventStore.Projections.Core/Services/Processing/ReaderSubscriptionBase.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class ReaderSubscriptionBase {
3030
private DateTime _lastCheckpointTime = DateTime.MinValue;
3131
private bool _enableContentTypeValidation;
3232
private ILogger _logger;
33+
private CheckpointTag _lastCheckpointTag;
3334

3435
protected ReaderSubscriptionBase(
3536
IPublisher publisher,
@@ -65,6 +66,7 @@ protected ReaderSubscriptionBase(
6566
_positionTagger = readerStrategy.PositionTagger;
6667
_positionTracker = new PositionTracker(_positionTagger);
6768
_positionTracker.UpdateByCheckpointTagInitial(@from);
69+
_lastCheckpointTag = _positionTracker.LastTag;
6870
_enableContentTypeValidation = enableContentTypeValidation;
6971
_logger = Serilog.Log.ForContext<ReaderSubscriptionBase>();
7072
}
@@ -125,7 +127,8 @@ protected void ProcessOne(ReaderSubscriptionMessage.CommittedEventDistributed me
125127
_eventsSinceLastCheckpointSuggestedOrStart++;
126128
if (_checkpointProcessedEventsThreshold > 0
127129
&& timeDifference > _checkpointAfter
128-
&& _eventsSinceLastCheckpointSuggestedOrStart >= _checkpointProcessedEventsThreshold)
130+
&& _eventsSinceLastCheckpointSuggestedOrStart >= _checkpointProcessedEventsThreshold
131+
&& _lastCheckpointTag != _positionTracker.LastTag)
129132
SuggestCheckpoint(message);
130133
if (_stopAfterNEvents > 0 && _eventsSinceLastCheckpointSuggestedOrStart >= _stopAfterNEvents)
131134
NEventsReached();
@@ -134,7 +137,8 @@ protected void ProcessOne(ReaderSubscriptionMessage.CommittedEventDistributed me
134137
&& timeDifference > _checkpointAfter
135138
&& (_lastPassedOrCheckpointedEventPosition != null
136139
&& message.Data.Position.PreparePosition - _lastPassedOrCheckpointedEventPosition.Value
137-
> _checkpointUnhandledBytesThreshold))
140+
> _checkpointUnhandledBytesThreshold)
141+
&& _lastCheckpointTag != _positionTracker.LastTag)
138142
SuggestCheckpoint(message);
139143
else if (progressChanged)
140144
PublishProgress(roundedProgress);
@@ -178,6 +182,7 @@ private void PublishStartingAt(long startingLastCommitPosition) {
178182

179183
private void SuggestCheckpoint(ReaderSubscriptionMessage.CommittedEventDistributed message) {
180184
_lastPassedOrCheckpointedEventPosition = message.Data.Position.PreparePosition;
185+
_lastCheckpointTag = _positionTracker.LastTag;
181186
_publisher.Publish(
182187
new EventReaderSubscriptionMessage.CheckpointSuggested(
183188
_subscriptionId, _positionTracker.LastTag, message.Progress,

0 commit comments

Comments
 (0)