Skip to content

FLUME-2984: Fix OOM in thrift source. Make transport buffer configurable. #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
* handle incoming data.
*/
public static final String CONFIG_THREADS = "threads";
/**
* Thrift Default FrameBuffer Size
*/
public static final String MAX_READ_BUFFER_BYTES = "maxReadBufferBytes";
public static final int DEFAULT_MAX_READ_BUFFER_BYTES = 16384000;
/**
* Config param for the hostname to listen on.
*/
Expand Down Expand Up @@ -112,6 +117,7 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
private Integer port;
private String bindAddress;
private int maxThreads = 0;
private int maxReadBufferBytes;
private SourceCounter sourceCounter;
private TServer server;
private ExecutorService servingExecutor;
Expand Down Expand Up @@ -143,6 +149,17 @@ public void configure(Context context) {
"integer value: " + context.getString(CONFIG_THREADS));
}

try {
maxReadBufferBytes = context.getInteger(MAX_READ_BUFFER_BYTES, DEFAULT_MAX_READ_BUFFER_BYTES);
maxReadBufferBytes = (maxReadBufferBytes <= 0) ?
DEFAULT_MAX_READ_BUFFER_BYTES : maxReadBufferBytes;
} catch (NumberFormatException e) {
logger.warn("Thrift source\'s \"maxReadBufferBytes\" property must specify an " +
"integer value: " + context.getString(MAX_READ_BUFFER_BYTES) +
". setting to default value: " + DEFAULT_MAX_READ_BUFFER_BYTES);
maxReadBufferBytes = DEFAULT_MAX_READ_BUFFER_BYTES;
}

if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
Expand Down Expand Up @@ -306,7 +323,7 @@ private TServer getTThreadedSelectorServer() {
}
Class<?> serverClass;
Class<?> argsClass;
TServer.AbstractServerArgs args;
TNonblockingServer.AbstractNonblockingServerArgs args;
try {
serverClass = Class.forName("org.apache.thrift" +
".server.TThreadedSelectorServer");
Expand All @@ -331,6 +348,7 @@ private TServer getTThreadedSelectorServer() {
ExecutorService.class);
m.invoke(args, sourceService);

args.maxReadBufferBytes = maxReadBufferBytes;
populateServerParams(args);

/*
Expand Down Expand Up @@ -365,7 +383,6 @@ private TServer getTThreadPoolServer() {
private void populateServerParams(TServer.AbstractServerArgs args) {
//populate the ProtocolFactory
args.protocolFactory(getProtocolFactory());

//populate the transportFactory
if (enableKerberos) {
args.transportFactory(getSASLTransportFactory());
Expand Down
1 change: 1 addition & 0 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ Property Name Default Description
**bind** -- hostname or IP address to listen on
**port** -- Port # to bind to
threads -- Maximum number of worker threads to spawn
maxReadBufferBytes 16384000 Thrift Default FrameBuffer Size
selector.type
selector.*
interceptors -- Space separated list of interceptors
Expand Down