diff --git a/flume-ng-channels/flume-redis-channel/pom.xml b/flume-ng-channels/flume-redis-channel/pom.xml new file mode 100644 index 0000000000..dcfbbefba4 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/pom.xml @@ -0,0 +1,56 @@ + + + + + flume-ng-channels + org.apache.flume + 1.9.0-SNAPSHOT + + 4.0.0 + + org.apache.flume.flume-ng-channels + flume-redis-channel + + + + org.apache.flume + flume-ng-core + + + org.apache.flume + flume-ng-sdk + + + redis.clients + jedis + 2.9.0 + + + com.github.kstyrc + embedded-redis + 0.6 + + + junit + junit + test + + + diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannel.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannel.java new file mode 100755 index 0000000000..b2cd6e3cf5 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannel.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.channel.BasicChannelSemantics; +import org.apache.flume.channel.BasicTransactionSemantics; +import org.apache.flume.channels.redis.tools.RedisInit; +import org.apache.flume.channels.redis.tools.RedisOperator; +import org.apache.flume.channels.redis.exception.CannotGetRedisInstanceException; +import org.apache.flume.channels.redis.exception.UnsupportRedisTypeException; +import org.apache.flume.conf.ConfigurationException; + +import static org.apache.flume.channels.redis.RedisChannelConfiguration.*; + +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.avro.AvroFlumeEvent; +import org.apache.commons.lang.StringUtils; +import org.jboss.netty.util.internal.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import redis.clients.jedis.JedisPoolConfig; + +import com.google.common.net.HostAndPort; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +public class RedisChannel extends BasicChannelSemantics { + + private static final Logger LOGGER = LoggerFactory + .getLogger(RedisChannel.class); + + private final Properties redisConf = new Properties(); + private final String channelUUID = UUID.randomUUID().toString(); + + private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT; + private String queue_key; + private RedisOperator redisOperator; + private JedisPoolConfig jedisPoolConfig; + private RedisChannelCounter counter; + + private final ThreadLocal transFailoverController = new ThreadLocal() { + @Override + public TransEvents initialValue() { + return new TransEvents(channelUUID); + } + + }; + + private class TransEvents { + final String uuid; + final LinkedList failedEvents = new LinkedList(); + + TransEvents(String uuid) { + this.uuid = uuid; + } + } + + @Override + public void start() { + try { + LOGGER.info("Starting Redis Channel: " + getName()); + redisOperator = RedisInit.getInstance(redisConf, jedisPoolConfig); + counter.start(); + super.start(); + } catch (CannotGetRedisInstanceException e) { + LOGGER.error("Could not start Redis Chnnel"); + throw new FlumeException("Unable to create Redis Connections. " + + "Check whether Redis Server are up and that the " + + "Flume agent can connect to it.", e); + } catch (UnsupportRedisTypeException e) { + LOGGER.error("Could not start Redis Chnnel"); + throw new FlumeException("Unable to create Redis Connections. " + + "Check whether Redis Type is correct.", e); + } + } + + @Override + public void stop() { + counter.stop(); + super.stop(); + LOGGER.info("Redis channel {} stopped. Metrics: {}", getName(), counter); + } + + @Override + protected BasicTransactionSemantics createTransaction() { + return new RedisTransaction(); + } + + Properties getRedisConf() { + return redisConf; + } + + private JedisPoolConfig createJedisConfig(Properties redisConf) + throws ConfigurationException { + String maxTotal = redisConf.getProperty(MAX_TO_TOTAL, DEFAULT_MAX_TO_TOTAL); + String maxIdle = redisConf.getProperty(MAX_IDLE, DEFAULT_MAX_IDLE); + String minIdle = redisConf.getProperty(MIN_IDLE, DEFAULT_MIN_IDLE); + String maxWaitMillis = redisConf.getProperty(MAX_WAIT_MILLIS, + DEFAULT_MAX_WAIT_MILLIS); + String testOnBorrow = redisConf.getProperty(TEST_ON_BORROW, + DEFAULT_TEST_ON_BORROW); + String testOnReturn = redisConf.getProperty(TEST_ON_RETURN, + DEFAULT_TEST_ON_RETURN); + String testWhileIdle = redisConf.getProperty(TEST_WHILE_IDLE, + DEFAULT_TEST_WHILE_IDLE); + String timeBetweenEvictionRunsMillis = redisConf.getProperty( + TIME_BETWEEN_EVICTION_RUNMILLIS, + DEFAULT_TIME_BETWEEN_EVICTION_RUNMILLIS); + String numTestsPerEvictionRun = redisConf.getProperty( + NUM_TESTS_PER_EVICTION_RUN, DEFAULT_NUM_TESTS_PER_EVICTION_RUN); + String minEvictableIdleTimeMillis = redisConf.getProperty( + MIN_EVICTABLE_IDLE_TIME_MILLS, DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLS); + + if (!StringUtils.isNumeric(maxTotal) || !StringUtils.isNumeric(maxIdle) + || !StringUtils.isNumeric(minIdle) + || !StringUtils.isNumeric(maxWaitMillis) + || !StringUtils.isNumeric(timeBetweenEvictionRunsMillis) + || !StringUtils.isNumeric(numTestsPerEvictionRun) + || !StringUtils.isNumeric(minEvictableIdleTimeMillis)) { + StringBuilder sb = new StringBuilder( + "Error configuration of redis, the value of these key must be a postive number, but you" + + " may specified other types.\n"); + sb.append(MAX_TO_TOTAL + ": " + maxTotal + "\n"); + sb.append(MAX_IDLE + ": " + maxIdle + "\n"); + sb.append(MIN_IDLE + ": " + minIdle + "\n"); + sb.append(MAX_WAIT_MILLIS + ": " + maxWaitMillis + "\n"); + sb.append(TIME_BETWEEN_EVICTION_RUNMILLIS + ": " + + timeBetweenEvictionRunsMillis + "\n"); + sb.append( + NUM_TESTS_PER_EVICTION_RUN + ": " + numTestsPerEvictionRun + "\n"); + sb.append(MIN_EVICTABLE_IDLE_TIME_MILLS + ": " + + minEvictableIdleTimeMillis + "\n"); + throw new ConfigurationException(sb.toString()); + } + boolean testOnBorrowBool = true; + boolean testOnReturnBool = true; + boolean testWhileIdleBool = true; + try { + testOnBorrowBool = Boolean.parseBoolean(testOnBorrow.trim()); + testOnReturnBool = Boolean.parseBoolean(testOnReturn.trim()); + testWhileIdleBool = Boolean.parseBoolean(testWhileIdle.trim()); + } catch (Exception ex) { + StringBuilder sb = new StringBuilder( + "Error configuration of redis, the value of these key must be with \"true\" or " + + "\"false\", but you may specified other types.\n"); + sb.append(TEST_ON_BORROW + ": " + testOnBorrow + "\n"); + sb.append(testOnReturnBool + ": " + testOnReturnBool + "\n"); + sb.append(testWhileIdleBool + ": " + testWhileIdleBool + "\n"); + throw new ConfigurationException(sb.toString()); + } + + JedisPoolConfig jedisConfig = new JedisPoolConfig(); + jedisConfig.setMaxTotal(Integer.parseInt(maxTotal)); + jedisConfig.setMaxIdle(Integer.parseInt(maxIdle)); + jedisConfig.setMinIdle(Integer.parseInt(minIdle)); + jedisConfig.setMaxWaitMillis(Integer.parseInt(maxWaitMillis)); + jedisConfig.setTestOnBorrow(testOnBorrowBool); + jedisConfig.setTestOnReturn(testOnReturnBool); + jedisConfig.setTestWhileIdle(testWhileIdleBool); + jedisConfig.setTimeBetweenEvictionRunsMillis( + Integer.parseInt(timeBetweenEvictionRunsMillis)); + jedisConfig + .setNumTestsPerEvictionRun(Integer.parseInt(numTestsPerEvictionRun)); + jedisConfig.setMinEvictableIdleTimeMillis( + Integer.parseInt(minEvictableIdleTimeMillis)); + return jedisConfig; + } + + @Override + public void configure(Context ctx) { + String redisType = ctx.getString(SERVER_TYPE, DEFAULT_SEVER_TYPE); + if (!redisType.equals(DEFAULT_SEVER_TYPE) + && !redisType.equals(CLUSTER_SERVER_TYPE) + && !redisType.equals(SENTINEL_SERVER_TYPE)) { + throw new ConfigurationException("redis type must be specified with \"" + + DEFAULT_SEVER_TYPE + "\", \"" + CLUSTER_SERVER_TYPE + "\" or \"" + + SENTINEL_SERVER_TYPE + "\". but I get a \"" + redisType + "\"."); + } + String host; + if (redisType.equals(DEFAULT_SEVER_TYPE)) { + host = ctx.getString(REDIS_SERVER); + if (null == host || host.isEmpty()) { + throw new ConfigurationException("redis server must be specified."); + } + String[] hostAndPortPair = host.split(":"); + if (2 != hostAndPortPair.length + || !StringUtils.isNumeric(hostAndPortPair[1])) { + throw new ConfigurationException("wrong redis server format."); + } + } else if (redisType.equals(CLUSTER_SERVER_TYPE)) { + host = ctx.getString(REDIS_CLUSTER_SERVER); + if (null == host || host.isEmpty()) { + throw new ConfigurationException( + "redis cluster server must be specified."); + } + String[] hosts = host.split(","); + if (0 == hosts.length) { + throw new ConfigurationException("wrong redis cluster server format."); + } + for (String hostAndPort : hosts) { + String[] hostAndPortPair = hostAndPort.split(":"); + if (2 != hostAndPortPair.length + || !StringUtils.isNumeric(hostAndPortPair[1])) { + throw new ConfigurationException( + "wrong redis server format with host: " + hostAndPort + "."); + } + } + } else if (redisType.equals(SENTINEL_SERVER_TYPE)) { + host = ctx.getString(REDIS_SENTINEL_SERVER); + if (null == host || host.isEmpty()) { + throw new ConfigurationException( + "redis sentinel server must be specified."); + } + String masterName = ctx.getString(SENTINEL_MASTER_NAME); + if (null == masterName || masterName.isEmpty()) { + throw new ConfigurationException( + "Sentinel master name must be specified."); + } + redisConf.put(SENTINEL_MASTER_NAME, masterName); + } else { + throw new ConfigurationException( + "Unknown error about redis type \"" + redisType + "\"."); + } + String batchNumber = ctx.getString(BATCH_NUMBER, DEFAULT_BATCH_NUMBER); + String password = ctx.getString(REDIS_PASSWORD); + if (null == password || password.isEmpty()) { + password = ""; + LOGGER.info("redis password has not be specified."); + } + String key = ctx.getString(QUEUE_KEY); + if (key == null || key.isEmpty()) { + throw new ConfigurationException("queue key must be specified"); + } + String redisTimeOut = redisConf.getProperty(REDIS_CONNTIMOUT, + DEFAULT_REDIS_CONNTIMEOUT); + String clusterMaxAttemp = redisConf.getProperty(CLUSTER_MAX_ATTEMP, + DEFAULT_CLUSTER_MAX_ATTEMP); + if (!StringUtils.isNumeric(redisTimeOut) + || !StringUtils.isNumeric(clusterMaxAttemp)) { + StringBuilder sb = new StringBuilder( + "Error configuration of redis, the value of these key must be a postive number, but you" + + " may specified other types.\n"); + sb.append(REDIS_CONNTIMOUT + ": " + redisTimeOut + "\n"); + sb.append(CLUSTER_MAX_ATTEMP + ": " + clusterMaxAttemp + "\n"); + throw new ConfigurationException(sb.toString()); + } + redisConf.putAll(ctx.getSubProperties(RREDIS_PREFIX)); + redisConf.put(SERVER_TYPE, redisType); + redisConf.put(REDIS_SERVER, host); + redisConf.put(REDIS_PASSWORD, password); + redisConf.put(QUEUE_KEY, key); + redisConf.put(REDIS_CONNTIMOUT, redisTimeOut); + redisConf.put(CLUSTER_MAX_ATTEMP, clusterMaxAttemp); + redisConf.put(BATCH_NUMBER, batchNumber); + + this.jedisPoolConfig = createJedisConfig(redisConf); + this.queue_key = key; + parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, + DEFAULT_PARSE_AS_FLUME_EVENT); + + if (counter == null) { + counter = new RedisChannelCounter(getName()); + } + + } + + private enum TransactionType { + PUT, TAKE, NONE + } + + private class RedisTransaction extends BasicTransactionSemantics { + + private TransactionType type = TransactionType.NONE; + // For Puts + private ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream(); + + // For put transactions, serialize the events and batch them and send + // it. + private LinkedList serializedEvents = new LinkedList(); + // For take transactions, deserialize and hold them till commit goes + // through + private LinkedList events = new LinkedList(); + private SpecificDatumWriter writer = new SpecificDatumWriter( + AvroFlumeEvent.class); + private SpecificDatumReader reader = new SpecificDatumReader( + AvroFlumeEvent.class); + + // Fine to use null for initial value, Avro will create new ones if this + // is null + private BinaryEncoder encoder = null; + private BinaryDecoder decoder = null; + private final String batchUUID = UUID.randomUUID().toString(); + private boolean eventTaken = false; + + @Override + protected void doPut(Event event) throws InterruptedException { + type = TransactionType.PUT; + try { + tempOutStream.reset(); + AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), + ByteBuffer.wrap(event.getBody())); + encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream, + encoder); + writer.write(e, encoder); + // Not really possible to avoid this copy :( + serializedEvents.add(tempOutStream.toByteArray()); + } catch (Exception e) { + throw new ChannelException("Error while serializing event", e); + } + } + + @SuppressWarnings("unchecked") + @Override + protected Event doTake() throws InterruptedException { + type = TransactionType.TAKE; + try { + if (!transFailoverController.get().uuid.equals(channelUUID)) { + transFailoverController.remove(); + LOGGER.info("UUID mismatch, creating new transFailoverController"); + } + } catch (Exception ex) { + LOGGER.warn("Error while get multi message", ex); + } + Event e; + if (!transFailoverController.get().failedEvents.isEmpty()) { + e = transFailoverController.get().failedEvents.removeFirst(); + } else { + try { + long startTime = System.nanoTime(); + Long queueLength = redisOperator.llen(queue_key); + while (queueLength == null || queueLength == 0) { + Thread.sleep(1000); + queueLength = redisOperator.llen(queue_key); + } + long endTime = System.nanoTime(); + counter + .addToRedisEventGetTimer((endTime - startTime) / (1000 * 1000)); + String message = redisOperator.rpop(queue_key); + if (parseAsFlumeEvent) { + ByteArrayInputStream in = new ByteArrayInputStream( + message.getBytes()); + decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); + AvroFlumeEvent event = reader.read(null, decoder); + e = EventBuilder.withBody(event.getBody().array(), + toStringMap(event.getHeaders())); + } else { + e = EventBuilder.withBody(message.getBytes(), + Collections.EMPTY_MAP); + } + + } catch (Exception ex) { + LOGGER.warn("Error while getting events from redis", ex); + throw new ChannelException("Error while getting events from redis", + ex); + } + } + eventTaken = true; + events.add(e); + return e; + } + + @Override + protected void doCommit() throws InterruptedException { + if (type.equals(TransactionType.NONE)) { + return; + } + if (type.equals(TransactionType.PUT)) { + try { + List messages = new ArrayList(); + for (byte[] event : serializedEvents) { + messages.add(new String(event)); + } + long startTime = System.nanoTime(); + String[] message_to_redis = new String[messages.size()]; + redisOperator.lpush(queue_key, messages.toArray(message_to_redis)); + long endTime = System.nanoTime(); + counter + .addToRedisEventSendTimer((endTime - startTime) / (1000 * 1000)); + counter.addToEventPutSuccessCount(Long.valueOf(messages.size())); + serializedEvents.clear(); + } catch (Exception ex) { + LOGGER.warn("Sending events to Kafka failed", ex); + throw new ChannelException("Commit failed as send to Kafka failed", + ex); + } + } else { + counter.addToEventTakeSuccessCount(Long.valueOf(events.size())); + events.clear(); + } + } + + @Override + protected void doRollback() throws InterruptedException { + if (type.equals(TransactionType.NONE)) { + return; + } + if (type.equals(TransactionType.PUT)) { + serializedEvents.clear(); + } else { + counter.addToRollbackCounter(Long.valueOf(events.size())); + transFailoverController.get().failedEvents.addAll(events); + events.clear(); + } + } + } + + /** + * Helper function to convert a map of String to a map of CharSequence. + */ + private static Map toCharSeqMap( + Map stringMap) { + Map charSeqMap = new HashMap(); + for (Map.Entry entry : stringMap.entrySet()) { + charSeqMap.put(entry.getKey(), entry.getValue()); + } + return charSeqMap; + } + + /** + * Helper function to convert a map of CharSequence to a map of String. + */ + private static Map toStringMap( + Map charSeqMap) { + Map stringMap = new HashMap(); + for (Map.Entry entry : charSeqMap.entrySet()) { + stringMap.put(entry.getKey().toString(), entry.getValue().toString()); + } + return stringMap; + } +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelConfiguration.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelConfiguration.java new file mode 100755 index 0000000000..3bc189caa4 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelConfiguration.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis; + +public class RedisChannelConfiguration { + public static final String SERVER_TYPE = "server.type"; + public static final String REDIS_CLUSTER_SERVER = "cluster.servers"; + public static final String REDIS_SERVER = "single.server"; + public static final String REDIS_SENTINEL_SERVER = "sentinel.servers"; + public static final String SENTINEL_MASTER_NAME = "sentinel.master.name"; + public static final String REDIS_PASSWORD = "password"; + public static final String QUEUE_KEY = "key"; + public static final String BATCH_NUMBER = "batch_number"; + public static final String DEFAULT_BATCH_NUMBER = "10"; + public static final String DEFAULT_REIDS_PORT = "6379"; + public static final String DEFAULT_SEVER_TYPE = "single"; + public static final String CLUSTER_SERVER_TYPE = "cluster"; + public static final String SENTINEL_SERVER_TYPE = "sentinel"; + + public static final String RREDIS_PREFIX = "redis."; + public static final String REDIS_CONNTIMOUT = "timeout"; + public static final String MAX_TO_TOTAL = "max.total"; + public static final String MAX_IDLE = "max.idle"; + public static final String MIN_IDLE = "min.idle"; + public static final String MAX_WAIT_MILLIS = "max.wait.millis"; + public static final String TEST_ON_BORROW = "test.on.borrow"; + public static final String TEST_ON_RETURN = "test.on.return"; + public static final String TEST_WHILE_IDLE = "test.while.idle"; + public static final String TIME_BETWEEN_EVICTION_RUNMILLIS = "time.between.eviction.runs.millis"; + public static final String NUM_TESTS_PER_EVICTION_RUN = "num.tests.per.eviction.run"; + public static final String MIN_EVICTABLE_IDLE_TIME_MILLS = "min.evictable.idle.time.millis"; + public static final String CLUSTER_MAX_ATTEMP = "cluster.max.attemp"; + public static final String DEFAULT_REDIS_CONNTIMEOUT = "5000"; + public static final String DEFAULT_MAX_TO_TOTAL = "500"; + public static final String DEFAULT_MAX_IDLE = "300"; + public static final String DEFAULT_MIN_IDLE = "10"; + public static final String DEFAULT_MAX_WAIT_MILLIS = "60000"; + public static final String DEFAULT_TEST_ON_BORROW = "true"; + public static final String DEFAULT_TEST_ON_RETURN = "true"; + public static final String DEFAULT_TEST_WHILE_IDLE = "true"; + public static final String DEFAULT_TIME_BETWEEN_EVICTION_RUNMILLIS = "30000"; + public static final String DEFAULT_NUM_TESTS_PER_EVICTION_RUN = "10"; + public static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLS = "60000"; + public static final String DEFAULT_CLUSTER_MAX_ATTEMP = "1"; + + public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent"; + public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true; +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounter.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounter.java new file mode 100755 index 0000000000..00c40c60ce --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.flume.instrumentation.ChannelCounter; + +public class RedisChannelCounter extends ChannelCounter + implements RedisChannelCounterMBean { + + private static final String TIMER_REDIS_EVENT_GET = + "channel.redis.event.get.time"; + + private static final String TIMER_REDIS_EVENT_SEND = + "channel.redis.event.send.time"; + + private static final String TIMER_REDIS_COMMIT = + "channel.redis.commit.time"; + + private static final String COUNT_ROLLBACK = + "channel.rollback.count"; + + private static String[] ATTRIBUTES = { + TIMER_REDIS_COMMIT, TIMER_REDIS_EVENT_SEND, TIMER_REDIS_EVENT_GET, + COUNT_ROLLBACK + }; + + public RedisChannelCounter(String name) { + super(name, ATTRIBUTES); + } + + public long addToRedisEventGetTimer(long delta) { + return addAndGet(TIMER_REDIS_EVENT_GET, delta); + } + + public long addToRedisEventSendTimer(long delta) { + return addAndGet(TIMER_REDIS_EVENT_SEND, delta); + } + + public long addToRedisCommitTimer(long delta) { + return addAndGet(TIMER_REDIS_COMMIT, delta); + } + + public long addToRollbackCounter(long delta) { + return addAndGet(COUNT_ROLLBACK, delta); + } + + public long getRedisEventGetTimer() { + return get(TIMER_REDIS_EVENT_GET); + } + + public long getRedisEventSendTimer() { + return get(TIMER_REDIS_EVENT_SEND); + } + + public long getRedisCommitTimer() { + return get(TIMER_REDIS_COMMIT); + } + + public long getRollbackCount() { + return get(COUNT_ROLLBACK); + } + +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounterMBean.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounterMBean.java new file mode 100755 index 0000000000..99a22ef87e --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/RedisChannelCounterMBean.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis; + +public interface RedisChannelCounterMBean { + long getRedisEventGetTimer(); + + long getRedisEventSendTimer(); + + long getRedisCommitTimer(); + + long getRollbackCount(); + + long getChannelSize(); + + long getEventPutAttemptCount(); + + long getEventTakeAttemptCount(); + + long getEventPutSuccessCount(); + + long getEventTakeSuccessCount(); + + long getStartTime(); + + long getStopTime(); + + long getChannelCapacity(); + + String getType(); + + double getChannelFillPercentage(); + +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/CannotGetRedisInstanceException.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/CannotGetRedisInstanceException.java new file mode 100755 index 0000000000..6093e770bb --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/CannotGetRedisInstanceException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.exception; + +public class CannotGetRedisInstanceException extends Exception { + + public CannotGetRedisInstanceException() { + super(); + } + + public CannotGetRedisInstanceException(String message) { + super(message); + } + + public CannotGetRedisInstanceException(String message,Throwable cause) { + super(message,cause); + } + + public CannotGetRedisInstanceException(Throwable cause) { + super(cause); + } +} \ No newline at end of file diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/UnsupportRedisTypeException.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/UnsupportRedisTypeException.java new file mode 100755 index 0000000000..45ad66e07b --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/exception/UnsupportRedisTypeException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.exception; + +public class UnsupportRedisTypeException extends Exception { + + public UnsupportRedisTypeException() { + super(); + } + + public UnsupportRedisTypeException(String message) { + super(message); + } + + public UnsupportRedisTypeException(String message,Throwable cause) { + super(message,cause); + } + + public UnsupportRedisTypeException(Throwable cause) { + super(cause); + } +} \ No newline at end of file diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/ClusterRedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/ClusterRedisOperator.java new file mode 100755 index 0000000000..df5f40e9d5 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/ClusterRedisOperator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.tools; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.HostAndPort; + +public class ClusterRedisOperator implements RedisOperator { + private static final Logger log = LoggerFactory.getLogger(RedisInit.class); + private JedisCluster jedis; + + public ClusterRedisOperator(String servers, String passwd, int timeout, + int maxAttemp, JedisPoolConfig jedisPoolConfig) { + Set nodeSet = new HashSet<>(); + String[] serverStrings = servers.split(","); + for (String ipPort : serverStrings) { + String[] ipPortPair = ipPort.split(":"); + nodeSet.add(new redis.clients.jedis.HostAndPort(ipPortPair[0].trim(), + Integer.valueOf(ipPortPair[1].trim()))); + } + if (StringUtils.isBlank(passwd)) { + jedis = new JedisCluster(nodeSet, timeout, timeout, maxAttemp, + jedisPoolConfig); + } else { + jedis = new JedisCluster(nodeSet, timeout, timeout, maxAttemp, passwd, + jedisPoolConfig); + } + } + + public long lpush(String key, String... strings) { + long count = 0; + count = jedis.lpush(key, strings); + return count; + } + + public String rpoplpush(String srckey, String dstkey) { + String msg = ""; + msg = jedis.rpoplpush(srckey, dstkey); + return msg; + } + + public String rpop(String key) { + String msg = ""; + msg = jedis.rpop(key); + return msg; + } + + public Long llen(String key) { + long result; + result = jedis.llen(key); + return result; + } +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisInit.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisInit.java new file mode 100755 index 0000000000..95bb071116 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisInit.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.tools; + +import java.util.HashMap; +import java.util.Properties; + +import javax.naming.ConfigurationException; + +import redis.clients.jedis.JedisPoolConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.channels.redis.exception.CannotGetRedisInstanceException; +import org.apache.flume.channels.redis.exception.UnsupportRedisTypeException; + +import static org.apache.flume.channels.redis.RedisChannelConfiguration.*; + +public class RedisInit { + private static final Logger LOGGER = LoggerFactory.getLogger(RedisInit.class); + private static HashMap instances = new HashMap(); + + public static synchronized RedisOperator getInstance(Properties redisConf, + JedisPoolConfig jedisPoolConfig) + throws UnsupportRedisTypeException, CannotGetRedisInstanceException { + String type = redisConf.getProperty(SERVER_TYPE); + String hosts = redisConf.getProperty(REDIS_SERVER); + String passwd = redisConf.getProperty(REDIS_PASSWORD); + int timeout = Integer.parseInt(redisConf.getProperty(REDIS_CONNTIMOUT)); + String instanceKey = type + "|" + hosts; + RedisOperator ro = null; + if (instances.containsKey(instanceKey)) { + ro = instances.get(instanceKey); + } else { + if (null == jedisPoolConfig) { + jedisPoolConfig = new JedisPoolConfig(); + } + if (type.equals(DEFAULT_SEVER_TYPE)) { + String[] hostAndPort = hosts.split(":"); + ro = new SingleServerRedisOperator(hostAndPort[0].trim(), + Integer.valueOf(hostAndPort[1].trim()), passwd, timeout, + jedisPoolConfig); + } else if (type.equals(CLUSTER_SERVER_TYPE)) { + int maxAttemp = Integer + .parseInt(redisConf.getProperty(CLUSTER_MAX_ATTEMP)); + ro = new ClusterRedisOperator(hosts, passwd, timeout, maxAttemp, + jedisPoolConfig); + } else if (type.equals(SENTINEL_SERVER_TYPE)) { + String masterName = redisConf.getProperty(SENTINEL_MASTER_NAME); + ro = new SentinelServerRedisOperator(masterName, hosts, passwd, timeout, + jedisPoolConfig); + } else { + throw new UnsupportRedisTypeException("Unsupport redis type " + type); + } + instances.put(instanceKey, ro); + } + if (null == ro) { + throw new CannotGetRedisInstanceException( + "Can not get redis instance with type: " + type + " and servers: " + + hosts); + } + return ro; + } +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisOperator.java new file mode 100755 index 0000000000..aaefdacd0b --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/RedisOperator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.tools; + +import java.util.List; + +public interface RedisOperator { + + public long lpush(String key, String... strings); + + public String rpoplpush(String srckey, String dstkey); + + public String rpop(String key); + + public Long llen(String key); +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelRedisController.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelRedisController.java new file mode 100755 index 0000000000..70e31462ef --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelRedisController.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.tools; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.JedisSentinelPool; + +public class SentinelRedisController { + private JedisSentinelPool jsp; + + public SentinelRedisController(String masterName, String servers, + String passwd, int timeout, JedisPoolConfig jedisPoolConfig) { + Set sentinels = new HashSet(); + String[] arr = servers.split(","); + if (arr != null && arr.length > 0) { + for (String item : arr) { + if (item != null && !item.equals("")) { + sentinels.add(item); + } + } + } + if (StringUtils.isBlank(passwd)) { + jsp = new JedisSentinelPool(masterName, sentinels, jedisPoolConfig, + timeout); + + } else { + jsp = new JedisSentinelPool(masterName, sentinels, jedisPoolConfig, + timeout, passwd); + } + } + + public void destory() { + jsp.destroy(); + } + + public Jedis getController() { + return jsp.getResource(); + } + + public void returnController(Jedis _jedis) { + _jedis.close(); + } +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelServerRedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelServerRedisOperator.java new file mode 100755 index 0000000000..5996b75b74 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SentinelServerRedisOperator.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.tools; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; +import redis.clients.jedis.Transaction; + +public class SentinelServerRedisOperator implements RedisOperator { + private static final Logger log = LoggerFactory.getLogger(RedisInit.class); + private SentinelRedisController rc; + + public SentinelServerRedisOperator(String mastName, String servers, String passwd, + int timeout, JedisPoolConfig jedisPoolConfig) { + rc = new SentinelRedisController(mastName, servers, passwd, timeout, jedisPoolConfig); + } + + public long lpush(String key, String... strings) { + Jedis jedis = rc.getController(); + long count = 0; + try { + count = jedis.lpush(key, strings); + } finally { + jedis.close(); + } + return count; + } + + public String rpoplpush(String srckey, String dstkey) { + Jedis jedis = rc.getController(); + String msg = ""; + try { + msg = jedis.rpoplpush(srckey, dstkey); + } finally { + jedis.close(); + } + return msg; + } + + public String rpop(String key) { + Jedis jedis = rc.getController(); + String msg = ""; + try { + msg = jedis.rpop(key); + } finally { + jedis.close(); + } + return msg; + } + + public Long llen(String key) { + Jedis jedis = rc.getController(); + long result; + try { + result = jedis.llen(key); + } finally { + jedis.close(); + } + return result; + } + + public List mpop(String key, Long batch) { + Jedis jedis = rc.getController(); + Response> result; + try { + Long llen = jedis.llen(key); + Long start = llen - batch; + if (start < 0L) { + start = 0L; + } + Transaction transaction = jedis.multi(); + result = transaction.lrange(key, start, -1); + transaction.ltrim(key, 0, start - 1); + transaction.exec(); + return result.get(); + } finally { + jedis.close(); + } + } + + public Jedis getRedis() { + return rc.getController(); + } +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleRedisController.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleRedisController.java new file mode 100755 index 0000000000..0e0bcbe717 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleRedisController.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.tools; + +import org.apache.commons.lang.StringUtils; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.JedisSentinelPool; + +public class SingleRedisController { + private JedisPool jsp; + + public SingleRedisController(String host, int port, String passwd, + int timeout, JedisPoolConfig jedisPoolConfig) { + if (null == jedisPoolConfig) { + jedisPoolConfig = new JedisPoolConfig(); + } + if (!StringUtils.isBlank(passwd)) { + jsp = new JedisPool(jedisPoolConfig, host, port, timeout, passwd); + + } else { + jsp = new JedisPool(jedisPoolConfig, host, port, timeout); + } + } + + public void destory() { + jsp.destroy(); + } + + public Jedis getController() { + return jsp.getResource(); + } + + public void returnController(Jedis _jedis) { + // jsp.returnBrokenResource(_jedis); + _jedis.close(); + } +} diff --git a/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleServerRedisOperator.java b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleServerRedisOperator.java new file mode 100755 index 0000000000..9aa9040703 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/main/java/org/apache/flume/channels/redis/tools/SingleServerRedisOperator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channels.redis.tools; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; +import redis.clients.jedis.Transaction; + +public class SingleServerRedisOperator implements RedisOperator { + private static final Logger log = LoggerFactory.getLogger(RedisInit.class); + private SingleRedisController rc; + + public SingleServerRedisOperator(String host, int port, String passwd, + int timeout, JedisPoolConfig jedisPoolConfig) { + rc = new SingleRedisController(host, port, passwd, timeout, jedisPoolConfig); + } + + public long lpush(String key, String... strings) { + Jedis jedis = rc.getController(); + long count = 0; + try { + count = jedis.lpush(key, strings); + } finally { + jedis.close(); + } + return count; + } + + public String rpoplpush(String srckey, String dstkey) { + Jedis jedis = rc.getController(); + String msg = ""; + try { + msg = jedis.rpoplpush(srckey, dstkey); + } finally { + jedis.close(); + } + return msg; + } + + public String rpop(String key) { + Jedis jedis = rc.getController(); + String msg = ""; + try { + msg = jedis.rpop(key); + } finally { + jedis.close(); + } + return msg; + } + + public Long llen(String key) { + Jedis jedis = rc.getController(); + long result; + try { + result = jedis.llen(key); + } finally { + jedis.close(); + } + return result; + } + + public List mpop(String key, Long batch) { + Jedis jedis = rc.getController(); + Response> result; + try { + Long llen = jedis.llen(key); + Long start = llen - batch; + if (start < 0L) { + start = 0L; + } + Transaction transaction = jedis.multi(); + result = transaction.lrange(key, start, -1); + transaction.ltrim(key, 0, start - 1); + transaction.exec(); + return result.get(); + } finally { + jedis.close(); + } + } + + public Jedis getRedis() { + return rc.getController(); + } + +} diff --git a/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSentinelRedis.java b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSentinelRedis.java new file mode 100755 index 0000000000..7415226d2b --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSentinelRedis.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.redis; + +import org.apache.flume.channels.redis.tools.SentinelServerRedisOperator; +import org.apache.flume.channels.redis.tools.SingleServerRedisOperator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import redis.clients.jedis.JedisPoolConfig; +import redis.embedded.RedisCluster; +import redis.embedded.RedisExecProvider; +import redis.embedded.RedisServer; +import redis.embedded.util.JedisUtil; + +import java.io.IOException; +import java.util.Set; + +public class TestSentinelRedis { + private RedisCluster cluster; + private Set jedisSentinelHosts; + private SentinelServerRedisOperator sentinelServerRedisOpertor; + + @Before + public void setUp() throws IOException { + cluster = RedisCluster.builder().ephemeral().sentinelCount(3).quorumSize(2).replicationGroup( + "master1",1) + .replicationGroup("master2", 1) + .replicationGroup("master3", 1).build(); + cluster.start(); + jedisSentinelHosts = JedisUtil.sentinelHosts(cluster); + StringBuilder sb = new StringBuilder(); + for (String host : jedisSentinelHosts) { + sb.append(host + ","); + } + sentinelServerRedisOpertor = new SentinelServerRedisOperator("master1", + sb.toString(), "", 5000, new JedisPoolConfig()); + } + + @Test + public void testSimpleListOperator() { + Assert.assertEquals(1, sentinelServerRedisOpertor.lpush("test", "a")); + Assert.assertEquals(1, Integer.parseInt(sentinelServerRedisOpertor.llen("test") + .toString())); + Assert.assertEquals("a", sentinelServerRedisOpertor.rpop("test")); + Assert.assertEquals(0, Integer.parseInt(sentinelServerRedisOpertor.llen("test") + .toString())); + } + + @Test + public void testComplexListOperator() { + String[] multiPush = {"a", "b", "c", "d", "e"}; + Assert.assertEquals(multiPush.length, sentinelServerRedisOpertor.lpush("test", multiPush)); + Assert.assertEquals(multiPush.length, Integer.parseInt(sentinelServerRedisOpertor.llen( + "test").toString())); + Assert.assertEquals(multiPush[0], sentinelServerRedisOpertor.rpop("test")); + Assert.assertEquals(multiPush.length - 1, + Integer.parseInt(sentinelServerRedisOpertor.llen("test").toString())); + + } + + @After + public void tearDown() { + cluster.stop(); + } +} diff --git a/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSingleRedis.java b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSingleRedis.java new file mode 100755 index 0000000000..8895d947a3 --- /dev/null +++ b/flume-ng-channels/flume-redis-channel/src/test/java/org/apache/flume/channel/redis/TestSingleRedis.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.redis; + +import org.apache.flume.channels.redis.tools.SingleServerRedisOperator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import redis.embedded.RedisExecProvider; +import redis.embedded.RedisServer; + +import java.io.IOException; + +public class TestSingleRedis { + private RedisServer redisServer; + private SingleServerRedisOperator singleServerRedisOperator; + + @Before + public void setUp() throws IOException { + RedisExecProvider customProvider = RedisExecProvider.defaultProvider(); + redisServer = RedisServer.builder().redisExecProvider(customProvider) + .setting("maxheap 128M").build(); + redisServer.start(); + singleServerRedisOperator = new SingleServerRedisOperator("localhost", 6379, "", 1000, null); + } + + @Test + public void testSimpleListOperator() { + Assert.assertEquals(1,singleServerRedisOperator.lpush("test", "a")); + Assert.assertEquals(1, Integer.parseInt(singleServerRedisOperator.llen("test").toString())); + Assert.assertEquals("a", singleServerRedisOperator.rpop("test")); + Assert.assertEquals(0, Integer.parseInt(singleServerRedisOperator.llen("test").toString())); + } + + @Test + public void testComplexListOperator() { + String[] multiPush = {"a","b","c","d","e"}; + Assert.assertEquals(multiPush.length,singleServerRedisOperator.lpush("test", multiPush)); + Assert.assertEquals(multiPush.length, + Integer.parseInt(singleServerRedisOperator.llen("test").toString())); + Assert.assertEquals(multiPush[0], singleServerRedisOperator.rpop("test")); + Assert.assertEquals(multiPush.length - 1, + Integer.parseInt(singleServerRedisOperator.llen("test").toString())); + + } + + @After + public void tearDown() { + redisServer.stop(); + } + +} diff --git a/flume-ng-channels/pom.xml b/flume-ng-channels/pom.xml index d86b9a432f..323e32074c 100644 --- a/flume-ng-channels/pom.xml +++ b/flume-ng-channels/pom.xml @@ -45,5 +45,6 @@ limitations under the License. flume-file-channel flume-spillable-memory-channel flume-kafka-channel + flume-redis-channel diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java index 2e3cb34397..3dfb374047 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java @@ -51,7 +51,13 @@ public enum ChannelType { * * @see SpillableMemoryChannel */ - SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel"); + SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel"), + + /** + * Redis Channel + * @see RedisChannel + */ + REDIS("org.flume.channels.redis.RedisChannel"); private final String channelClassName; diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst old mode 100644 new mode 100755 index 6d7085c545..873bf2ff6a --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -3391,6 +3391,61 @@ Also please make sure that the operating system user of the Flume processes has }; +Redis Channel +~~~~~~~~~~~~~ + +The events are stored in a Redis server (must be installed separately). If you have many agents but want to share one channel which can provide high concurrency, Redis Channel is a good choice. Redis also can provide high availability and replication, you can choose the best way to adapt your application. + +This version of Flume requires Redis version 2.6 or greater. If you want to use Redis cluster, we recommend Redis version 3.1 or greater. + +Required properties are in **bold**. + +================================================ ================================ ======================================================== +Property Name Default Description +================================================ ================================ ======================================================== +**type** -- The component type name, needs to be ``redis`` +**server.type** single Specify the type of Redis, now we support single, sentinel and cluster. Default value is single server mode. +single.server -- A Single Redis server used by the channel. It will work with "single" server type. +sentinel.servers -- Sentinel Redis servers used by the channel. It will work with "sentinel" server type. Use commas to separate different node. +sentinel.master.name -- Sentinel Redis master name. It will work with "sentinel" server type. +cluster.servers -- Cluster Redis servers used by the channel. It will work with "cluster" server type. Use commas to separate different node. +cluster.max.attemp 1 Amount times to retry to access Redis cluster. +**key** -- Since we use list to store events, you must specify a name of list. The list can be not exist in Redis server. +password -- The password of Redis. +redis.timout 5000 Amount of time (in milliseconds) to wait to connect Redis. +redis.max.total 500 +Max total number connections of Redis pool. +redis.max.idle 300 Max idle number connections of Redis pool. +redis.min.idle 10 Min idle number connections of Redis pools. +redis.max.wait.millis 60000 Amount of time(in milliseconds) to get a connection from Redis pool. +redis.test.on.borrow true If test when get a connection from Redis pool. Recommond let it be true in case the connection lost. +redis.test.on.return true If test when give a connection back to Redis pool. +redis.test.while.idle true If test while connection in Redis pool. Recommond let it be true in case the connection lost. +================================================ ================================ ======================================================== + + +Example for agent named a1 and channel named channel1: + +.. code-block:: properties + + a1.channels.channel1.type = org.flume.channels.redis.RedisChannel + a1.channels.channel1.server = 10.0.0.1:6379 + a1.channels.channel1.key = redis-channel + a1.channels.channel1.password = mytest + + a1.channels.channel2.type = org.flume.channels.redis.RedisChannel + a1.channels.channel2.server.type = sentinel + a1.channels.channel2.sentinel.servers = 10.0.0.1:26379,10.0.0.2:26379 + a1.channels.channel2.sentinel.master.name = mymaster + a1.channels.channel2.key = redis-channel + + a1.channels.channel3.type = org.flume.channels.redis.RedisChannel + a1.channels.channel3.server.type = cluster + a1.channels.channel3.cluster.servers = 10.0.0.1:7000,10.0.0.1:7001,10.0.0.1:7002 + a1.channels.channel3.key = redis-channel + + + File Channel ~~~~~~~~~~~~