Skip to content

Commit 5d2f74b

Browse files
committed
Merge branch 2.4.2.x
2 parents 050f2a5 + 6babfc1 commit 5d2f74b

File tree

4 files changed

+39
-68
lines changed

4 files changed

+39
-68
lines changed

modules/cpr/src/main/java/org/atmosphere/cache/UUIDBroadcasterCache.java

Lines changed: 34 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27-
import java.io.Serializable;
2827
import java.util.ArrayList;
2928
import java.util.Collections;
3029
import java.util.HashSet;
@@ -53,7 +52,7 @@ public class UUIDBroadcasterCache implements BroadcasterCache {
5352

5453
private final static Logger logger = LoggerFactory.getLogger(UUIDBroadcasterCache.class);
5554

56-
private final Map<String, ClientQueue> messages = new ConcurrentHashMap<String, ClientQueue>();
55+
private final Map<String, ConcurrentLinkedQueue> messages = new ConcurrentHashMap<String, ConcurrentLinkedQueue>();
5756
private final Map<String, Long> activeClients = new ConcurrentHashMap<String, Long>();
5857
protected final List<BroadcasterCacheInspector> inspectors = new LinkedList<BroadcasterCacheInspector>();
5958
private ScheduledFuture scheduledFuture;
@@ -65,30 +64,6 @@ public class UUIDBroadcasterCache implements BroadcasterCache {
6564
protected final List<BroadcasterCacheListener> listeners = new LinkedList<BroadcasterCacheListener>();
6665
private UUIDProvider uuidProvider;
6766

68-
/**
69-
* This class wraps all messages to be delivered to a client. The class is thread safe to be accessed in a
70-
* concurrent context.
71-
*/
72-
public final static class ClientQueue implements Serializable {
73-
private static final long serialVersionUID = -126253550299206646L;
74-
75-
private final ConcurrentLinkedQueue<CacheMessage> queue = new ConcurrentLinkedQueue<CacheMessage>();
76-
private final Set<String> ids = Collections.synchronizedSet(new HashSet<String>());
77-
78-
public ConcurrentLinkedQueue<CacheMessage> getQueue() {
79-
return queue;
80-
}
81-
82-
public Set<String> getIds() {
83-
return ids;
84-
}
85-
86-
@Override
87-
public String toString() {
88-
return queue.toString();
89-
}
90-
}
91-
9267
@Override
9368
public void configure(AtmosphereConfig config) {
9469
Object o = config.properties().get("shared");
@@ -156,7 +131,7 @@ public CacheMessage addToCache(String broadcasterId, String uuid, BroadcastMessa
156131
cache = false;
157132
}
158133

159-
CacheMessage cacheMessage = new CacheMessage(messageId, message.message(), uuid);;
134+
CacheMessage cacheMessage = new CacheMessage(messageId, message.message(), uuid);
160135
if (cache) {
161136
if (uuid.equals(NULL)) {
162137
//no clients are connected right now, caching message for all active clients
@@ -174,38 +149,33 @@ public CacheMessage addToCache(String broadcasterId, String uuid, BroadcastMessa
174149
@Override
175150
public List<Object> retrieveFromCache(String broadcasterId, String uuid) {
176151

152+
ConcurrentLinkedQueue<CacheMessage> clientQueue = messages.get(uuid);
153+
CacheMessage message;
177154
List<Object> result = new ArrayList<Object>();
178-
179-
ClientQueue clientQueue;
180-
cacheCandidate(broadcasterId, uuid);
181-
clientQueue = messages.remove(uuid);
182-
ConcurrentLinkedQueue<CacheMessage> clientMessages;
183-
if (clientQueue != null) {
184-
clientMessages = clientQueue.getQueue();
185-
186-
for (CacheMessage cacheMessage : clientMessages) {
187-
result.add(cacheMessage.getMessage());
188-
}
155+
if (clientQueue == null) {
156+
logger.debug("client queue is null (not yet created), hence returning back for broadcaster %s and uuid %s",
157+
broadcasterId, uuid);
158+
return result;
189159
}
190160

191-
if (logger.isTraceEnabled()) {
192-
logger.trace("Retrieved for AtmosphereResource {} cached messages {}", uuid, result);
193-
logger.trace("Available cached message {}", messages);
161+
while ((message = clientQueue.poll()) != null) {
162+
result.add(message);
194163
}
195164

196165
return result;
197166
}
198167

199168
@Override
200169
public BroadcasterCache clearCache(String broadcasterId, String uuid, CacheMessage message) {
201-
ClientQueue clientQueue;
202-
clientQueue = messages.get(uuid);
203-
if (clientQueue != null) {
204-
logger.trace("Removing for AtmosphereResource {} cached message {}", uuid, message.getMessage());
205-
notifyRemoveCache(broadcasterId, message);
206-
clientQueue.getQueue().remove(message);
207-
clientQueue.getIds().remove(message.getId());
170+
ConcurrentLinkedQueue<CacheMessage> clientQueue = messages.remove(uuid);
171+
172+
if (clientQueue == null) {
173+
logger.error("Invalid State, no Queue available");
174+
return this;
208175
}
176+
177+
logger.trace("Removing for AtmosphereResource {} cached message {}", uuid, message.getMessage());
178+
notifyRemoveCache(broadcasterId, message);
209179
return this;
210180
}
211181

@@ -240,21 +210,22 @@ private void addMessageIfNotExists(String broadcasterId, String clientId, CacheM
240210
}
241211

242212
private void addMessage(String broadcasterId, String clientId, CacheMessage message) {
243-
ClientQueue clientQueue = messages.get(clientId);
213+
ConcurrentLinkedQueue clientQueue = messages.get(clientId);
244214
if (clientQueue == null) {
245-
clientQueue = new ClientQueue();
246-
// Make sure the client is not in the process of being invalidated
247-
if (activeClients.get(clientId) != null) {
248-
messages.put(clientId, clientQueue);
249-
} else {
250-
// The entry has been invalidated
251-
logger.debug("Client {} is no longer active. Not caching message {}}", clientId, message);
252-
return;
215+
synchronized (message) {
216+
clientQueue = new ConcurrentLinkedQueue();
217+
// Make sure the client is not in the process of being invalidated
218+
if (activeClients.get(clientId) != null) {
219+
messages.put(clientId, clientQueue);
220+
} else {
221+
// The entry has been invalidated
222+
logger.debug("Client {} is no longer active. Not caching message {}}", clientId, message);
223+
return;
224+
}
253225
}
254226
}
255227
notifyAddCache(broadcasterId, message);
256-
clientQueue.getQueue().offer(message);
257-
clientQueue.getIds().add(message.getId());
228+
messages.put(clientId, clientQueue);
258229
}
259230

260231
private void notifyAddCache(String broadcasterId, CacheMessage message) {
@@ -278,11 +249,11 @@ private void notifyRemoveCache(String broadcasterId, CacheMessage message) {
278249
}
279250

280251
private boolean hasMessage(String clientId, String messageId) {
281-
ClientQueue clientQueue = messages.get(clientId);
282-
return clientQueue != null && clientQueue.getIds().contains(messageId);
252+
ConcurrentLinkedQueue clientQueue = messages.get(clientId);
253+
return clientQueue != null && clientQueue.contains(messageId);
283254
}
284255

285-
public Map<String, ClientQueue> messages() {
256+
public Map<String, ConcurrentLinkedQueue> messages() {
286257
return messages;
287258
}
288259

@@ -325,7 +296,7 @@ protected void invalidateExpiredEntries() {
325296

326297
for (String msg : messages().keySet()) {
327298
if (!activeClients().containsKey(msg)) {
328-
messages().remove(msg);
299+
messages.remove(msg);
329300
}
330301
}
331302
}

modules/cpr/src/test/java/org/atmosphere/cpr/UUIDBroadcasterCacheTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testBasicCache() throws ExecutionException, InterruptedException, Se
7676
broadcaster.broadcast("e2").get();
7777
broadcaster.broadcast("e3").get();
7878

79-
assertEquals(broadcasterCache.messages().get(ar.uuid()).getQueue().size(), 2);
79+
assertEquals(broadcasterCache.messages().get(ar.uuid()).size(), 2);
8080
}
8181

8282
@Test
@@ -90,7 +90,7 @@ public void addRemoveAddTest() throws ExecutionException, InterruptedException,
9090
broadcaster.broadcast("e3").get();
9191

9292
assertEquals(broadcasterCache.messages().size(), 1);
93-
assertEquals(broadcasterCache.messages().get(ar.uuid()).getQueue().size(), 1);
93+
assertEquals(broadcasterCache.messages().get(ar.uuid()).size(), 1);
9494
}
9595

9696
@Test
@@ -137,7 +137,7 @@ public void run() {
137137

138138
latch.await(10, TimeUnit.SECONDS);
139139

140-
assertEquals(broadcasterCache.messages().get(ar.uuid()).getQueue().size(), 100);
140+
assertEquals(broadcasterCache.messages().get(ar.uuid()).size(), 100);
141141
}
142142

143143
public final static class AR implements AtmosphereHandler {

modules/native/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<artifactId>atmosphere-runtime-native</artifactId>
1212
<packaging>bundle</packaging>
1313
<version>2.4.10-SNAPSHOT</version>
14+
1415
<name>atmosphere-runtime-native</name>
1516
<url>https://github.com/Atmosphere/atmosphere</url>
1617
<build>

pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
<connection>scm:git:[email protected]:Atmosphere/atmosphere.git</connection>
1818
<developerConnection>scm:git:[email protected]:Atmosphere/atmosphere.git</developerConnection>
1919
<url>http://github.com/Atmosphere/atmosphere</url>
20-
<tag>HEAD</tag>
21-
</scm>
20+
</scm>
2221
<prerequisites>
2322
<maven>3.0</maven>
2423
</prerequisites>

0 commit comments

Comments
 (0)