22
22
import java .util .Map ;
23
23
24
24
import org .apache .commons .lang .StringUtils ;
25
- import org .apache .flume .Context ;
26
- import org .apache .flume .CounterGroup ;
27
- import org .apache .flume .Event ;
28
- import org .apache .flume .EventDeliveryException ;
29
- import org .apache .flume .PollableSource ;
30
- import org .apache .flume .RabbitMQConstants ;
31
- import org .apache .flume .RabbitMQUtil ;
32
- import org .apache .flume .conf .Configurable ;
25
+ import org .apache .flume .*;
33
26
import org .apache .flume .event .SimpleEvent ;
34
- import org .apache .flume .source .AbstractSource ;
27
+ import org .apache .flume .source .AbstractPollableSource ;
35
28
import org .slf4j .Logger ;
36
29
import org .slf4j .LoggerFactory ;
37
30
41
34
import com .rabbitmq .client .QueueingConsumer ;
42
35
43
36
44
- public class RabbitMQSource extends AbstractSource implements Configurable , PollableSource {
37
+ public class RabbitMQSource extends AbstractPollableSource {
45
38
private static final Logger log = LoggerFactory .getLogger (RabbitMQSource .class );
46
39
private CounterGroup _CounterGroup ;
47
40
private ConnectionFactory _ConnectionFactory ;
@@ -55,48 +48,22 @@ public class RabbitMQSource extends AbstractSource implements Configurable, Poll
55
48
public RabbitMQSource (){
56
49
_CounterGroup = new CounterGroup ();
57
50
}
58
-
59
-
60
- @ Override
61
- public void configure (Context context ) {
62
- _ConnectionFactory = RabbitMQUtil .getFactory (context );
63
- _QueueName = RabbitMQUtil .getQueueName (context );
64
- _ExchangeName = RabbitMQUtil .getExchangeName (context );
65
- _Topics = RabbitMQUtil .getTopics (context );
66
-
67
- ensureConfigCompleteness ( context );
68
- }
69
-
70
- @ Override
71
- public synchronized void stop () {
72
- RabbitMQUtil .close (_Connection , _Channel );
73
- super .stop ();
74
- }
75
-
76
- private void resetConnection (){
77
- _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_EXCEPTION );
78
- if (log .isWarnEnabled ())log .warn (this .getName () + " - Closing RabbitMQ connection and channel due to exception." );
79
- RabbitMQUtil .close (_Connection , _Channel );
80
- _Connection =null ;
81
- _Channel =null ;
82
- _Consumer =null ;
83
- }
84
-
51
+
85
52
@ Override
86
- public PollableSource . Status process () throws EventDeliveryException {
53
+ protected Status doProcess () throws EventDeliveryException {
87
54
if (null ==_Connection ){
88
55
try {
89
56
if (log .isInfoEnabled ())log .info (this .getName () + " - Opening connection to " + _ConnectionFactory .getHost () + ":" + _ConnectionFactory .getPort ());
90
57
_Connection = _ConnectionFactory .newConnection ();
91
- _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_NEW_CONNECTION );
58
+ _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_NEW_CONNECTION );
92
59
_Channel = null ;
93
60
} catch (Exception ex ) {
94
61
if (log .isErrorEnabled ()) log .error (this .getName () + " - Exception while establishing connection." , ex );
95
62
resetConnection ();
96
63
return Status .BACKOFF ;
97
- }
64
+ }
98
65
}
99
-
66
+
100
67
if (null ==_Channel ){
101
68
try {
102
69
if (log .isInfoEnabled ())log .info (this .getName () + " - creating channel..." );
@@ -105,96 +72,125 @@ public PollableSource.Status process() throws EventDeliveryException {
105
72
if (log .isInfoEnabled ())log .info (this .getName () + " - Connected to " + _ConnectionFactory .getHost () + ":" + _ConnectionFactory .getPort ());
106
73
_Consumer =null ;
107
74
if ( StringUtils .isNotEmpty (_ExchangeName ) ) {
108
- try {
109
- //declare an exchange
110
- _Channel .exchangeDeclarePassive (_ExchangeName );
111
-
112
- //only grab a default queuename if one is not specified in config
113
- if ( StringUtils .isEmpty ( _QueueName ) ) {
114
- _QueueName = _Channel .queueDeclare ().getQueue ();
115
- }
116
-
117
- //for each topic, bind to the key
118
- if ( null != _Topics ) {
119
- for ( String topic : _Topics ) {
120
- _Channel .queueBind (_QueueName , _ExchangeName , topic );
121
- }
122
- }
123
- }
124
- catch ( Exception ex ) {
75
+ try {
76
+ //declare an exchange
77
+ _Channel .exchangeDeclarePassive (_ExchangeName );
78
+
79
+ //only grab a default queuename if one is not specified in config
80
+ if ( StringUtils .isEmpty ( _QueueName ) ) {
81
+ _QueueName = _Channel .queueDeclare ().getQueue ();
82
+ }
83
+
84
+ //for each topic, bind to the key
85
+ if ( null != _Topics ) {
86
+ for ( String topic : _Topics ) {
87
+ _Channel .queueBind (_QueueName , _ExchangeName , topic );
88
+ }
89
+ }
90
+ }
91
+ catch ( Exception ex ) {
125
92
if (log .isErrorEnabled ()) log .error (this .getName () + " - Exception while declaring exchange." , ex );
126
93
resetConnection ();
127
94
return Status .BACKOFF ;
128
- }
95
+ }
129
96
}
130
- } catch (Exception ex ) {
97
+ } catch (Exception ex ) {
131
98
if (log .isErrorEnabled ()) log .error (this .getName () + " - Exception while creating channel." , ex );
132
99
resetConnection ();
133
100
return Status .BACKOFF ;
134
- }
101
+ }
135
102
}
136
103
if (null == _Consumer ){
137
- try {
138
- _Consumer = new QueueingConsumer (_Channel );
139
- _Channel .basicConsume (_QueueName , false , _Consumer );
140
- }catch ( Exception ex ) {
104
+ try {
105
+ _Consumer = new QueueingConsumer (_Channel );
106
+ _Channel .basicConsume (_QueueName , false , _Consumer );
107
+ }catch ( Exception ex ) {
141
108
if (log .isErrorEnabled ()) log .error (this .getName () + " - Exception while registering consumer" , ex );
142
109
resetConnection ();
143
110
return Status .BACKOFF ;
144
- }
111
+ }
112
+ }
113
+
114
+ QueueingConsumer .Delivery delivery ;
115
+
116
+ try {
117
+ delivery = _Consumer .nextDelivery ();
118
+ _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_GET );
119
+ }
120
+ catch (Exception ex ) {
121
+ _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_EXCEPTION );
122
+ if (log .isErrorEnabled ())
123
+ log .error (this .getName () + " - Exception thrown while pulling from queue." , ex );
124
+ resetConnection ();
125
+ return Status .BACKOFF ;
126
+ }
127
+
128
+ if (null == delivery ) {
129
+ _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_GET_MISS );
130
+ return Status .BACKOFF ;
145
131
}
146
132
147
- QueueingConsumer .Delivery delivery ;
148
-
149
- try {
150
- delivery = _Consumer .nextDelivery ();
151
- _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_GET );
152
- }
153
- catch (Exception ex ) {
154
- _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_EXCEPTION );
155
- if (log .isErrorEnabled ())
156
- log .error (this .getName () + " - Exception thrown while pulling from queue." , ex );
157
- resetConnection ();
158
- return Status .BACKOFF ;
159
- }
160
-
161
- if (null == delivery ) {
162
- _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_GET_MISS );
163
- return Status .BACKOFF ;
164
- }
165
-
166
- try {
167
- Map <String , String > properties = RabbitMQUtil .getHeaders (delivery .getProperties ());
168
- Event event = new SimpleEvent ();
169
- event .setBody (delivery .getBody ());
170
- event .setHeaders (properties );
171
-
172
- getChannelProcessor ().processEvent (event );
173
- } catch (Exception ex ) {
174
- if (log .isErrorEnabled ())
175
- log .error (this .getName () + " - Exception thrown while processing event" , ex );
176
-
177
- return Status .BACKOFF ;
178
- }
133
+ try {
134
+ Map <String , String > properties = RabbitMQUtil .getHeaders (delivery .getProperties ());
135
+ Event event = new SimpleEvent ();
136
+ event .setBody (delivery .getBody ());
137
+ event .setHeaders (properties );
138
+
139
+ getChannelProcessor ().processEvent (event );
140
+ } catch (Exception ex ) {
141
+ if (log .isErrorEnabled ())
142
+ log .error (this .getName () + " - Exception thrown while processing event" , ex );
143
+
144
+ return Status .BACKOFF ;
145
+ }
179
146
180
147
try {
181
- _Channel .basicAck (delivery .getEnvelope ().getDeliveryTag (), false );
148
+ _Channel .basicAck (delivery .getEnvelope ().getDeliveryTag (), false );
182
149
_CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_ACK );
183
150
} catch (Exception ex ){
184
151
_CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_EXCEPTION );
185
152
if (log .isErrorEnabled ())log .error (this .getName () + " - Exception thrown while sending ack to queue" , ex );
186
153
resetConnection ();
187
- return Status .BACKOFF ;
154
+ return Status .BACKOFF ;
188
155
}
189
-
190
- return Status .READY ;
156
+
157
+ return Status .READY ;
158
+ }
159
+
160
+ @ Override
161
+ protected void doConfigure (Context context ) throws FlumeException {
162
+ _ConnectionFactory = RabbitMQUtil .getFactory (context );
163
+ _QueueName = RabbitMQUtil .getQueueName (context );
164
+ _ExchangeName = RabbitMQUtil .getExchangeName (context );
165
+ _Topics = RabbitMQUtil .getTopics (context );
166
+
167
+ ensureConfigCompleteness ( context );
168
+ }
169
+
170
+ @ Override
171
+ protected void doStart () throws FlumeException {
172
+
173
+ }
174
+
175
+ @ Override
176
+ protected void doStop () throws FlumeException {
177
+ RabbitMQUtil .close (_Connection , _Channel );
178
+ super .stop ();
191
179
}
192
180
181
+ private void resetConnection (){
182
+ _CounterGroup .incrementAndGet (RabbitMQConstants .COUNTER_EXCEPTION );
183
+ if (log .isWarnEnabled ())log .warn (this .getName () + " - Closing RabbitMQ connection and channel due to exception." );
184
+ RabbitMQUtil .close (_Connection , _Channel );
185
+ _Connection =null ;
186
+ _Channel =null ;
187
+ _Consumer =null ;
188
+ }
193
189
194
190
/**
195
191
* Verify that the required configuration is set
196
192
*
197
- * @param context
193
+ * @param context Context
198
194
*/
199
195
private void ensureConfigCompleteness ( Context context ) {
200
196
0 commit comments