@@ -15,11 +15,10 @@ type LocalSubscriber struct {
15
15
16
16
disconnected atomic.Uint32
17
17
out chan * Update
18
- outMutex sync.Mutex
18
+ mutex sync.Mutex
19
19
responseLastEventID chan string
20
20
ready atomic.Uint32
21
21
liveQueue []* Update
22
- liveMutex sync.RWMutex
23
22
}
24
23
25
24
const outBufferLength = 1000
@@ -44,45 +43,39 @@ func NewLocalSubscriber(lastEventID string, logger Logger, topicSelectorStore *T
44
43
// Security checks must (topics matching) be done before calling Dispatch,
45
44
// for instance by calling Match.
46
45
func (s * LocalSubscriber ) Dispatch (u * Update , fromHistory bool ) bool {
47
- s .outMutex .Lock ()
46
+ s .mutex .Lock ()
48
47
49
48
if s .disconnected .Load () > 0 {
50
- s .outMutex .Unlock ()
49
+ s .mutex .Unlock ()
51
50
52
51
return false
53
52
}
54
53
55
54
if ! fromHistory && s .ready .Load () < 1 {
56
- s .liveMutex .Lock ()
57
55
s .liveQueue = append (s .liveQueue , u )
58
- s .liveMutex .Unlock ()
59
-
60
- s .outMutex .Unlock ()
56
+ s .mutex .Unlock ()
61
57
62
58
return true
63
59
}
64
60
65
61
select {
66
62
case s .out <- u :
67
- s .outMutex .Unlock ()
63
+ s .mutex .Unlock ()
64
+
65
+ return true
68
66
default :
69
67
s .handleFullChan ()
70
68
71
69
return false
72
70
}
73
-
74
- return true
75
71
}
76
72
77
73
// Ready flips the ready flag to true and flushes queued live updates returning number of events flushed.
78
74
func (s * LocalSubscriber ) Ready () (n int ) {
79
- s .outMutex .Lock ()
80
-
81
- s .liveMutex .RLock ()
82
- defer s .liveMutex .RUnlock ()
75
+ s .mutex .Lock ()
83
76
84
77
if s .disconnected .Load () > 0 || s .ready .Load () > 0 {
85
- s .outMutex .Unlock ()
78
+ s .mutex .Unlock ()
86
79
87
80
return 0
88
81
}
@@ -103,7 +96,7 @@ func (s *LocalSubscriber) Ready() (n int) {
103
96
s .ready .Store (1 )
104
97
s .liveQueue = nil
105
98
106
- s .outMutex .Unlock ()
99
+ s .mutex .Unlock ()
107
100
108
101
return n
109
102
}
@@ -120,8 +113,8 @@ func (s *LocalSubscriber) HistoryDispatched(responseLastEventID string) {
120
113
121
114
// Disconnect disconnects the subscriber.
122
115
func (s * LocalSubscriber ) Disconnect () {
123
- s .outMutex .Lock ()
124
- defer s .outMutex .Unlock ()
116
+ s .mutex .Lock ()
117
+ defer s .mutex .Unlock ()
125
118
126
119
if s .disconnected .Load () > 0 {
127
120
return // already disconnected
@@ -133,7 +126,7 @@ func (s *LocalSubscriber) Disconnect() {
133
126
134
127
// handleFullChan disconnects the subscriber when the out channel is full.
135
128
func (s * LocalSubscriber ) handleFullChan () {
136
- defer s .outMutex .Unlock ()
129
+ defer s .mutex .Unlock ()
137
130
if s .disconnected .Load () > 0 {
138
131
return // already disconnected
139
132
}
0 commit comments