diff --git a/flume-ng-channels/flume-redis-channel/pom.xml b/flume-ng-channels/flume-redis-channel/pom.xml
new file mode 100644
index 0000000000..dcfbbefba4
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/pom.xml
@@ -0,0 +1,56 @@
+
+
+
+
+ flume-ng-channels
+ org.apache.flume
+ 1.9.0-SNAPSHOT
+
+ 4.0.0
+
+ org.apache.flume.flume-ng-channels
+ flume-redis-channel
+
+
+
+ org.apache.flume
+ flume-ng-core
+
+
+ org.apache.flume
+ flume-ng-sdk
+
+
+ redis.clients
+ jedis
+ 2.9.0
+
+
+ com.github.kstyrc
+ embedded-redis
+ 0.6
+
+
+ junit
+ junit
+ test
+
+
+
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannel.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannel.java
new file mode 100755
index 0000000000..b2cd6e3cf5
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannel.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.channel.BasicChannelSemantics;
+import org.apache.flume.channel.BasicTransactionSemantics;
+import org.apache.flume.channels.redis.tools.RedisInit;
+import org.apache.flume.channels.redis.tools.RedisOperator;
+import org.apache.flume.channels.redis.exception.CannotGetRedisInstanceException;
+import org.apache.flume.channels.redis.exception.UnsupportRedisTypeException;
+import org.apache.flume.conf.ConfigurationException;
+
+import static org.apache.flume.channels.redis.RedisChannelConfiguration.*;
+
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.commons.lang.StringUtils;
+import org.jboss.netty.util.internal.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import redis.clients.jedis.JedisPoolConfig;
+
+import com.google.common.net.HostAndPort;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RedisChannel extends BasicChannelSemantics {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(RedisChannel.class);
+
+ private final Properties redisConf = new Properties();
+ private final String channelUUID = UUID.randomUUID().toString();
+
+ private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
+ private String queue_key;
+ private RedisOperator redisOperator;
+ private JedisPoolConfig jedisPoolConfig;
+ private RedisChannelCounter counter;
+
+ private final ThreadLocal transFailoverController = new ThreadLocal() {
+ @Override
+ public TransEvents initialValue() {
+ return new TransEvents(channelUUID);
+ }
+
+ };
+
+ private class TransEvents {
+ final String uuid;
+ final LinkedList failedEvents = new LinkedList();
+
+ TransEvents(String uuid) {
+ this.uuid = uuid;
+ }
+ }
+
+ @Override
+ public void start() {
+ try {
+ LOGGER.info("Starting Redis Channel: " + getName());
+ redisOperator = RedisInit.getInstance(redisConf, jedisPoolConfig);
+ counter.start();
+ super.start();
+ } catch (CannotGetRedisInstanceException e) {
+ LOGGER.error("Could not start Redis Chnnel");
+ throw new FlumeException("Unable to create Redis Connections. "
+ + "Check whether Redis Server are up and that the "
+ + "Flume agent can connect to it.", e);
+ } catch (UnsupportRedisTypeException e) {
+ LOGGER.error("Could not start Redis Chnnel");
+ throw new FlumeException("Unable to create Redis Connections. "
+ + "Check whether Redis Type is correct.", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ counter.stop();
+ super.stop();
+ LOGGER.info("Redis channel {} stopped. Metrics: {}", getName(), counter);
+ }
+
+ @Override
+ protected BasicTransactionSemantics createTransaction() {
+ return new RedisTransaction();
+ }
+
+ Properties getRedisConf() {
+ return redisConf;
+ }
+
+ private JedisPoolConfig createJedisConfig(Properties redisConf)
+ throws ConfigurationException {
+ String maxTotal = redisConf.getProperty(MAX_TO_TOTAL, DEFAULT_MAX_TO_TOTAL);
+ String maxIdle = redisConf.getProperty(MAX_IDLE, DEFAULT_MAX_IDLE);
+ String minIdle = redisConf.getProperty(MIN_IDLE, DEFAULT_MIN_IDLE);
+ String maxWaitMillis = redisConf.getProperty(MAX_WAIT_MILLIS,
+ DEFAULT_MAX_WAIT_MILLIS);
+ String testOnBorrow = redisConf.getProperty(TEST_ON_BORROW,
+ DEFAULT_TEST_ON_BORROW);
+ String testOnReturn = redisConf.getProperty(TEST_ON_RETURN,
+ DEFAULT_TEST_ON_RETURN);
+ String testWhileIdle = redisConf.getProperty(TEST_WHILE_IDLE,
+ DEFAULT_TEST_WHILE_IDLE);
+ String timeBetweenEvictionRunsMillis = redisConf.getProperty(
+ TIME_BETWEEN_EVICTION_RUNMILLIS,
+ DEFAULT_TIME_BETWEEN_EVICTION_RUNMILLIS);
+ String numTestsPerEvictionRun = redisConf.getProperty(
+ NUM_TESTS_PER_EVICTION_RUN, DEFAULT_NUM_TESTS_PER_EVICTION_RUN);
+ String minEvictableIdleTimeMillis = redisConf.getProperty(
+ MIN_EVICTABLE_IDLE_TIME_MILLS, DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLS);
+
+ if (!StringUtils.isNumeric(maxTotal) || !StringUtils.isNumeric(maxIdle)
+ || !StringUtils.isNumeric(minIdle)
+ || !StringUtils.isNumeric(maxWaitMillis)
+ || !StringUtils.isNumeric(timeBetweenEvictionRunsMillis)
+ || !StringUtils.isNumeric(numTestsPerEvictionRun)
+ || !StringUtils.isNumeric(minEvictableIdleTimeMillis)) {
+ StringBuilder sb = new StringBuilder(
+ "Error configuration of redis, the value of these key must be a postive number, but you" +
+ " may specified other types.\n");
+ sb.append(MAX_TO_TOTAL + ": " + maxTotal + "\n");
+ sb.append(MAX_IDLE + ": " + maxIdle + "\n");
+ sb.append(MIN_IDLE + ": " + minIdle + "\n");
+ sb.append(MAX_WAIT_MILLIS + ": " + maxWaitMillis + "\n");
+ sb.append(TIME_BETWEEN_EVICTION_RUNMILLIS + ": "
+ + timeBetweenEvictionRunsMillis + "\n");
+ sb.append(
+ NUM_TESTS_PER_EVICTION_RUN + ": " + numTestsPerEvictionRun + "\n");
+ sb.append(MIN_EVICTABLE_IDLE_TIME_MILLS + ": "
+ + minEvictableIdleTimeMillis + "\n");
+ throw new ConfigurationException(sb.toString());
+ }
+ boolean testOnBorrowBool = true;
+ boolean testOnReturnBool = true;
+ boolean testWhileIdleBool = true;
+ try {
+ testOnBorrowBool = Boolean.parseBoolean(testOnBorrow.trim());
+ testOnReturnBool = Boolean.parseBoolean(testOnReturn.trim());
+ testWhileIdleBool = Boolean.parseBoolean(testWhileIdle.trim());
+ } catch (Exception ex) {
+ StringBuilder sb = new StringBuilder(
+ "Error configuration of redis, the value of these key must be with \"true\" or " +
+ "\"false\", but you may specified other types.\n");
+ sb.append(TEST_ON_BORROW + ": " + testOnBorrow + "\n");
+ sb.append(testOnReturnBool + ": " + testOnReturnBool + "\n");
+ sb.append(testWhileIdleBool + ": " + testWhileIdleBool + "\n");
+ throw new ConfigurationException(sb.toString());
+ }
+
+ JedisPoolConfig jedisConfig = new JedisPoolConfig();
+ jedisConfig.setMaxTotal(Integer.parseInt(maxTotal));
+ jedisConfig.setMaxIdle(Integer.parseInt(maxIdle));
+ jedisConfig.setMinIdle(Integer.parseInt(minIdle));
+ jedisConfig.setMaxWaitMillis(Integer.parseInt(maxWaitMillis));
+ jedisConfig.setTestOnBorrow(testOnBorrowBool);
+ jedisConfig.setTestOnReturn(testOnReturnBool);
+ jedisConfig.setTestWhileIdle(testWhileIdleBool);
+ jedisConfig.setTimeBetweenEvictionRunsMillis(
+ Integer.parseInt(timeBetweenEvictionRunsMillis));
+ jedisConfig
+ .setNumTestsPerEvictionRun(Integer.parseInt(numTestsPerEvictionRun));
+ jedisConfig.setMinEvictableIdleTimeMillis(
+ Integer.parseInt(minEvictableIdleTimeMillis));
+ return jedisConfig;
+ }
+
+ @Override
+ public void configure(Context ctx) {
+ String redisType = ctx.getString(SERVER_TYPE, DEFAULT_SEVER_TYPE);
+ if (!redisType.equals(DEFAULT_SEVER_TYPE)
+ && !redisType.equals(CLUSTER_SERVER_TYPE)
+ && !redisType.equals(SENTINEL_SERVER_TYPE)) {
+ throw new ConfigurationException("redis type must be specified with \""
+ + DEFAULT_SEVER_TYPE + "\", \"" + CLUSTER_SERVER_TYPE + "\" or \""
+ + SENTINEL_SERVER_TYPE + "\". but I get a \"" + redisType + "\".");
+ }
+ String host;
+ if (redisType.equals(DEFAULT_SEVER_TYPE)) {
+ host = ctx.getString(REDIS_SERVER);
+ if (null == host || host.isEmpty()) {
+ throw new ConfigurationException("redis server must be specified.");
+ }
+ String[] hostAndPortPair = host.split(":");
+ if (2 != hostAndPortPair.length
+ || !StringUtils.isNumeric(hostAndPortPair[1])) {
+ throw new ConfigurationException("wrong redis server format.");
+ }
+ } else if (redisType.equals(CLUSTER_SERVER_TYPE)) {
+ host = ctx.getString(REDIS_CLUSTER_SERVER);
+ if (null == host || host.isEmpty()) {
+ throw new ConfigurationException(
+ "redis cluster server must be specified.");
+ }
+ String[] hosts = host.split(",");
+ if (0 == hosts.length) {
+ throw new ConfigurationException("wrong redis cluster server format.");
+ }
+ for (String hostAndPort : hosts) {
+ String[] hostAndPortPair = hostAndPort.split(":");
+ if (2 != hostAndPortPair.length
+ || !StringUtils.isNumeric(hostAndPortPair[1])) {
+ throw new ConfigurationException(
+ "wrong redis server format with host: " + hostAndPort + ".");
+ }
+ }
+ } else if (redisType.equals(SENTINEL_SERVER_TYPE)) {
+ host = ctx.getString(REDIS_SENTINEL_SERVER);
+ if (null == host || host.isEmpty()) {
+ throw new ConfigurationException(
+ "redis sentinel server must be specified.");
+ }
+ String masterName = ctx.getString(SENTINEL_MASTER_NAME);
+ if (null == masterName || masterName.isEmpty()) {
+ throw new ConfigurationException(
+ "Sentinel master name must be specified.");
+ }
+ redisConf.put(SENTINEL_MASTER_NAME, masterName);
+ } else {
+ throw new ConfigurationException(
+ "Unknown error about redis type \"" + redisType + "\".");
+ }
+ String batchNumber = ctx.getString(BATCH_NUMBER, DEFAULT_BATCH_NUMBER);
+ String password = ctx.getString(REDIS_PASSWORD);
+ if (null == password || password.isEmpty()) {
+ password = "";
+ LOGGER.info("redis password has not be specified.");
+ }
+ String key = ctx.getString(QUEUE_KEY);
+ if (key == null || key.isEmpty()) {
+ throw new ConfigurationException("queue key must be specified");
+ }
+ String redisTimeOut = redisConf.getProperty(REDIS_CONNTIMOUT,
+ DEFAULT_REDIS_CONNTIMEOUT);
+ String clusterMaxAttemp = redisConf.getProperty(CLUSTER_MAX_ATTEMP,
+ DEFAULT_CLUSTER_MAX_ATTEMP);
+ if (!StringUtils.isNumeric(redisTimeOut)
+ || !StringUtils.isNumeric(clusterMaxAttemp)) {
+ StringBuilder sb = new StringBuilder(
+ "Error configuration of redis, the value of these key must be a postive number, but you" +
+ " may specified other types.\n");
+ sb.append(REDIS_CONNTIMOUT + ": " + redisTimeOut + "\n");
+ sb.append(CLUSTER_MAX_ATTEMP + ": " + clusterMaxAttemp + "\n");
+ throw new ConfigurationException(sb.toString());
+ }
+ redisConf.putAll(ctx.getSubProperties(RREDIS_PREFIX));
+ redisConf.put(SERVER_TYPE, redisType);
+ redisConf.put(REDIS_SERVER, host);
+ redisConf.put(REDIS_PASSWORD, password);
+ redisConf.put(QUEUE_KEY, key);
+ redisConf.put(REDIS_CONNTIMOUT, redisTimeOut);
+ redisConf.put(CLUSTER_MAX_ATTEMP, clusterMaxAttemp);
+ redisConf.put(BATCH_NUMBER, batchNumber);
+
+ this.jedisPoolConfig = createJedisConfig(redisConf);
+ this.queue_key = key;
+ parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT,
+ DEFAULT_PARSE_AS_FLUME_EVENT);
+
+ if (counter == null) {
+ counter = new RedisChannelCounter(getName());
+ }
+
+ }
+
+ private enum TransactionType {
+ PUT, TAKE, NONE
+ }
+
+ private class RedisTransaction extends BasicTransactionSemantics {
+
+ private TransactionType type = TransactionType.NONE;
+ // For Puts
+ private ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+
+ // For put transactions, serialize the events and batch them and send
+ // it.
+ private LinkedList serializedEvents = new LinkedList();
+ // For take transactions, deserialize and hold them till commit goes
+ // through
+ private LinkedList events = new LinkedList();
+ private SpecificDatumWriter writer = new SpecificDatumWriter(
+ AvroFlumeEvent.class);
+ private SpecificDatumReader reader = new SpecificDatumReader(
+ AvroFlumeEvent.class);
+
+ // Fine to use null for initial value, Avro will create new ones if this
+ // is null
+ private BinaryEncoder encoder = null;
+ private BinaryDecoder decoder = null;
+ private final String batchUUID = UUID.randomUUID().toString();
+ private boolean eventTaken = false;
+
+ @Override
+ protected void doPut(Event event) throws InterruptedException {
+ type = TransactionType.PUT;
+ try {
+ tempOutStream.reset();
+ AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()),
+ ByteBuffer.wrap(event.getBody()));
+ encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream,
+ encoder);
+ writer.write(e, encoder);
+ // Not really possible to avoid this copy :(
+ serializedEvents.add(tempOutStream.toByteArray());
+ } catch (Exception e) {
+ throw new ChannelException("Error while serializing event", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Event doTake() throws InterruptedException {
+ type = TransactionType.TAKE;
+ try {
+ if (!transFailoverController.get().uuid.equals(channelUUID)) {
+ transFailoverController.remove();
+ LOGGER.info("UUID mismatch, creating new transFailoverController");
+ }
+ } catch (Exception ex) {
+ LOGGER.warn("Error while get multi message", ex);
+ }
+ Event e;
+ if (!transFailoverController.get().failedEvents.isEmpty()) {
+ e = transFailoverController.get().failedEvents.removeFirst();
+ } else {
+ try {
+ long startTime = System.nanoTime();
+ Long queueLength = redisOperator.llen(queue_key);
+ while (queueLength == null || queueLength == 0) {
+ Thread.sleep(1000);
+ queueLength = redisOperator.llen(queue_key);
+ }
+ long endTime = System.nanoTime();
+ counter
+ .addToRedisEventGetTimer((endTime - startTime) / (1000 * 1000));
+ String message = redisOperator.rpop(queue_key);
+ if (parseAsFlumeEvent) {
+ ByteArrayInputStream in = new ByteArrayInputStream(
+ message.getBytes());
+ decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
+ AvroFlumeEvent event = reader.read(null, decoder);
+ e = EventBuilder.withBody(event.getBody().array(),
+ toStringMap(event.getHeaders()));
+ } else {
+ e = EventBuilder.withBody(message.getBytes(),
+ Collections.EMPTY_MAP);
+ }
+
+ } catch (Exception ex) {
+ LOGGER.warn("Error while getting events from redis", ex);
+ throw new ChannelException("Error while getting events from redis",
+ ex);
+ }
+ }
+ eventTaken = true;
+ events.add(e);
+ return e;
+ }
+
+ @Override
+ protected void doCommit() throws InterruptedException {
+ if (type.equals(TransactionType.NONE)) {
+ return;
+ }
+ if (type.equals(TransactionType.PUT)) {
+ try {
+ List messages = new ArrayList();
+ for (byte[] event : serializedEvents) {
+ messages.add(new String(event));
+ }
+ long startTime = System.nanoTime();
+ String[] message_to_redis = new String[messages.size()];
+ redisOperator.lpush(queue_key, messages.toArray(message_to_redis));
+ long endTime = System.nanoTime();
+ counter
+ .addToRedisEventSendTimer((endTime - startTime) / (1000 * 1000));
+ counter.addToEventPutSuccessCount(Long.valueOf(messages.size()));
+ serializedEvents.clear();
+ } catch (Exception ex) {
+ LOGGER.warn("Sending events to Kafka failed", ex);
+ throw new ChannelException("Commit failed as send to Kafka failed",
+ ex);
+ }
+ } else {
+ counter.addToEventTakeSuccessCount(Long.valueOf(events.size()));
+ events.clear();
+ }
+ }
+
+ @Override
+ protected void doRollback() throws InterruptedException {
+ if (type.equals(TransactionType.NONE)) {
+ return;
+ }
+ if (type.equals(TransactionType.PUT)) {
+ serializedEvents.clear();
+ } else {
+ counter.addToRollbackCounter(Long.valueOf(events.size()));
+ transFailoverController.get().failedEvents.addAll(events);
+ events.clear();
+ }
+ }
+ }
+
+ /**
+ * Helper function to convert a map of String to a map of CharSequence.
+ */
+ private static Map toCharSeqMap(
+ Map stringMap) {
+ Map charSeqMap = new HashMap();
+ for (Map.Entry entry : stringMap.entrySet()) {
+ charSeqMap.put(entry.getKey(), entry.getValue());
+ }
+ return charSeqMap;
+ }
+
+ /**
+ * Helper function to convert a map of CharSequence to a map of String.
+ */
+ private static Map toStringMap(
+ Map charSeqMap) {
+ Map stringMap = new HashMap();
+ for (Map.Entry entry : charSeqMap.entrySet()) {
+ stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return stringMap;
+ }
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelConfiguration.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelConfiguration.java
new file mode 100755
index 0000000000..3bc189caa4
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelConfiguration.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis;
+
+public class RedisChannelConfiguration {
+ public static final String SERVER_TYPE = "server.type";
+ public static final String REDIS_CLUSTER_SERVER = "cluster.servers";
+ public static final String REDIS_SERVER = "single.server";
+ public static final String REDIS_SENTINEL_SERVER = "sentinel.servers";
+ public static final String SENTINEL_MASTER_NAME = "sentinel.master.name";
+ public static final String REDIS_PASSWORD = "password";
+ public static final String QUEUE_KEY = "key";
+ public static final String BATCH_NUMBER = "batch_number";
+ public static final String DEFAULT_BATCH_NUMBER = "10";
+ public static final String DEFAULT_REIDS_PORT = "6379";
+ public static final String DEFAULT_SEVER_TYPE = "single";
+ public static final String CLUSTER_SERVER_TYPE = "cluster";
+ public static final String SENTINEL_SERVER_TYPE = "sentinel";
+
+ public static final String RREDIS_PREFIX = "redis.";
+ public static final String REDIS_CONNTIMOUT = "timeout";
+ public static final String MAX_TO_TOTAL = "max.total";
+ public static final String MAX_IDLE = "max.idle";
+ public static final String MIN_IDLE = "min.idle";
+ public static final String MAX_WAIT_MILLIS = "max.wait.millis";
+ public static final String TEST_ON_BORROW = "test.on.borrow";
+ public static final String TEST_ON_RETURN = "test.on.return";
+ public static final String TEST_WHILE_IDLE = "test.while.idle";
+ public static final String TIME_BETWEEN_EVICTION_RUNMILLIS = "time.between.eviction.runs.millis";
+ public static final String NUM_TESTS_PER_EVICTION_RUN = "num.tests.per.eviction.run";
+ public static final String MIN_EVICTABLE_IDLE_TIME_MILLS = "min.evictable.idle.time.millis";
+ public static final String CLUSTER_MAX_ATTEMP = "cluster.max.attemp";
+ public static final String DEFAULT_REDIS_CONNTIMEOUT = "5000";
+ public static final String DEFAULT_MAX_TO_TOTAL = "500";
+ public static final String DEFAULT_MAX_IDLE = "300";
+ public static final String DEFAULT_MIN_IDLE = "10";
+ public static final String DEFAULT_MAX_WAIT_MILLIS = "60000";
+ public static final String DEFAULT_TEST_ON_BORROW = "true";
+ public static final String DEFAULT_TEST_ON_RETURN = "true";
+ public static final String DEFAULT_TEST_WHILE_IDLE = "true";
+ public static final String DEFAULT_TIME_BETWEEN_EVICTION_RUNMILLIS = "30000";
+ public static final String DEFAULT_NUM_TESTS_PER_EVICTION_RUN = "10";
+ public static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLS = "60000";
+ public static final String DEFAULT_CLUSTER_MAX_ATTEMP = "1";
+
+ public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
+ public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounter.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounter.java
new file mode 100755
index 0000000000..00c40c60ce
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flume.instrumentation.ChannelCounter;
+
+public class RedisChannelCounter extends ChannelCounter
+ implements RedisChannelCounterMBean {
+
+ private static final String TIMER_REDIS_EVENT_GET =
+ "channel.redis.event.get.time";
+
+ private static final String TIMER_REDIS_EVENT_SEND =
+ "channel.redis.event.send.time";
+
+ private static final String TIMER_REDIS_COMMIT =
+ "channel.redis.commit.time";
+
+ private static final String COUNT_ROLLBACK =
+ "channel.rollback.count";
+
+ private static String[] ATTRIBUTES = {
+ TIMER_REDIS_COMMIT, TIMER_REDIS_EVENT_SEND, TIMER_REDIS_EVENT_GET,
+ COUNT_ROLLBACK
+ };
+
+ public RedisChannelCounter(String name) {
+ super(name, ATTRIBUTES);
+ }
+
+ public long addToRedisEventGetTimer(long delta) {
+ return addAndGet(TIMER_REDIS_EVENT_GET, delta);
+ }
+
+ public long addToRedisEventSendTimer(long delta) {
+ return addAndGet(TIMER_REDIS_EVENT_SEND, delta);
+ }
+
+ public long addToRedisCommitTimer(long delta) {
+ return addAndGet(TIMER_REDIS_COMMIT, delta);
+ }
+
+ public long addToRollbackCounter(long delta) {
+ return addAndGet(COUNT_ROLLBACK, delta);
+ }
+
+ public long getRedisEventGetTimer() {
+ return get(TIMER_REDIS_EVENT_GET);
+ }
+
+ public long getRedisEventSendTimer() {
+ return get(TIMER_REDIS_EVENT_SEND);
+ }
+
+ public long getRedisCommitTimer() {
+ return get(TIMER_REDIS_COMMIT);
+ }
+
+ public long getRollbackCount() {
+ return get(COUNT_ROLLBACK);
+ }
+
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounterMBean.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounterMBean.java
new file mode 100755
index 0000000000..99a22ef87e
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounterMBean.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis;
+
+public interface RedisChannelCounterMBean {
+ long getRedisEventGetTimer();
+
+ long getRedisEventSendTimer();
+
+ long getRedisCommitTimer();
+
+ long getRollbackCount();
+
+ long getChannelSize();
+
+ long getEventPutAttemptCount();
+
+ long getEventTakeAttemptCount();
+
+ long getEventPutSuccessCount();
+
+ long getEventTakeSuccessCount();
+
+ long getStartTime();
+
+ long getStopTime();
+
+ long getChannelCapacity();
+
+ String getType();
+
+ double getChannelFillPercentage();
+
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/CannotGetRedisInstanceException.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/CannotGetRedisInstanceException.java
new file mode 100755
index 0000000000..6093e770bb
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/CannotGetRedisInstanceException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.exception;
+
+public class CannotGetRedisInstanceException extends Exception {
+
+ public CannotGetRedisInstanceException() {
+ super();
+ }
+
+ public CannotGetRedisInstanceException(String message) {
+ super(message);
+ }
+
+ public CannotGetRedisInstanceException(String message,Throwable cause) {
+ super(message,cause);
+ }
+
+ public CannotGetRedisInstanceException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/UnsupportRedisTypeException.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/UnsupportRedisTypeException.java
new file mode 100755
index 0000000000..45ad66e07b
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/UnsupportRedisTypeException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.exception;
+
+public class UnsupportRedisTypeException extends Exception {
+
+ public UnsupportRedisTypeException() {
+ super();
+ }
+
+ public UnsupportRedisTypeException(String message) {
+ super(message);
+ }
+
+ public UnsupportRedisTypeException(String message,Throwable cause) {
+ super(message,cause);
+ }
+
+ public UnsupportRedisTypeException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/ClusterRedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/ClusterRedisOperator.java
new file mode 100755
index 0000000000..df5f40e9d5
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/ClusterRedisOperator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.tools;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.HostAndPort;
+
+public class ClusterRedisOperator implements RedisOperator {
+ private static final Logger log = LoggerFactory.getLogger(RedisInit.class);
+ private JedisCluster jedis;
+
+ public ClusterRedisOperator(String servers, String passwd, int timeout,
+ int maxAttemp, JedisPoolConfig jedisPoolConfig) {
+ Set nodeSet = new HashSet<>();
+ String[] serverStrings = servers.split(",");
+ for (String ipPort : serverStrings) {
+ String[] ipPortPair = ipPort.split(":");
+ nodeSet.add(new redis.clients.jedis.HostAndPort(ipPortPair[0].trim(),
+ Integer.valueOf(ipPortPair[1].trim())));
+ }
+ if (StringUtils.isBlank(passwd)) {
+ jedis = new JedisCluster(nodeSet, timeout, timeout, maxAttemp,
+ jedisPoolConfig);
+ } else {
+ jedis = new JedisCluster(nodeSet, timeout, timeout, maxAttemp, passwd,
+ jedisPoolConfig);
+ }
+ }
+
+ public long lpush(String key, String... strings) {
+ long count = 0;
+ count = jedis.lpush(key, strings);
+ return count;
+ }
+
+ public String rpoplpush(String srckey, String dstkey) {
+ String msg = "";
+ msg = jedis.rpoplpush(srckey, dstkey);
+ return msg;
+ }
+
+ public String rpop(String key) {
+ String msg = "";
+ msg = jedis.rpop(key);
+ return msg;
+ }
+
+ public Long llen(String key) {
+ long result;
+ result = jedis.llen(key);
+ return result;
+ }
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisInit.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisInit.java
new file mode 100755
index 0000000000..95bb071116
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisInit.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.tools;
+
+import java.util.HashMap;
+import java.util.Properties;
+
+import javax.naming.ConfigurationException;
+
+import redis.clients.jedis.JedisPoolConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.channels.redis.exception.CannotGetRedisInstanceException;
+import org.apache.flume.channels.redis.exception.UnsupportRedisTypeException;
+
+import static org.apache.flume.channels.redis.RedisChannelConfiguration.*;
+
+public class RedisInit {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RedisInit.class);
+ private static HashMap instances = new HashMap();
+
+ public static synchronized RedisOperator getInstance(Properties redisConf,
+ JedisPoolConfig jedisPoolConfig)
+ throws UnsupportRedisTypeException, CannotGetRedisInstanceException {
+ String type = redisConf.getProperty(SERVER_TYPE);
+ String hosts = redisConf.getProperty(REDIS_SERVER);
+ String passwd = redisConf.getProperty(REDIS_PASSWORD);
+ int timeout = Integer.parseInt(redisConf.getProperty(REDIS_CONNTIMOUT));
+ String instanceKey = type + "|" + hosts;
+ RedisOperator ro = null;
+ if (instances.containsKey(instanceKey)) {
+ ro = instances.get(instanceKey);
+ } else {
+ if (null == jedisPoolConfig) {
+ jedisPoolConfig = new JedisPoolConfig();
+ }
+ if (type.equals(DEFAULT_SEVER_TYPE)) {
+ String[] hostAndPort = hosts.split(":");
+ ro = new SingleServerRedisOperator(hostAndPort[0].trim(),
+ Integer.valueOf(hostAndPort[1].trim()), passwd, timeout,
+ jedisPoolConfig);
+ } else if (type.equals(CLUSTER_SERVER_TYPE)) {
+ int maxAttemp = Integer
+ .parseInt(redisConf.getProperty(CLUSTER_MAX_ATTEMP));
+ ro = new ClusterRedisOperator(hosts, passwd, timeout, maxAttemp,
+ jedisPoolConfig);
+ } else if (type.equals(SENTINEL_SERVER_TYPE)) {
+ String masterName = redisConf.getProperty(SENTINEL_MASTER_NAME);
+ ro = new SentinelServerRedisOperator(masterName, hosts, passwd, timeout,
+ jedisPoolConfig);
+ } else {
+ throw new UnsupportRedisTypeException("Unsupport redis type " + type);
+ }
+ instances.put(instanceKey, ro);
+ }
+ if (null == ro) {
+ throw new CannotGetRedisInstanceException(
+ "Can not get redis instance with type: " + type + " and servers: "
+ + hosts);
+ }
+ return ro;
+ }
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisOperator.java
new file mode 100755
index 0000000000..aaefdacd0b
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisOperator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.tools;
+
+import java.util.List;
+
+public interface RedisOperator {
+
+ public long lpush(String key, String... strings);
+
+ public String rpoplpush(String srckey, String dstkey);
+
+ public String rpop(String key);
+
+ public Long llen(String key);
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelRedisController.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelRedisController.java
new file mode 100755
index 0000000000..70e31462ef
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelRedisController.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.tools;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class SentinelRedisController {
+ private JedisSentinelPool jsp;
+
+ public SentinelRedisController(String masterName, String servers,
+ String passwd, int timeout, JedisPoolConfig jedisPoolConfig) {
+ Set sentinels = new HashSet();
+ String[] arr = servers.split(",");
+ if (arr != null && arr.length > 0) {
+ for (String item : arr) {
+ if (item != null && !item.equals("")) {
+ sentinels.add(item);
+ }
+ }
+ }
+ if (StringUtils.isBlank(passwd)) {
+ jsp = new JedisSentinelPool(masterName, sentinels, jedisPoolConfig,
+ timeout);
+
+ } else {
+ jsp = new JedisSentinelPool(masterName, sentinels, jedisPoolConfig,
+ timeout, passwd);
+ }
+ }
+
+ public void destory() {
+ jsp.destroy();
+ }
+
+ public Jedis getController() {
+ return jsp.getResource();
+ }
+
+ public void returnController(Jedis _jedis) {
+ _jedis.close();
+ }
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelServerRedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelServerRedisOperator.java
new file mode 100755
index 0000000000..5996b75b74
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelServerRedisOperator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.tools;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Response;
+import redis.clients.jedis.Transaction;
+
+public class SentinelServerRedisOperator implements RedisOperator {
+ private static final Logger log = LoggerFactory.getLogger(RedisInit.class);
+ private SentinelRedisController rc;
+
+ public SentinelServerRedisOperator(String mastName, String servers, String passwd,
+ int timeout, JedisPoolConfig jedisPoolConfig) {
+ rc = new SentinelRedisController(mastName, servers, passwd, timeout, jedisPoolConfig);
+ }
+
+ public long lpush(String key, String... strings) {
+ Jedis jedis = rc.getController();
+ long count = 0;
+ try {
+ count = jedis.lpush(key, strings);
+ } finally {
+ jedis.close();
+ }
+ return count;
+ }
+
+ public String rpoplpush(String srckey, String dstkey) {
+ Jedis jedis = rc.getController();
+ String msg = "";
+ try {
+ msg = jedis.rpoplpush(srckey, dstkey);
+ } finally {
+ jedis.close();
+ }
+ return msg;
+ }
+
+ public String rpop(String key) {
+ Jedis jedis = rc.getController();
+ String msg = "";
+ try {
+ msg = jedis.rpop(key);
+ } finally {
+ jedis.close();
+ }
+ return msg;
+ }
+
+ public Long llen(String key) {
+ Jedis jedis = rc.getController();
+ long result;
+ try {
+ result = jedis.llen(key);
+ } finally {
+ jedis.close();
+ }
+ return result;
+ }
+
+ public List mpop(String key, Long batch) {
+ Jedis jedis = rc.getController();
+ Response> result;
+ try {
+ Long llen = jedis.llen(key);
+ Long start = llen - batch;
+ if (start < 0L) {
+ start = 0L;
+ }
+ Transaction transaction = jedis.multi();
+ result = transaction.lrange(key, start, -1);
+ transaction.ltrim(key, 0, start - 1);
+ transaction.exec();
+ return result.get();
+ } finally {
+ jedis.close();
+ }
+ }
+
+ public Jedis getRedis() {
+ return rc.getController();
+ }
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleRedisController.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleRedisController.java
new file mode 100755
index 0000000000..0e0bcbe717
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleRedisController.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.tools;
+
+import org.apache.commons.lang.StringUtils;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class SingleRedisController {
+ private JedisPool jsp;
+
+ public SingleRedisController(String host, int port, String passwd,
+ int timeout, JedisPoolConfig jedisPoolConfig) {
+ if (null == jedisPoolConfig) {
+ jedisPoolConfig = new JedisPoolConfig();
+ }
+ if (!StringUtils.isBlank(passwd)) {
+ jsp = new JedisPool(jedisPoolConfig, host, port, timeout, passwd);
+
+ } else {
+ jsp = new JedisPool(jedisPoolConfig, host, port, timeout);
+ }
+ }
+
+ public void destory() {
+ jsp.destroy();
+ }
+
+ public Jedis getController() {
+ return jsp.getResource();
+ }
+
+ public void returnController(Jedis _jedis) {
+ // jsp.returnBrokenResource(_jedis);
+ _jedis.close();
+ }
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleServerRedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleServerRedisOperator.java
new file mode 100755
index 0000000000..9aa9040703
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleServerRedisOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channels.redis.tools;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Response;
+import redis.clients.jedis.Transaction;
+
+public class SingleServerRedisOperator implements RedisOperator {
+ private static final Logger log = LoggerFactory.getLogger(RedisInit.class);
+ private SingleRedisController rc;
+
+ public SingleServerRedisOperator(String host, int port, String passwd,
+ int timeout, JedisPoolConfig jedisPoolConfig) {
+ rc = new SingleRedisController(host, port, passwd, timeout, jedisPoolConfig);
+ }
+
+ public long lpush(String key, String... strings) {
+ Jedis jedis = rc.getController();
+ long count = 0;
+ try {
+ count = jedis.lpush(key, strings);
+ } finally {
+ jedis.close();
+ }
+ return count;
+ }
+
+ public String rpoplpush(String srckey, String dstkey) {
+ Jedis jedis = rc.getController();
+ String msg = "";
+ try {
+ msg = jedis.rpoplpush(srckey, dstkey);
+ } finally {
+ jedis.close();
+ }
+ return msg;
+ }
+
+ public String rpop(String key) {
+ Jedis jedis = rc.getController();
+ String msg = "";
+ try {
+ msg = jedis.rpop(key);
+ } finally {
+ jedis.close();
+ }
+ return msg;
+ }
+
+ public Long llen(String key) {
+ Jedis jedis = rc.getController();
+ long result;
+ try {
+ result = jedis.llen(key);
+ } finally {
+ jedis.close();
+ }
+ return result;
+ }
+
+ public List mpop(String key, Long batch) {
+ Jedis jedis = rc.getController();
+ Response> result;
+ try {
+ Long llen = jedis.llen(key);
+ Long start = llen - batch;
+ if (start < 0L) {
+ start = 0L;
+ }
+ Transaction transaction = jedis.multi();
+ result = transaction.lrange(key, start, -1);
+ transaction.ltrim(key, 0, start - 1);
+ transaction.exec();
+ return result.get();
+ } finally {
+ jedis.close();
+ }
+ }
+
+ public Jedis getRedis() {
+ return rc.getController();
+ }
+
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSentinelRedis.java b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSentinelRedis.java
new file mode 100755
index 0000000000..7415226d2b
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSentinelRedis.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.redis;
+
+import org.apache.flume.channels.redis.tools.SentinelServerRedisOperator;
+import org.apache.flume.channels.redis.tools.SingleServerRedisOperator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import redis.clients.jedis.JedisPoolConfig;
+import redis.embedded.RedisCluster;
+import redis.embedded.RedisExecProvider;
+import redis.embedded.RedisServer;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class TestSentinelRedis {
+ private RedisCluster cluster;
+ private Set jedisSentinelHosts;
+ private SentinelServerRedisOperator sentinelServerRedisOpertor;
+
+ @Before
+ public void setUp() throws IOException {
+ cluster = RedisCluster.builder().ephemeral().sentinelCount(3).quorumSize(2).replicationGroup(
+ "master1",1)
+ .replicationGroup("master2", 1)
+ .replicationGroup("master3", 1).build();
+ cluster.start();
+ jedisSentinelHosts = JedisUtil.sentinelHosts(cluster);
+ StringBuilder sb = new StringBuilder();
+ for (String host : jedisSentinelHosts) {
+ sb.append(host + ",");
+ }
+ sentinelServerRedisOpertor = new SentinelServerRedisOperator("master1",
+ sb.toString(), "", 5000, new JedisPoolConfig());
+ }
+
+ @Test
+ public void testSimpleListOperator() {
+ Assert.assertEquals(1, sentinelServerRedisOpertor.lpush("test", "a"));
+ Assert.assertEquals(1, Integer.parseInt(sentinelServerRedisOpertor.llen("test")
+ .toString()));
+ Assert.assertEquals("a", sentinelServerRedisOpertor.rpop("test"));
+ Assert.assertEquals(0, Integer.parseInt(sentinelServerRedisOpertor.llen("test")
+ .toString()));
+ }
+
+ @Test
+ public void testComplexListOperator() {
+ String[] multiPush = {"a", "b", "c", "d", "e"};
+ Assert.assertEquals(multiPush.length, sentinelServerRedisOpertor.lpush("test", multiPush));
+ Assert.assertEquals(multiPush.length, Integer.parseInt(sentinelServerRedisOpertor.llen(
+ "test").toString()));
+ Assert.assertEquals(multiPush[0], sentinelServerRedisOpertor.rpop("test"));
+ Assert.assertEquals(multiPush.length - 1,
+ Integer.parseInt(sentinelServerRedisOpertor.llen("test").toString()));
+
+ }
+
+ @After
+ public void tearDown() {
+ cluster.stop();
+ }
+}
diff --git a/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSingleRedis.java b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSingleRedis.java
new file mode 100755
index 0000000000..8895d947a3
--- /dev/null
+++ b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSingleRedis.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.redis;
+
+import org.apache.flume.channels.redis.tools.SingleServerRedisOperator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import redis.embedded.RedisExecProvider;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+
+public class TestSingleRedis {
+ private RedisServer redisServer;
+ private SingleServerRedisOperator singleServerRedisOperator;
+
+ @Before
+ public void setUp() throws IOException {
+ RedisExecProvider customProvider = RedisExecProvider.defaultProvider();
+ redisServer = RedisServer.builder().redisExecProvider(customProvider)
+ .setting("maxheap 128M").build();
+ redisServer.start();
+ singleServerRedisOperator = new SingleServerRedisOperator("localhost", 6379, "", 1000, null);
+ }
+
+ @Test
+ public void testSimpleListOperator() {
+ Assert.assertEquals(1,singleServerRedisOperator.lpush("test", "a"));
+ Assert.assertEquals(1, Integer.parseInt(singleServerRedisOperator.llen("test").toString()));
+ Assert.assertEquals("a", singleServerRedisOperator.rpop("test"));
+ Assert.assertEquals(0, Integer.parseInt(singleServerRedisOperator.llen("test").toString()));
+ }
+
+ @Test
+ public void testComplexListOperator() {
+ String[] multiPush = {"a","b","c","d","e"};
+ Assert.assertEquals(multiPush.length,singleServerRedisOperator.lpush("test", multiPush));
+ Assert.assertEquals(multiPush.length,
+ Integer.parseInt(singleServerRedisOperator.llen("test").toString()));
+ Assert.assertEquals(multiPush[0], singleServerRedisOperator.rpop("test"));
+ Assert.assertEquals(multiPush.length - 1,
+ Integer.parseInt(singleServerRedisOperator.llen("test").toString()));
+
+ }
+
+ @After
+ public void tearDown() {
+ redisServer.stop();
+ }
+
+}
diff --git a/flume-ng-channels/pom.xml b/flume-ng-channels/pom.xml
index d86b9a432f..323e32074c 100644
--- a/flume-ng-channels/pom.xml
+++ b/flume-ng-channels/pom.xml
@@ -45,5 +45,6 @@ limitations under the License.
flume-file-channel
flume-spillable-memory-channel
flume-kafka-channel
+ flume-redis-channel
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
index 2e3cb34397..3dfb374047 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
@@ -51,7 +51,13 @@ public enum ChannelType {
*
* @see SpillableMemoryChannel
*/
- SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");
+ SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel"),
+
+ /**
+ * Redis Channel
+ * @see RedisChannel
+ */
+ REDIS("org.flume.channels.redis.RedisChannel");
private final String channelClassName;
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
old mode 100644
new mode 100755
index 6d7085c545..873bf2ff6a
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -3391,6 +3391,61 @@ Also please make sure that the operating system user of the Flume processes has
};
+Redis Channel
+~~~~~~~~~~~~~
+
+The events are stored in a Redis server (must be installed separately). If you have many agents but want to share one channel which can provide high concurrency, Redis Channel is a good choice. Redis also can provide high availability and replication, you can choose the best way to adapt your application.
+
+This version of Flume requires Redis version 2.6 or greater. If you want to use Redis cluster, we recommend Redis version 3.1 or greater.
+
+Required properties are in **bold**.
+
+================================================ ================================ ========================================================
+Property Name Default Description
+================================================ ================================ ========================================================
+**type** -- The component type name, needs to be ``redis``
+**server.type** single Specify the type of Redis, now we support single, sentinel and cluster. Default value is single server mode.
+single.server -- A Single Redis server used by the channel. It will work with "single" server type.
+sentinel.servers -- Sentinel Redis servers used by the channel. It will work with "sentinel" server type. Use commas to separate different node.
+sentinel.master.name -- Sentinel Redis master name. It will work with "sentinel" server type.
+cluster.servers -- Cluster Redis servers used by the channel. It will work with "cluster" server type. Use commas to separate different node.
+cluster.max.attemp 1 Amount times to retry to access Redis cluster.
+**key** -- Since we use list to store events, you must specify a name of list. The list can be not exist in Redis server.
+password -- The password of Redis.
+redis.timout 5000 Amount of time (in milliseconds) to wait to connect Redis.
+redis.max.total 500
+Max total number connections of Redis pool.
+redis.max.idle 300 Max idle number connections of Redis pool.
+redis.min.idle 10 Min idle number connections of Redis pools.
+redis.max.wait.millis 60000 Amount of time(in milliseconds) to get a connection from Redis pool.
+redis.test.on.borrow true If test when get a connection from Redis pool. Recommond let it be true in case the connection lost.
+redis.test.on.return true If test when give a connection back to Redis pool.
+redis.test.while.idle true If test while connection in Redis pool. Recommond let it be true in case the connection lost.
+================================================ ================================ ========================================================
+
+
+Example for agent named a1 and channel named channel1:
+
+.. code-block:: properties
+
+ a1.channels.channel1.type = org.flume.channels.redis.RedisChannel
+ a1.channels.channel1.server = 10.0.0.1:6379
+ a1.channels.channel1.key = redis-channel
+ a1.channels.channel1.password = mytest
+
+ a1.channels.channel2.type = org.flume.channels.redis.RedisChannel
+ a1.channels.channel2.server.type = sentinel
+ a1.channels.channel2.sentinel.servers = 10.0.0.1:26379,10.0.0.2:26379
+ a1.channels.channel2.sentinel.master.name = mymaster
+ a1.channels.channel2.key = redis-channel
+
+ a1.channels.channel3.type = org.flume.channels.redis.RedisChannel
+ a1.channels.channel3.server.type = cluster
+ a1.channels.channel3.cluster.servers = 10.0.0.1:7000,10.0.0.1:7001,10.0.0.1:7002
+ a1.channels.channel3.key = redis-channel
+
+
+
File Channel
~~~~~~~~~~~~