Skip to content

add redis channel #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: trunk
Choose a base branch
from
Open

add redis channel #176

wants to merge 18 commits into from

Conversation

JunLuo
Copy link

@JunLuo JunLuo commented Sep 16, 2017

In some cases, we need events can not be lost. But we don't want to install zk & kafka.
To make event deliver only once and to deploy many flume agents in different nodes but share only one channel, we developed redis channel.
We already use the redis channel in out production environment, and we want to feed back to flume.

add redis channel
@bessbd
Copy link
Member

bessbd commented Sep 16, 2017

Hi @JunLuo ,

Thanks for the pull request!

This looks like a very useful change to me. However, Travis CI has reported a compilation error:

[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /home/travis/build/apache/flume/flume-ng-channels/flume-redis-channel/src/test/java/org/flume/redis/channel/AppTest.java:[3,22] error: package junit.framework does not exist
[ERROR] /home/travis/build/apache/flume/flume-ng-channels/flume-redis-channel/src/test/java/org/flume/redis/channel/AppTest.java:[4,22] error: package junit.framework does not exist
[ERROR] /home/travis/build/apache/flume/flume-ng-channels/flume-redis-channel/src/test/java/org/flume/redis/channel/AppTest.java:[5,22] error: package junit.framework does not exist
[ERROR] /home/travis/build/apache/flume/flume-ng-channels/flume-redis-channel/src/test/java/org/flume/redis/channel/AppTest.java:[11,12] error: cannot find symbol
[ERROR] class TestCase
/home/travis/build/apache/flume/flume-ng-channels/flume-redis-channel/src/test/java/org/flume/redis/channel/AppTest.java:[26,18] error: cannot find symbol
[ERROR] class AppTest
/home/travis/build/apache/flume/flume-ng-channels/flume-redis-channel/src/test/java/org/flume/redis/channel/AppTest.java:[28,19] error: cannot find symbol
[ERROR] class AppTest
/home/travis/build/apache/flume/flume-ng-channels/flume-redis-channel/src/test/java/org/flume/redis/channel/AppTest.java:[36,8] error: cannot find symbol
[INFO] 7 errors

Can you please fix it, so that we can proceed with the reviews?

Thank you,

Donat

delete unused testapp
add license comments
fix codestyle check errors
fix codestyle check errors
fix codestyle check errors
init redis channel version
fix a stupid error
@JunLuo
Copy link
Author

JunLuo commented Sep 16, 2017

Hi @bessbd ,
I have fixed the errors about Junit and code style :)

fix a bug
@bessbd
Copy link
Member

bessbd commented Sep 25, 2017

Hi @JunLuo ,

Thank you for the changes!

On a first pass, I've noticed that a new user guide entry is missing.
For this change to get merged, some documentation would be necessary.
For an example, I'd recommend Kafka Channel.
Do you think you can make the necessary additions?

Thank you,

Donat

add redis channel documentation
@JunLuo
Copy link
Author

JunLuo commented Sep 26, 2017

Hi @bessbd ,
I've add some documentation about redis channel :)

Copy link
Contributor

@szaboferee szaboferee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the new channel, it is going to be very useful!
I added some questions and suggestions bellow.

It would be great if you could write unit and/or integration test.
https://github.com/kstyrc/embedded-redis
Could be a good tool to create the integration tests.

*/
package org.apache.flume.channels.redis;

import com.google.common.base.Optional;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use java 8 optional instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@Override
protected void doPut(Event event) throws InterruptedException {
type = TransactionType.PUT;
if (!serializedEvents.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just initialize with the empty list?

or we could use the Optional.ifPresent(Consumer<? super T> consumer) at the end, where the get() is called

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}

try {
if (!tempOutStream.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the optional wrapper seems unnecessary because there is a single initialization and 3 get in the same method.
Initialization could be moved to declaration

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if (!tempOutStream.isPresent()) {
tempOutStream = Optional.of(new ByteArrayOutputStream());
}
if (!writer.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the optional might be an overhead here too.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

} catch (Exception ex) {
LOGGER.warn("Error while get multi message", ex);
}
if (!events.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable has only this initialization and a few get in this class. I guess the Optional here is not needed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

.. code-block:: properties

a1.channels.channel1.type = org.flume.channels.redis.RedisChannel
a1.channels.channel1.server = redis-server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to have an example host (localhost) here or an IP address

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


================================================ ================================ ========================================================
Property Name Default Description
================================================ ================================ ========================================================
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cluster scenario is not documented

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

private int maxWaitMillis;

private JedisPoolConfig createJedisConfig() {
redisConnTimeout = 60000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if these settings could be set from the configuration

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return jedisConfig;
}

public static RedisController getRedisController(String host, int port,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we would like to have two channel with different redis servers?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

maxIdle = 300;
minIdle = 10;
maxWaitMillis = 60000;
JedisPoolConfig jedisConfig = new JedisPoolConfig();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the cluster related config names but not the actual implementation that uses them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

remove useness optional wrapper
`add sentinel and cluster type
`move redis config to channel config
`add unit test
fix code style
fix code style error
add cluster and redis config documention
@JunLuo
Copy link
Author

JunLuo commented Sep 29, 2017

Hi @szaboferee ,
I've done these changes:

  1. Add cluster and sentinel mode
  2. Remove useless optional
  3. Add the documention about cluster and sentinel
  4. Move redis config to flume configuration
  5. Fix the error of singleton
  6. Add unit test

@JunLuo
Copy link
Author

JunLuo commented Oct 28, 2017

Hi @szaboferee
I've fix all problems you mentions in commits, What should I do next?

Redis Channel
~~~~~~~~~~~~~

The events are stored in a redis server (must be installed separately). Redis provides high concurrency. If you have many agents but want to share one channel which can provide high concurrency, Redis Channel is a good choice. Redis also can provide high availability and replication, you can choose the best way to adapt your application.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be "Redis" instead of "redis".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the sentence Redis provides high concurrency. seems redundant.


.. code-block:: properties

a1.channels.channel1.type = org.flume.channels.redis.RedisChannel
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add RedisChannel to flume/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java so that people don't have to type org.flume.channels.redis.RedisChannel

redisOperator = RedisInit.getInstance(redisConf, jedisPoolConfig);
counter.start();
super.start();
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have noticed a few occurrences of catch (Exception e). According to https://stackoverflow.com/a/2416334/5323166 we should catch the most specific type we can. Can we catch more specific exceptions here?

@bessbd
Copy link
Member

bessbd commented Oct 29, 2017

Overall, this change looks fine to me. After receiving feedback for the comments I have left, I'd like to request contributors and committers to do a review of this PR so that we can merge it.

change redis description
change channel type
channel exception type
@JunLuo
Copy link
Author

JunLuo commented Nov 11, 2017

Hi @bessbd ,
I've fix all problems you mentions in commits. :)

@asfgit
Copy link

asfgit commented Aug 17, 2018

Can one of the admins verify this patch?

waidr pushed a commit to waidr/flume that referenced this pull request Jul 24, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants