-
Notifications
You must be signed in to change notification settings - Fork 1.6k
add redis channel #176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
add redis channel #176
Conversation
add redis channel
Hi @JunLuo , Thanks for the pull request! This looks like a very useful change to me. However, Travis CI has reported a compilation error:
Can you please fix it, so that we can proceed with the reviews? Thank you, Donat |
delete unused testapp
add license comments
fix codestyle check errors
fix codestyle check errors
fix codestyle check errors
init redis channel version
fix a stupid error
Hi @bessbd , |
Hi @JunLuo , Thank you for the changes! On a first pass, I've noticed that a new user guide entry is missing. Thank you, Donat |
add redis channel documentation
Hi @bessbd , |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the new channel, it is going to be very useful!
I added some questions and suggestions bellow.
It would be great if you could write unit and/or integration test.
https://github.com/kstyrc/embedded-redis
Could be a good tool to create the integration tests.
*/ | ||
package org.apache.flume.channels.redis; | ||
|
||
import com.google.common.base.Optional; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use java 8 optional instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@Override | ||
protected void doPut(Event event) throws InterruptedException { | ||
type = TransactionType.PUT; | ||
if (!serializedEvents.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just initialize with the empty list?
or we could use the Optional.ifPresent(Consumer<? super T> consumer) at the end, where the get() is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
} | ||
|
||
try { | ||
if (!tempOutStream.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the optional wrapper seems unnecessary because there is a single initialization and 3 get in the same method.
Initialization could be moved to declaration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
if (!tempOutStream.isPresent()) { | ||
tempOutStream = Optional.of(new ByteArrayOutputStream()); | ||
} | ||
if (!writer.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the optional might be an overhead here too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
} catch (Exception ex) { | ||
LOGGER.warn("Error while get multi message", ex); | ||
} | ||
if (!events.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This variable has only this initialization and a few get in this class. I guess the Optional here is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
.. code-block:: properties | ||
|
||
a1.channels.channel1.type = org.flume.channels.redis.RedisChannel | ||
a1.channels.channel1.server = redis-server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to have an example host (localhost) here or an IP address
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
|
||
================================================ ================================ ======================================================== | ||
Property Name Default Description | ||
================================================ ================================ ======================================================== |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cluster scenario is not documented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
private int maxWaitMillis; | ||
|
||
private JedisPoolConfig createJedisConfig() { | ||
redisConnTimeout = 60000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great if these settings could be set from the configuration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
return jedisConfig; | ||
} | ||
|
||
public static RedisController getRedisController(String host, int port, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if we would like to have two channel with different redis servers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
maxIdle = 300; | ||
minIdle = 10; | ||
maxWaitMillis = 60000; | ||
JedisPoolConfig jedisConfig = new JedisPoolConfig(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the cluster related config names but not the actual implementation that uses them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
remove useness optional wrapper
`add sentinel and cluster type `move redis config to channel config `add unit test
fix code style
fix code style error
add cluster and redis config documention
Hi @szaboferee ,
|
Hi @szaboferee , |
Redis Channel | ||
~~~~~~~~~~~~~ | ||
|
||
The events are stored in a redis server (must be installed separately). Redis provides high concurrency. If you have many agents but want to share one channel which can provide high concurrency, Redis Channel is a good choice. Redis also can provide high availability and replication, you can choose the best way to adapt your application. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be "Redis" instead of "redis".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, the sentence Redis provides high concurrency.
seems redundant.
|
||
.. code-block:: properties | ||
|
||
a1.channels.channel1.type = org.flume.channels.redis.RedisChannel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add RedisChannel to flume/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
so that people don't have to type org.flume.channels.redis.RedisChannel
redisOperator = RedisInit.getInstance(redisConf, jedisPoolConfig); | ||
counter.start(); | ||
super.start(); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have noticed a few occurrences of catch (Exception e)
. According to https://stackoverflow.com/a/2416334/5323166 we should catch the most specific type we can. Can we catch more specific exceptions here?
Overall, this change looks fine to me. After receiving feedback for the comments I have left, I'd like to request contributors and committers to do a review of this PR so that we can merge it. |
change redis description
change channel type
channel exception type
Hi @bessbd , |
Can one of the admins verify this patch? |
pili-base -> pili
In some cases, we need events can not be lost. But we don't want to install zk & kafka.
To make event deliver only once and to deploy many flume agents in different nodes but share only one channel, we developed redis channel.
We already use the redis channel in out production environment, and we want to feed back to flume.