Skip to content

Commit a6c9f05

Browse files
[KDB-698] Add option to limit the maximum projection state size (#4884)
* Add option for setting max projection state size * Fault a projection if the state or checkpoint exceeds the max projection state size * add important parens (introduced before this pr but relevant) * Don't fail in checkpoint writer if state size is too large, we can rely on the checkpoint manager to fault the projection before we reach the point of writing a checkpoint. * Don't update the state in checkpoint manager if it is too large * Update checkpoint too large warning message * Add test for max projection state size * Default max projection state size to int.max --------- Co-authored-by: Timothy Coleman <[email protected]>
1 parent d34d064 commit a6c9f05

File tree

35 files changed

+205
-88
lines changed

35 files changed

+205
-88
lines changed

src/EventStore.ClusterNode/ClusterVNodeHostedService.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public ClusterVNodeHostedService(
8080
TimeSpan.FromMinutes(options.Projection.ProjectionsQueryExpiry),
8181
options.Projection.FaultOutOfOrderProjections,
8282
options.Projection.ProjectionCompilationTimeout,
83-
options.Projection.ProjectionExecutionTimeout)))
83+
options.Projection.ProjectionExecutionTimeout,
84+
options.Projection.MaxProjectionStateSize)))
8485
: options;
8586

8687
if (!_options.Database.MemDb) {

src/EventStore.Core/Configuration/ClusterVNodeOptions.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,9 @@ public record ProjectionOptions {
755755
[Description("The maximum execution time in milliseconds for executing a handler in a user projection. It can be overridden for a specific projection by setting ProjectionExecutionTimeout config for that projection"),
756756
Unit("ms")]
757757
public int ProjectionExecutionTimeout { get; set; } = DefaultProjectionExecutionTimeout;
758+
759+
[Description("The maximum size, in bytes, of a projection's state and result. A projection will fault if its state size exceeds this value. May not exceed 16mb.")]
760+
public int MaxProjectionStateSize { get; set; } = Opts.MaxProjectionStateSizeDefault;
758761
}
759762

760763
public record UnknownOptions(IReadOnlyList<(string, string)> Options) {

src/EventStore.Core/Util/Opts.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace EventStore.Core.Util;
1010

1111
public static class Opts {
1212
public const int ConnectionPendingSendBytesThresholdDefault = 10 * 1024 * 1024;
13-
13+
1414
public const int ConnectionQueueSizeThresholdDefault = 50000;
1515

1616
public const int HashCollisionReadLimitDefault = 100;
@@ -19,6 +19,8 @@ public static class Opts {
1919

2020
public const int ProjectionsQueryExpiryDefault = 5;
2121

22+
public const int MaxProjectionStateSizeDefault = int.MaxValue;
23+
2224
public const byte IndexBitnessVersionDefault = Index.PTableVersions.IndexV4;
2325

2426
public static readonly string AuthenticationTypeDefault = "internal";

src/EventStore.Projections.Core.Javascript.Tests/Integration/ProjectionRuntimeScenario.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using EventStore.Core.TransactionLog.Checkpoint;
1212
using EventStore.Core.TransactionLog.Chunks;
1313
using EventStore.Core.TransactionLog.FileNamingStrategy;
14+
using EventStore.Core.Util;
1415
using Microsoft.AspNetCore.Builder;
1516
using Microsoft.AspNetCore.Hosting;
1617
using Microsoft.Extensions.Configuration;
@@ -28,7 +29,7 @@ public abstract class ProjectionRuntimeScenario: SubsystemScenario {
2829
}
2930

3031
static (Action, IPublisher) CreateRuntime(SynchronousScheduler mainBus, IQueuedHandler mainQueue, ICheckpoint writerCheckpoint) {
31-
var options = new ProjectionSubsystemOptions(3, ProjectionType.All, true, TimeSpan.FromMinutes(5), false, 500, 500);
32+
var options = new ProjectionSubsystemOptions(3, ProjectionType.All, true, TimeSpan.FromMinutes(5), false, 500, 500, Opts.MaxProjectionStateSizeDefault);
3233
var config = new TFChunkDbConfig("mem", new VersionedPatternFileNamingStrategy("mem", "chunk-"), 10000, 0, writerCheckpoint, new InMemoryCheckpoint(-1), new InMemoryCheckpoint(-1), new InMemoryCheckpoint(-1), new InMemoryCheckpoint(-1), new InMemoryCheckpoint(-1), new InMemoryCheckpoint(-1), new InMemoryCheckpoint(-1), true);
3334
var db = new TFChunkDb(config);
3435
var qs = new QueueStatsManager();

src/EventStore.Projections.Core.Tests/ClientAPI/Cluster/specification_with_standard_projections_runnning.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public override async Task TestFixtureSetUp() {
119119
}
120120

121121
private MiniClusterNode<TLogFormat, TStreamId> CreateNode(int index, Endpoints endpoints, EndPoint[] gossipSeeds) {
122-
_projections[index] = new ProjectionsSubsystem(new ProjectionSubsystemOptions(1, ProjectionType.All, false, TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault), Opts.FaultOutOfOrderProjectionsDefault, 500, 250));
122+
_projections[index] = new ProjectionsSubsystem(new ProjectionSubsystemOptions(1, ProjectionType.All, false, TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault), Opts.FaultOutOfOrderProjectionsDefault, 500, 250, Opts.MaxProjectionStateSizeDefault));
123123
var node = new MiniClusterNode<TLogFormat, TStreamId>(
124124
PathName, index, endpoints.InternalTcp,
125125
endpoints.ExternalTcp, endpoints.HttpEndPoint,

src/EventStore.Projections.Core.Tests/ClientAPI/projectionsManager/SpecificationWithNodeAndProjectionsManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public override async Task TestFixtureTearDown() {
7474
public abstract Task When();
7575

7676
protected MiniNode<TLogFormat, TStreamId> CreateNode() {
77-
_projectionsSubsystem = new ProjectionsSubsystem(new ProjectionSubsystemOptions(1, ProjectionType.All, false, TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault), Opts.FaultOutOfOrderProjectionsDefault, 500, 250));
77+
_projectionsSubsystem = new ProjectionsSubsystem(new ProjectionSubsystemOptions(1, ProjectionType.All, false, TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault), Opts.FaultOutOfOrderProjectionsDefault, 500, 250, Opts.MaxProjectionStateSizeDefault));
7878
_systemProjectionsCreated = SystemProjections.Created(_projectionsSubsystem.LeaderInputBus);
7979
return new MiniNode<TLogFormat, TStreamId>(
8080
PathName, inMemDb: true,

src/EventStore.Projections.Core.Tests/ClientAPI/specification_with_standard_projections_runnning.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public override async Task TestFixtureSetUp() {
4343
TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault),
4444
Opts.FaultOutOfOrderProjectionsDefault,
4545
500,
46-
250);
46+
250, Opts.MaxProjectionStateSizeDefault);
4747
_projections = new ProjectionsSubsystem(configuration);
4848
_node = new MiniNode<TLogFormat, TStreamId>(
4949
PathName, inMemDb: true,

src/EventStore.Projections.Core.Tests/Services/TestFixtureWithProjectionCoreService.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using EventStore.Core.Services.TimerService;
1414
using EventStore.Core.Tests.Bus.Helpers;
1515
using EventStore.Core.TransactionLog.Checkpoint;
16+
using EventStore.Core.Util;
1617
using EventStore.Projections.Core.Messages;
1718
using EventStore.Projections.Core.Services;
1819
using EventStore.Projections.Core.Services.Processing;
@@ -106,7 +107,7 @@ public virtual void Setup() {
106107
_workerId = Guid.NewGuid();
107108
var guardBus = new GuardBusToTriggerFixingIfUsed();
108109
var configuration = new ProjectionsStandardComponents(1, ProjectionType.All, guardBus, guardBus, guardBus, guardBus, true,
109-
500, 250);
110+
500, 250, Opts.MaxProjectionStateSizeDefault);
110111
_service = new ProjectionCoreService(
111112
_workerId, _bus, _bus, _subscriptionDispatcher, new RealTimeProvider(), ioDispatcher, configuration);
112113
_bus.Subscribe(

src/EventStore.Projections.Core.Tests/Services/core_projection/TestFixtureWithCoreProjection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using EventStore.Core.Messages;
88
using EventStore.Core.Services.UserManagement;
99
using EventStore.Core.Tests.Bus.Helpers;
10+
using EventStore.Core.Util;
1011
using EventStore.Projections.Core.Messages;
1112
using EventStore.Projections.Core.Services;
1213
using EventStore.Projections.Core.Services.Management;
@@ -84,13 +85,13 @@ protected virtual ProjectionProcessingStrategy GivenProjectionProcessingStrategy
8485
protected ProjectionProcessingStrategy CreateProjectionProcessingStrategy() {
8586
return new ContinuousProjectionProcessingStrategy(
8687
_projectionName, _version, _stateHandler, _projectionConfig, _stateHandler.GetSourceDefinition(), null,
87-
_subscriptionDispatcher, true);
88+
_subscriptionDispatcher, true, Opts.MaxProjectionStateSizeDefault);
8889
}
8990

9091
protected ProjectionProcessingStrategy CreateQueryProcessingStrategy() {
9192
return new QueryProcessingStrategy(
9293
_projectionName, _version, _stateHandler, _projectionConfig, _stateHandler.GetSourceDefinition(), null,
93-
_subscriptionDispatcher, true);
94+
_subscriptionDispatcher, true, Opts.MaxProjectionStateSizeDefault);
9495
}
9596

9697
protected virtual ProjectionConfig GivenProjectionConfig() {

src/EventStore.Projections.Core.Tests/Services/core_projection/checkpoint_manager/FakeCoreProjection.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ public class FakeCoreProjection : ICoreProjection,
2424
public readonly List<CoreProjectionProcessingMessage.PrerecordedEventsLoaded> _prerecordedEventsLoadedMessages =
2525
new List<CoreProjectionProcessingMessage.PrerecordedEventsLoaded>();
2626

27+
public readonly List<CoreProjectionProcessingMessage.Failed> _failedMessages =
28+
new List<CoreProjectionProcessingMessage.Failed>();
29+
2730
public void Handle(CoreProjectionProcessingMessage.CheckpointCompleted message) {
2831
_checkpointCompletedMessages.Add(message);
2932
}
@@ -37,7 +40,7 @@ public void Handle(CoreProjectionProcessingMessage.RestartRequested message) {
3740
}
3841

3942
public void Handle(CoreProjectionProcessingMessage.Failed message) {
40-
throw new System.NotImplementedException();
43+
_failedMessages.Add(message);
4144
}
4245

4346
public void Handle(CoreProjectionProcessingMessage.PrerecordedEventsLoaded message) {

src/EventStore.Projections.Core.Tests/Services/core_projection/checkpoint_manager/TestFixtureWithCoreProjectionCheckpointManager.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
33

44
using System;
5+
using EventStore.Core.Util;
56
using EventStore.Projections.Core.Messages;
67
using EventStore.Projections.Core.Services;
78
using EventStore.Projections.Core.Services.Processing;
@@ -35,6 +36,7 @@ public abstract class TestFixtureWithCoreProjectionCheckpointManager<TLogFormat,
3536
protected CoreProjectionCheckpointReader _checkpointReader;
3637
protected string _projectionName;
3738
protected ProjectionVersion _projectionVersion;
39+
protected int _maxProjectionStateSize = Opts.MaxProjectionStateSizeDefault;
3840

3941
[SetUp]
4042
public void setup() {
@@ -63,7 +65,7 @@ protected virtual DefaultCheckpointManager GivenCheckpointManager() {
6365
_bus, _projectionCorrelationId, _projectionVersion, null, _ioDispatcher, _config, _projectionName,
6466
new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled, _producesResults,
6567
_definesFold,
66-
_checkpointWriter);
68+
_checkpointWriter, _maxProjectionStateSize);
6769
}
6870

6971
protected new virtual void Given() {

src/EventStore.Projections.Core.Tests/Services/core_projection/checkpoint_manager/multi_stream/TestFixtureWithMultiStreamCheckpointManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
22
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
33

4+
using EventStore.Core.Util;
45
using EventStore.Projections.Core.Services.Processing;
56
using EventStore.Projections.Core.Services.Processing.Checkpointing;
67
using EventStore.Projections.Core.Services.Processing.MultiStream;
@@ -20,6 +21,6 @@ protected override DefaultCheckpointManager GivenCheckpointManager() {
2021
return new MultiStreamMultiOutputCheckpointManager(
2122
_bus, _projectionCorrelationId, _projectionVersion, null, _ioDispatcher, _config, _projectionName,
2223
new MultiStreamPositionTagger(0, _streams), _namingBuilder, _checkpointsEnabled, true, true,
23-
_checkpointWriter);
24+
_checkpointWriter, Opts.MaxProjectionStateSizeDefault);
2425
}
2526
}

src/EventStore.Projections.Core.Tests/Services/core_projection/checkpoint_manager/multi_stream/with_multi_stream_checkpoint_manager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using EventStore.Core.Messages;
1010
using EventStore.Core.Services.UserManagement;
1111
using EventStore.Core.Tests.Helpers.IODispatcherTests;
12+
using EventStore.Core.Util;
1213
using EventStore.Projections.Core.Services;
1314
using EventStore.Projections.Core.Services.Processing;
1415
using EventStore.Projections.Core.Services.Processing.Checkpointing;
@@ -56,7 +57,7 @@ public void TestFixtureSetUp() {
5657
_checkpointManager = new MultiStreamMultiOutputCheckpointManager(_bus, _projectionId, _projectionVersion,
5758
SystemAccounts.System,
5859
_ioDispatcher, _projectionConfig, _projectionName, _positionTagger, _namingBuilder, true, true, false,
59-
_coreProjectionCheckpointWriter);
60+
_coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
6061

6162
When();
6263
}

src/EventStore.Projections.Core.Tests/Services/core_projection/checkpoint_manager/when_creating_a_default_checkpoint_manager.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using EventStore.Core.Tests;
6+
using EventStore.Core.Util;
67
using EventStore.Projections.Core.Services.Processing;
78
using EventStore.Projections.Core.Services.Processing.Checkpointing;
89
using EventStore.Projections.Core.Services.Processing.SingleStream;
@@ -28,7 +29,7 @@ public void it_can_be_created() {
2829
_manager = new DefaultCheckpointManager(
2930
_bus, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, _ioDispatcher, _config,
3031
"projection", new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled,
31-
_producesResults, _definesFold, _coreProjectionCheckpointWriter);
32+
_producesResults, _definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
3233
}
3334

3435
[Test]
@@ -37,7 +38,7 @@ public void null_publisher_throws_argument_null_exception() {
3738
_manager = new DefaultCheckpointManager(
3839
null, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, _ioDispatcher, _config,
3940
"projection", new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled,
40-
_producesResults, _definesFold, _coreProjectionCheckpointWriter);
41+
_producesResults, _definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
4142
});
4243
}
4344

@@ -47,7 +48,7 @@ public void null_io_dispatcher_throws_argument_null_exception() {
4748
_manager = new DefaultCheckpointManager(
4849
_bus, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, null, _config, "projection",
4950
new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled, _producesResults,
50-
_definesFold, _coreProjectionCheckpointWriter);
51+
_definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
5152
});
5253
}
5354

@@ -58,7 +59,7 @@ public void null_projection_config_throws_argument_null_exception() {
5859
_bus, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, _ioDispatcher, null,
5960
"projection",
6061
new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled, _producesResults,
61-
_definesFold, _coreProjectionCheckpointWriter);
62+
_definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
6263
});
6364
}
6465

@@ -68,7 +69,7 @@ public void null_projection_name_throws_argument_null_exception() {
6869
_manager = new DefaultCheckpointManager(
6970
_bus, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, _ioDispatcher, _config, null,
7071
new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled, _producesResults,
71-
_definesFold, _coreProjectionCheckpointWriter);
72+
_definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
7273
});
7374
}
7475

@@ -78,7 +79,7 @@ public void null_position_tagger_throws_argument_null_exception() {
7879
_manager = new DefaultCheckpointManager(
7980
_bus, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, _ioDispatcher, _config,
8081
"projection", null, _namingBuilder, _checkpointsEnabled, _producesResults,
81-
_definesFold, _coreProjectionCheckpointWriter);
82+
_definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
8283
});
8384
}
8485

@@ -88,7 +89,7 @@ public void empty_projection_checkpoint_stream_id_throws_argument_exception() {
8889
_manager = new DefaultCheckpointManager(
8990
_bus, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, _ioDispatcher, _config, "",
9091
new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled, _producesResults,
91-
_definesFold, _coreProjectionCheckpointWriter);
92+
_definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
9293
});
9394
}
9495

@@ -98,7 +99,7 @@ public void empty_projection_name_throws_argument_exception() {
9899
_manager = new DefaultCheckpointManager(
99100
_bus, _projectionCorrelationId, new ProjectionVersion(1, 0, 0), null, _ioDispatcher, _config, "",
100101
new StreamPositionTagger(0, "stream"), _namingBuilder, _checkpointsEnabled, _producesResults,
101-
_definesFold, _coreProjectionCheckpointWriter);
102+
_definesFold, _coreProjectionCheckpointWriter, Opts.MaxProjectionStateSizeDefault);
102103
});
103104
}
104105
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
2+
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
3+
4+
using System;
5+
using System.Linq;
6+
using EventStore.Core.Tests;
7+
using EventStore.Projections.Core.Services;
8+
using EventStore.Projections.Core.Services.Processing.Checkpointing;
9+
using EventStore.Projections.Core.Services.Processing.Partitioning;
10+
using NUnit.Framework;
11+
12+
namespace EventStore.Projections.Core.Tests.Services.core_projection.checkpoint_manager;
13+
14+
[TestFixture(typeof(LogFormat.V2), typeof(string))]
15+
public class when_projection_state_is_too_large<TLogFormat, TStreamId> :
16+
TestFixtureWithCoreProjectionCheckpointManager<TLogFormat, TStreamId> {
17+
private Exception _exception;
18+
19+
protected override void Given() {
20+
AllWritesSucceed();
21+
base.Given();
22+
this._checkpointHandledThreshold = 2;
23+
this._maxProjectionStateSize = 1024 * 1024;
24+
}
25+
26+
protected override void When() {
27+
base.When();
28+
_exception = null;
29+
try {
30+
_checkpointReader.BeginLoadState();
31+
var checkpointLoaded =
32+
_consumer.HandledMessages.OfType<CoreProjectionProcessingMessage.CheckpointLoaded>().First();
33+
_checkpointWriter.StartFrom(checkpointLoaded.CheckpointTag, checkpointLoaded.CheckpointEventNumber);
34+
_manager.BeginLoadPrerecordedEvents(checkpointLoaded.CheckpointTag);
35+
36+
// Initial checkpoint and state
37+
var initialCheckpointTag = CheckpointTag.FromStreamPosition(0, "stream", 10);
38+
_manager.Start(initialCheckpointTag, null);
39+
var oldState = new PartitionState("", "", initialCheckpointTag);
40+
41+
// checkpoint and state after first event processed
42+
var firstEventCheckpointTag = CheckpointTag.FromStreamPosition(0, "stream", 11);
43+
var newState = new PartitionState("{ \"state\": \"foo\"}", "",
44+
firstEventCheckpointTag);
45+
_manager.StateUpdated("", oldState, newState);
46+
_manager.EventProcessed(firstEventCheckpointTag, 55.5f);
47+
48+
// second event processed fails, as the state is too large
49+
var secondEventCheckpointTag = CheckpointTag.FromStreamPosition(0, "stream", 12);
50+
oldState = newState;
51+
newState = new PartitionState($"{{ \"state\": \"{new string('*', _maxProjectionStateSize)}\"}}", "",
52+
firstEventCheckpointTag);
53+
_manager.StateUpdated("", oldState, newState);
54+
_manager.EventProcessed(secondEventCheckpointTag, 77.7f);
55+
} catch (Exception ex) {
56+
_exception = ex;
57+
}
58+
}
59+
60+
[Test]
61+
public void messages_are_handled() {
62+
Assert.IsNull(_exception);
63+
}
64+
65+
[Test]
66+
public void publishes_projection_failed_message() {
67+
var failedMessages = _consumer.HandledMessages.OfType<CoreProjectionProcessingMessage.Failed>().ToArray();
68+
Assert.AreEqual(1, failedMessages.Length);
69+
Assert.True(failedMessages[0].Reason.Contains("exceeds the configured MaxProjectionStateSize"));
70+
}
71+
72+
[Test]
73+
public void the_second_event_is_not_processed() {
74+
var stats = new ProjectionStatistics();
75+
_manager.GetStatistics(stats);
76+
Assert.AreEqual(1, stats.EventsProcessedAfterRestart);
77+
Assert.AreEqual(55.5f, stats.Progress);
78+
Assert.AreEqual(CheckpointTag.FromStreamPosition(0, "stream", 11).ToString(), stats.Position);
79+
}
80+
}

src/EventStore.Projections.Core.Tests/Services/core_projection/multi_phase/specification_with_multi_phase_core_projection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using EventStore.Core.Bus;
77
using EventStore.Core.Helpers;
88
using EventStore.Core.Services.TimerService;
9+
using EventStore.Core.Util;
910
using EventStore.Projections.Core.Messages;
1011
using EventStore.Projections.Core.Services;
1112
using EventStore.Projections.Core.Services.Processing;
@@ -38,7 +39,7 @@ class FakeProjectionProcessingStrategy : ProjectionProcessingStrategy {
3839
public FakeProjectionProcessingStrategy(
3940
string name, ProjectionVersion projectionVersion, ILogger logger, FakeProjectionProcessingPhase phase1,
4041
FakeProjectionProcessingPhase phase2)
41-
: base(name, projectionVersion, logger) {
42+
: base(name, projectionVersion, logger, Opts.MaxProjectionStateSizeDefault) {
4243
_phase1 = phase1;
4344
_phase2 = phase2;
4445
}

0 commit comments

Comments
 (0)