24
24
import org .slf4j .Logger ;
25
25
import org .slf4j .LoggerFactory ;
26
26
27
+ import java .io .Serializable ;
27
28
import java .util .ArrayList ;
28
29
import java .util .Collections ;
29
30
import java .util .HashSet ;
@@ -52,7 +53,7 @@ public class UUIDBroadcasterCache implements BroadcasterCache {
52
53
53
54
private final static Logger logger = LoggerFactory .getLogger (UUIDBroadcasterCache .class );
54
55
55
- private final Map <String , ConcurrentLinkedQueue > messages = new ConcurrentHashMap <String , ConcurrentLinkedQueue >();
56
+ private final Map <String , ClientQueue > messages = new ConcurrentHashMap <String , ClientQueue >();
56
57
private final Map <String , Long > activeClients = new ConcurrentHashMap <String , Long >();
57
58
protected final List <BroadcasterCacheInspector > inspectors = new LinkedList <BroadcasterCacheInspector >();
58
59
private ScheduledFuture scheduledFuture ;
@@ -64,6 +65,30 @@ public class UUIDBroadcasterCache implements BroadcasterCache {
64
65
protected final List <BroadcasterCacheListener > listeners = new LinkedList <BroadcasterCacheListener >();
65
66
private UUIDProvider uuidProvider ;
66
67
68
+ /**
69
+ * This class wraps all messages to be delivered to a client. The class is thread safe to be accessed in a
70
+ * concurrent context.
71
+ */
72
+ public final static class ClientQueue implements Serializable {
73
+ private static final long serialVersionUID = -126253550299206646L ;
74
+
75
+ private final ConcurrentLinkedQueue <CacheMessage > queue = new ConcurrentLinkedQueue <CacheMessage >();
76
+ private final Set <String > ids = Collections .synchronizedSet (new HashSet <String >());
77
+
78
+ public ConcurrentLinkedQueue <CacheMessage > getQueue () {
79
+ return queue ;
80
+ }
81
+
82
+ public Set <String > getIds () {
83
+ return ids ;
84
+ }
85
+
86
+ @ Override
87
+ public String toString () {
88
+ return queue .toString ();
89
+ }
90
+ }
91
+
67
92
@ Override
68
93
public void configure (AtmosphereConfig config ) {
69
94
Object o = config .properties ().get ("shared" );
@@ -131,7 +156,7 @@ public CacheMessage addToCache(String broadcasterId, String uuid, BroadcastMessa
131
156
cache = false ;
132
157
}
133
158
134
- CacheMessage cacheMessage = new CacheMessage (messageId , message .message (), uuid );
159
+ CacheMessage cacheMessage = new CacheMessage (messageId , message .message (), uuid );;
135
160
if (cache ) {
136
161
if (uuid .equals (NULL )) {
137
162
//no clients are connected right now, caching message for all active clients
@@ -149,33 +174,38 @@ public CacheMessage addToCache(String broadcasterId, String uuid, BroadcastMessa
149
174
@ Override
150
175
public List <Object > retrieveFromCache (String broadcasterId , String uuid ) {
151
176
152
- ConcurrentLinkedQueue <CacheMessage > clientQueue = messages .get (uuid );
153
- CacheMessage message ;
154
177
List <Object > result = new ArrayList <Object >();
155
- if (clientQueue == null ) {
156
- logger .debug ("client queue is null (not yet created), hence returning back for broadcaster %s and uuid %s" ,
157
- broadcasterId , uuid );
158
- return result ;
178
+
179
+ ClientQueue clientQueue ;
180
+ cacheCandidate (broadcasterId , uuid );
181
+ clientQueue = messages .remove (uuid );
182
+ ConcurrentLinkedQueue <CacheMessage > clientMessages ;
183
+ if (clientQueue != null ) {
184
+ clientMessages = clientQueue .getQueue ();
185
+
186
+ for (CacheMessage cacheMessage : clientMessages ) {
187
+ result .add (cacheMessage .getMessage ());
188
+ }
159
189
}
160
190
161
- while ((message = clientQueue .poll ()) != null ) {
162
- result .add (message );
191
+ if (logger .isTraceEnabled ()) {
192
+ logger .trace ("Retrieved for AtmosphereResource {} cached messages {}" , uuid , result );
193
+ logger .trace ("Available cached message {}" , messages );
163
194
}
164
195
165
196
return result ;
166
197
}
167
198
168
199
@ Override
169
200
public BroadcasterCache clearCache (String broadcasterId , String uuid , CacheMessage message ) {
170
- ConcurrentLinkedQueue <CacheMessage > clientQueue = messages .remove (uuid );
171
-
172
- if (clientQueue == null ) {
173
- logger .error ("Invalid State, no Queue available" );
174
- return this ;
201
+ ClientQueue clientQueue ;
202
+ clientQueue = messages .get (uuid );
203
+ if (clientQueue != null ) {
204
+ logger .trace ("Removing for AtmosphereResource {} cached message {}" , uuid , message .getMessage ());
205
+ notifyRemoveCache (broadcasterId , message );
206
+ clientQueue .getQueue ().remove (message );
207
+ clientQueue .getIds ().remove (message .getId ());
175
208
}
176
-
177
- logger .trace ("Removing for AtmosphereResource {} cached message {}" , uuid , message .getMessage ());
178
- notifyRemoveCache (broadcasterId , message );
179
209
return this ;
180
210
}
181
211
@@ -210,22 +240,21 @@ private void addMessageIfNotExists(String broadcasterId, String clientId, CacheM
210
240
}
211
241
212
242
private void addMessage (String broadcasterId , String clientId , CacheMessage message ) {
213
- ConcurrentLinkedQueue clientQueue = messages .get (clientId );
243
+ ClientQueue clientQueue = messages .get (clientId );
214
244
if (clientQueue == null ) {
215
- synchronized (message ) {
216
- clientQueue = new ConcurrentLinkedQueue ();
217
- // Make sure the client is not in the process of being invalidated
218
- if (activeClients .get (clientId ) != null ) {
219
- messages .put (clientId , clientQueue );
220
- } else {
221
- // The entry has been invalidated
222
- logger .debug ("Client {} is no longer active. Not caching message {}}" , clientId , message );
223
- return ;
224
- }
245
+ clientQueue = new ClientQueue ();
246
+ // Make sure the client is not in the process of being invalidated
247
+ if (activeClients .get (clientId ) != null ) {
248
+ messages .put (clientId , clientQueue );
249
+ } else {
250
+ // The entry has been invalidated
251
+ logger .debug ("Client {} is no longer active. Not caching message {}}" , clientId , message );
252
+ return ;
225
253
}
226
254
}
227
255
notifyAddCache (broadcasterId , message );
228
- messages .put (clientId , clientQueue );
256
+ clientQueue .getQueue ().offer (message );
257
+ clientQueue .getIds ().add (message .getId ());
229
258
}
230
259
231
260
private void notifyAddCache (String broadcasterId , CacheMessage message ) {
@@ -249,11 +278,11 @@ private void notifyRemoveCache(String broadcasterId, CacheMessage message) {
249
278
}
250
279
251
280
private boolean hasMessage (String clientId , String messageId ) {
252
- ConcurrentLinkedQueue clientQueue = messages .get (clientId );
253
- return clientQueue != null && clientQueue .contains (messageId );
281
+ ClientQueue clientQueue = messages .get (clientId );
282
+ return clientQueue != null && clientQueue .getIds (). contains (messageId );
254
283
}
255
284
256
- public Map <String , ConcurrentLinkedQueue > messages () {
285
+ public Map <String , ClientQueue > messages () {
257
286
return messages ;
258
287
}
259
288
@@ -296,7 +325,7 @@ protected void invalidateExpiredEntries() {
296
325
297
326
for (String msg : messages ().keySet ()) {
298
327
if (!activeClients ().containsKey (msg )) {
299
- messages .remove (msg );
328
+ messages () .remove (msg );
300
329
}
301
330
}
302
331
}
0 commit comments