Skip to content

Commit e4400a4

Browse files
authored
Merge pull request #2354 from EventStore/release-v5.0.7
Backport PR #2353 for version 5.0.7
2 parents cb45243 + 3c72b74 commit e4400a4

File tree

5 files changed

+108
-58
lines changed

5 files changed

+108
-58
lines changed

src/EventStore.ClientAPI/Transport.Tcp/TcpConnection.cs

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@ public int SendQueueSize {
6767

6868
private readonly object _receivingLock = new object();
6969
private readonly object _sendLock = new object();
70+
private readonly object _closeLock = new object();
71+
7072
private bool _isSending;
71-
private volatile int _closed;
72-
private int _dispatchingData; //states: 0 - not dispatching data, 1 - dispatching data, 2 - final state, data should not be dispatched after reaching this state
73+
private volatile bool _isClosed;
7374

7475
private Action<ITcpConnection, IEnumerable<ArraySegment<byte>>> _receiveCallback;
7576
private readonly Action<ITcpConnection, SocketError> _onConnectionClosed;
@@ -172,7 +173,7 @@ private void ProcessSend(SocketAsyncEventArgs socketArgs) {
172173
CloseInternal(socketArgs.SocketError, "Socket send error.");
173174
} else {
174175
NotifySendCompleted(socketArgs.Count);
175-
if (_closed != 0)
176+
if (_isClosed)
176177
ReturnSendingSocketArgs();
177178
else {
178179
lock (_sendLock) {
@@ -261,15 +262,9 @@ private void TryDequeueReceivedData() {
261262
_receiveCallback = null;
262263
}
263264

264-
var oldState = Interlocked.CompareExchange(ref _dispatchingData, 1, 0);
265-
if (oldState == 0 || oldState == 1) { //oldState can be 1 if there's a recursive ReceiveAsync call in the callback
266-
try {
267-
callback(this, res);
268-
} finally {
269-
if (oldState == 0) {
270-
Interlocked.Exchange(ref _dispatchingData, 0);
271-
}
272-
}
265+
lock (_closeLock) {
266+
if(!_isClosed)
267+
callback(this, res);
273268
}
274269

275270
int bytes = 0;
@@ -285,14 +280,10 @@ public void Close(string reason) {
285280
}
286281

287282
private void CloseInternal(SocketError socketError, string reason) {
288-
#pragma warning disable 420
289-
if (Interlocked.CompareExchange(ref _closed, 1, 0) != 0)
290-
return;
291-
#pragma warning restore 420
292-
293-
SpinWait spinWait = new SpinWait();
294-
while(Interlocked.CompareExchange(ref _dispatchingData, 2, 0) != 0)
295-
spinWait.SpinOnce();
283+
lock (_closeLock) {
284+
if (_isClosed) return;
285+
_isClosed = true;
286+
}
296287

297288
NotifyClosed();
298289

src/EventStore.ClientAPI/Transport.Tcp/TcpConnectionSsl.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ public int SendQueueSize {
6565
private readonly MemoryStream _memoryStream = new MemoryStream();
6666

6767
private readonly object _streamLock = new object();
68+
private readonly object _closeLock = new object();
6869
private bool _isSending;
69-
private int _receiveHandling; //states: 0 - not receiving data, 1 - receiving/dispatching data, 2 - final state, no data received/dispatched after reaching this state
70-
private int _isClosed;
70+
private int _receiveHandling;
71+
private volatile bool _isClosed;
7172

7273
private Action<ITcpConnection, IEnumerable<ArraySegment<byte>>> _receiveCallback;
7374
private readonly Action<ITcpConnection, SocketError> _onConnectionClosed;
@@ -371,7 +372,10 @@ private void TryDequeueReceivedData() {
371372
res.Add(piece);
372373
}
373374

374-
callback(this, res);
375+
lock (_closeLock) {
376+
if(!_isClosed)
377+
callback(this, res);
378+
}
375379

376380
int bytes = 0;
377381
for (int i = 0, n = res.Count; i < n; ++i) {
@@ -392,12 +396,10 @@ public void Close(string reason) {
392396
}
393397

394398
private void CloseInternal(SocketError socketError, string reason) {
395-
if (Interlocked.CompareExchange(ref _isClosed, 1, 0) != 0)
396-
return;
397-
398-
SpinWait spinWait = new SpinWait();
399-
while(Interlocked.CompareExchange(ref _receiveHandling, 2, 0) != 0)
400-
spinWait.SpinOnce();
399+
lock (_closeLock) {
400+
if (_isClosed) return;
401+
_isClosed = true;
402+
}
401403

402404
NotifyClosed();
403405

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Net;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using EventStore.Core.Tests.ClientAPI;
7+
using EventStore.Core.Tests.Integration;
8+
using EventStore.Transport.Tcp;
9+
using NUnit.Framework;
10+
11+
namespace EventStore.Core.Tests.Services.Transport.Tcp {
12+
13+
[TestFixture]
14+
public class when_invalid_data_is_sent_over_tcp : specification_with_cluster {
15+
16+
[Timeout(5000)]
17+
[TestCase("InternalTcpEndPoint", false)]
18+
[TestCase("InternalTcpSecEndPoint", true)]
19+
[TestCase("ExternalTcpEndPoint", false)]
20+
[TestCase("ExternalTcpSecEndPoint", true)]
21+
public void connection_should_be_closed_by_remote_party(string endpointProperty, bool secure) {
22+
IPEndPoint endpoint = (IPEndPoint)_nodes[0].GetType().GetProperty(endpointProperty).GetValue(_nodes[0], null);
23+
var connectedEvent = new ManualResetEvent(false);
24+
var closedEvent = new ManualResetEvent(false);
25+
ITcpConnection connection;
26+
if (!secure) {
27+
connection = TcpConnection.CreateConnectingTcpConnection(
28+
Guid.NewGuid(),
29+
endpoint,
30+
new TcpClientConnector(),
31+
TimeSpan.FromSeconds(5),
32+
(conn) => connectedEvent.Set(),
33+
(conn, error) => {
34+
Assert.Fail($"Connection failed: {error}");
35+
},
36+
false);
37+
} else {
38+
connection = TcpConnectionSsl.CreateConnectingConnection(
39+
Guid.NewGuid(),
40+
endpoint,
41+
"targethost",
42+
false,
43+
new TcpClientConnector(),
44+
TimeSpan.FromSeconds(5),
45+
(conn) => connectedEvent.Set(),
46+
(conn, error) => {
47+
Assert.Fail($"Connection failed: {error}");
48+
},
49+
false);
50+
}
51+
52+
connection.ConnectionClosed += (conn, error) => {
53+
closedEvent.Set();
54+
};
55+
56+
connectedEvent.WaitOne();
57+
var data = new List<ArraySegment<byte>> {
58+
new ArraySegment<byte>(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
59+
};
60+
connection.EnqueueSend(data);
61+
closedEvent.WaitOne();
62+
connection.Close("intentional close");
63+
}
64+
}
65+
}

src/EventStore.Transport.Tcp/TcpConnection.cs

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ public string ClientConnectionName {
8686

8787
private readonly object _receivingLock = new object();
8888
private readonly object _sendLock = new object();
89+
private readonly object _closeLock = new object();
8990
private bool _isSending;
90-
private volatile int _closed;
91-
private int _dispatchingData; //states: 0 - not dispatching data, 1 - dispatching data, 2 - final state, data should not be dispatched after reaching this state
91+
private volatile bool _isClosed;
9292

9393
private Action<ITcpConnection, IEnumerable<ArraySegment<byte>>> _receiveCallback;
9494

@@ -186,7 +186,7 @@ private void ProcessSend(SocketAsyncEventArgs socketArgs) {
186186
} else {
187187
NotifySendCompleted(socketArgs.Count);
188188

189-
if (_closed != 0)
189+
if (_isClosed)
190190
ReturnSendingSocketArgs();
191191
else {
192192
lock (_sendLock) {
@@ -297,15 +297,9 @@ private void TryDequeueReceivedData() {
297297
data[i] = new ArraySegment<byte>(d.Buf.Array, d.Buf.Offset, d.DataLen);
298298
}
299299

300-
var oldState = Interlocked.CompareExchange(ref _dispatchingData, 1, 0);
301-
if (oldState == 0 || oldState == 1) { //oldState can be 1 if there's a recursive ReceiveAsync call in the callback
302-
try {
303-
callback(this, data);
304-
} finally {
305-
if (oldState == 0) {
306-
Interlocked.Exchange(ref _dispatchingData, 0);
307-
}
308-
}
300+
lock (_closeLock) {
301+
if(!_isClosed)
302+
callback(this, data);
309303
}
310304

311305
for (int i = 0, n = res.Count; i < n; ++i) {
@@ -320,14 +314,10 @@ public void Close(string reason) {
320314
}
321315

322316
private void CloseInternal(SocketError socketError, string reason) {
323-
#pragma warning disable 420
324-
if (Interlocked.CompareExchange(ref _closed, 1, 0) != 0)
325-
return;
326-
#pragma warning restore 420
327-
328-
SpinWait spinWait = new SpinWait();
329-
while(Interlocked.CompareExchange(ref _dispatchingData, 2, 0) != 0)
330-
spinWait.SpinOnce();
317+
lock (_closeLock) {
318+
if (_isClosed) return;
319+
_isClosed = true;
320+
}
331321

332322
NotifyClosed();
333323

src/EventStore.Transport.Tcp/TcpConnectionSsl.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,10 @@ private readonly ConcurrentQueueWrapper<ReceivedData>
9696
private readonly MemoryStream _memoryStream = new MemoryStream();
9797

9898
private readonly object _streamLock = new object();
99+
private readonly object _closeLock = new object();
99100
private bool _isSending;
100-
private int _receiveHandling; //states: 0 - not receiving data, 1 - receiving/dispatching data, 2 - final state, no data received/dispatched after reaching this state
101-
private int _isClosed;
101+
private int _receiveHandling;
102+
private volatile bool _isClosed;
102103

103104
private Action<ITcpConnection, IEnumerable<ArraySegment<byte>>> _receiveCallback;
104105

@@ -488,7 +489,10 @@ private void TryDequeueReceivedData() {
488489
data[i] = new ArraySegment<byte>(d.Buf.Array, d.Buf.Offset, d.DataLen);
489490
}
490491

491-
callback(this, data);
492+
lock (_closeLock) {
493+
if(!_isClosed)
494+
callback(this, data);
495+
}
492496

493497
for (int i = 0, n = res.Count; i < n; ++i) {
494498
TcpConnection.BufferManager.CheckIn(res[i].Buf); // dispose buffers
@@ -508,12 +512,10 @@ public void Close(string reason) {
508512
}
509513

510514
private void CloseInternal(SocketError socketError, string reason) {
511-
if (Interlocked.CompareExchange(ref _isClosed, 1, 0) != 0)
512-
return;
513-
514-
SpinWait spinWait = new SpinWait();
515-
while(Interlocked.CompareExchange(ref _receiveHandling, 2, 0) != 0)
516-
spinWait.SpinOnce();
515+
lock (_closeLock) {
516+
if (_isClosed) return;
517+
_isClosed = true;
518+
}
517519

518520
NotifyClosed();
519521

0 commit comments

Comments
 (0)