Skip to content

Commit a9d8df5

Browse files
authored
Merge pull request #2745 from EventStore/condron-proposal-tracking
Add checkpoint tracking for proposed epochs
2 parents 5e754cd + ae19001 commit a9d8df5

File tree

50 files changed

+1303
-383
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1303
-383
lines changed

src/EventStore.Core.Tests/Helpers/MiniClusterNode.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,30 +261,47 @@ private TFChunkDbConfig CreateDbConfig(int chunkSize, string dbPath, long chunks
261261
ICheckpoint writerChk;
262262
ICheckpoint chaserChk;
263263
ICheckpoint epochChk;
264+
ICheckpoint proposalChk;
264265
ICheckpoint truncateChk;
265266
ICheckpoint replicationCheckpoint = new InMemoryCheckpoint(-1);
266267
ICheckpoint indexCheckpoint = new InMemoryCheckpoint(-1);
267268
if (inMemDb) {
268269
writerChk = new InMemoryCheckpoint(Checkpoint.Writer);
269270
chaserChk = new InMemoryCheckpoint(Checkpoint.Chaser);
270271
epochChk = new InMemoryCheckpoint(Checkpoint.Epoch, initValue: -1);
272+
proposalChk = new InMemoryCheckpoint(Checkpoint.Proposal, initValue: -1);
271273
truncateChk = new InMemoryCheckpoint(Checkpoint.Truncate, initValue: -1);
272274
} else {
273275
var writerCheckFilename = Path.Combine(dbPath, Checkpoint.Writer + ".chk");
274276
var chaserCheckFilename = Path.Combine(dbPath, Checkpoint.Chaser + ".chk");
275277
var epochCheckFilename = Path.Combine(dbPath, Checkpoint.Epoch + ".chk");
278+
var proposalFilename = Path.Combine(dbPath, Checkpoint.Proposal + ".chk");
276279
var truncateCheckFilename = Path.Combine(dbPath, Checkpoint.Truncate + ".chk");
277280
writerChk = new MemoryMappedFileCheckpoint(writerCheckFilename, Checkpoint.Writer, cached: true);
278281
chaserChk = new MemoryMappedFileCheckpoint(chaserCheckFilename, Checkpoint.Chaser, cached: true);
279282
epochChk = new MemoryMappedFileCheckpoint(
280283
epochCheckFilename, Checkpoint.Epoch, cached: true, initValue: -1);
284+
proposalChk = new MemoryMappedFileCheckpoint(
285+
proposalFilename, Checkpoint.Proposal, cached: true, initValue: -1);
281286
truncateChk = new MemoryMappedFileCheckpoint(
282287
truncateCheckFilename, Checkpoint.Truncate, cached: true, initValue: -1);
283288
}
284289

285290
var nodeConfig = new TFChunkDbConfig(
286-
dbPath, new VersionedPatternFileNamingStrategy(dbPath, "chunk-"), chunkSize, chunksCacheSize, writerChk,
287-
chaserChk, epochChk, truncateChk, replicationCheckpoint, indexCheckpoint, Constants.TFChunkInitialReaderCountDefault, Constants.TFChunkMaxReaderCountDefault, inMemDb);
291+
dbPath,
292+
new VersionedPatternFileNamingStrategy(dbPath, "chunk-"),
293+
chunkSize,
294+
chunksCacheSize,
295+
writerChk,
296+
chaserChk,
297+
epochChk,
298+
proposalChk,
299+
truncateChk,
300+
replicationCheckpoint,
301+
indexCheckpoint,
302+
Constants.TFChunkInitialReaderCountDefault,
303+
Constants.TFChunkMaxReaderCountDefault,
304+
inMemDb);
288305
return nodeConfig;
289306
}
290307
}

src/EventStore.Core.Tests/Services/ElectionsService/ElectionServiceUnit.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public class ElectionsServiceUnit {
1818
private const int LastCommitPosition = -1;
1919
private const int WriterCheckpoint = 0;
2020
private const int ChaserCheckpoint = 0;
21+
private const int ProposalCheckpoint = -1;
2122
private static readonly DateTime InitialDate = new DateTime(2012, 6, 1);
2223

2324
public ClusterInfo ClusterInfo { get; private set; }
@@ -54,6 +55,7 @@ public ElectionsServiceUnit(ClusterSettings clusterSettings) {
5455
clusterSettings.ClusterNodesCount,
5556
new InMemoryCheckpoint(WriterCheckpoint),
5657
new InMemoryCheckpoint(ChaserCheckpoint),
58+
new InMemoryCheckpoint(ProposalCheckpoint),
5759
new FakeEpochManager(),
5860
() => -1, 0, new FakeTimeProvider());
5961
ElectionsService.SubscribeMessages(_bus);

src/EventStore.Core.Tests/Services/ElectionsService/ElectionsServiceTests.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ protected ElectionsFixture(VNodeInfo node, VNodeInfo nodeTwo, VNodeInfo nodeThre
5353
MemberInfoFromVNode(_node, _timeProvider.UtcNow, VNodeState.Unknown, true, 0, _epochId, 0), 3,
5454
new InMemoryCheckpoint(0),
5555
new InMemoryCheckpoint(0),
56+
new InMemoryCheckpoint(-1),
5657
new FakeEpochManager(), () => 0L, 0, _timeProvider);
5758
_sut.SubscribeMessages(_bus);
5859
}
@@ -827,7 +828,7 @@ public void should_send_an_acceptance_to_other_members() {
827828
_nodeThree.HttpEndPoint, 0, 0, 0, _epochId, Guid.Empty, 0, 0, 0, 0));
828829

829830
var expected = new Message[] {
830-
new ElectionMessage.ElectionsDone(0,
831+
new ElectionMessage.ElectionsDone(0,0,
831832
MemberInfo.ForVNode(
832833
_nodeThree.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
833834
_nodeThree.InternalTcp,
@@ -967,7 +968,7 @@ public void should_complete_elections() {
967968
proposalMessage.LeaderId, proposalMessage.LeaderHttpEndPoint, 0));
968969

969970
var expected = new Message[] {
970-
new ElectionMessage.ElectionsDone(0,
971+
new ElectionMessage.ElectionsDone(0,0,
971972
MemberInfo.ForVNode(
972973
_nodeTwo.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
973974
_nodeTwo.InternalTcp,
@@ -1186,7 +1187,7 @@ public void should_attempt_not_to_elect_previously_elected_leader() {
11861187
proposalMessage.LeaderId, proposalMessage.LeaderHttpEndPoint, 3));
11871188

11881189
var expected = new Message[] {
1189-
new ElectionMessage.ElectionsDone(3,
1190+
new ElectionMessage.ElectionsDone(3,1,
11901191
MemberInfo.ForVNode(
11911192
_nodeTwo.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
11921193
_nodeTwo.InternalTcp,
@@ -1239,6 +1240,7 @@ public void should_throw_argument_exception() {
12391240
new Core.Services.ElectionsService(new FakePublisher(), nodeInfo, 3,
12401241
new InMemoryCheckpoint(0),
12411242
new InMemoryCheckpoint(0),
1243+
new InMemoryCheckpoint(-1),
12421244
new FakeEpochManager(), () => 0L, 0, new FakeTimeProvider());
12431245
});
12441246
}
@@ -1266,7 +1268,7 @@ public when_electing_a_leader_and_prepare_ok_is_received_from_previous_leader_in
12661268
[Test]
12671269
public void previous_leader_should_be_elected() {
12681270
var expected = new Message[] {
1269-
new ElectionMessage.ElectionsDone(0,
1271+
new ElectionMessage.ElectionsDone(0,0,
12701272
MemberInfo.ForVNode(
12711273
_nodeThree.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
12721274
_nodeThree.InternalTcp,
@@ -1355,7 +1357,7 @@ public and_previous_leader_is_present_and_alive_in_cluster_info_in_received_prep
13551357
[Test]
13561358
public void previous_leader_should_not_be_elected() {
13571359
var expected = new Message[] {
1358-
new ElectionMessage.ElectionsDone(0,
1360+
new ElectionMessage.ElectionsDone(0,0,
13591361
MemberInfo.ForVNode(
13601362
_nodeTwo.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
13611363
_nodeTwo.InternalTcp,
@@ -1388,7 +1390,7 @@ public and_previous_leader_is_present_and_alive_in_cluster_info_in_received_prep
13881390
[Test]
13891391
public void previous_leader_should_not_be_elected() {
13901392
var expected = new Message[] {
1391-
new ElectionMessage.ElectionsDone(0,
1393+
new ElectionMessage.ElectionsDone(0,0,
13921394
MemberInfo.ForVNode(
13931395
_nodeTwo.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
13941396
_nodeTwo.InternalTcp,
@@ -1439,7 +1441,7 @@ public and_previous_leader_is_not_present_in_cluster_info_in_received_prepare_ok
14391441
[Test]
14401442
public void previous_leader_should_not_be_elected() {
14411443
var expected = new Message[] {
1442-
new ElectionMessage.ElectionsDone(0,
1444+
new ElectionMessage.ElectionsDone(0,0,
14431445
MemberInfo.ForVNode(
14441446
_nodeTwo.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
14451447
_nodeTwo.InternalTcp,
@@ -1472,7 +1474,7 @@ public and_previous_leader_is_present_but_dead_in_cluster_info_in_received_prepa
14721474
[Test]
14731475
public void previous_leader_should_not_be_elected() {
14741476
var expected = new Message[] {
1475-
new ElectionMessage.ElectionsDone(0,
1477+
new ElectionMessage.ElectionsDone(0,0,
14761478
MemberInfo.ForVNode(
14771479
_nodeTwo.InstanceId, _timeProvider.UtcNow, VNodeState.Unknown, true,
14781480
_nodeTwo.InternalTcp,

src/EventStore.Core.Tests/Services/ElectionsService/FakeEpochManager.cs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,30 +35,34 @@ public EpochRecord[] GetLastEpochs(int maxCount) {
3535
}
3636
}
3737

38-
public EpochRecord GetEpoch(int epochNumber, bool throwIfNotFound) {
38+
public EpochRecord GetEpochAfter(int epochNumber, bool throwIfNotFound) {
3939
lock (_epochs) {
4040
var epoch = _epochs.FirstOrDefault(e => e.EpochNumber == epochNumber);
41+
if (epoch != null) {
42+
var index = _epochs.IndexOf(epoch);
43+
epoch = null;
44+
if (index + 1 < _epochs.Count) {
45+
epoch = _epochs[index + 1];
46+
}
47+
}
48+
4149
if (throwIfNotFound && epoch == null)
4250
throw new ArgumentOutOfRangeException(nameof(epochNumber), "Epoch not Found");
4351
return epoch;
4452
}
4553
}
4654

47-
public EpochRecord GetEpochWithAllEpochs(int epochNumber, bool throwIfNotFound) {
48-
lock (_epochs) {
49-
return GetEpoch(epochNumber, throwIfNotFound);
50-
}
51-
}
55+
5256

5357
public bool IsCorrectEpochAt(long epochPosition, int epochNumber, Guid epochId) {
5458
throw new NotImplementedException();
5559
}
56-
57-
public void WriteNewEpoch() {
60+
61+
public void WriteNewEpoch(int epochNumber) {
5862
throw new NotImplementedException();
5963
}
6064

61-
public void SetLastEpoch(EpochRecord epoch) {
65+
public void CacheEpoch(EpochRecord epoch) {
6266
lock (_epochs) {
6367
_epochs.Add(epoch);
6468
}

0 commit comments

Comments
 (0)