Skip to content

Add BucketWriterOptions to simplify the BucketWriter's constructor #103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,62 +114,34 @@ class BucketWriter {
protected boolean closed = false;
AtomicInteger renameTries = new AtomicInteger(0);

BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
Context context, String filePath, String fileName, String inUsePrefix,
String inUseSuffix, String fileSuffix, CompressionCodec codeC,
CompressionType compType, HDFSWriter writer,
ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
String onCloseCallbackPath, long callTimeout,
ExecutorService callTimeoutPool, long retryInterval,
int maxCloseTries) {
this(rollInterval, rollSize, rollCount, batchSize,
context, filePath, fileName, inUsePrefix,
inUseSuffix, fileSuffix, codeC,
compType, writer,
timedRollerPool, proxyUser,
sinkCounter, idleTimeout, onCloseCallback,
onCloseCallbackPath, callTimeout,
callTimeoutPool, retryInterval,
maxCloseTries, new SystemClock());
}

BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
Context context, String filePath, String fileName, String inUsePrefix,
String inUseSuffix, String fileSuffix, CompressionCodec codeC,
CompressionType compType, HDFSWriter writer,
ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
String onCloseCallbackPath, long callTimeout,
ExecutorService callTimeoutPool, long retryInterval,
int maxCloseTries, Clock clock) {
this.rollInterval = rollInterval;
this.rollSize = rollSize;
this.rollCount = rollCount;
this.batchSize = batchSize;
this.filePath = filePath;
this.fileName = fileName;
this.inUsePrefix = inUsePrefix;
this.inUseSuffix = inUseSuffix;
this.fileSuffix = fileSuffix;
this.codeC = codeC;
this.compType = compType;
this.writer = writer;
this.timedRollerPool = timedRollerPool;
this.proxyUser = proxyUser;
this.sinkCounter = sinkCounter;
this.idleTimeout = idleTimeout;
this.onCloseCallback = onCloseCallback;
this.onCloseCallbackPath = onCloseCallbackPath;
this.callTimeout = callTimeout;
this.callTimeoutPool = callTimeoutPool;
fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());

this.retryInterval = retryInterval;
this.maxRenameTries = maxCloseTries;
BucketWriter(BucketWriterOptions options) {
this.rollInterval = options.rollInterval;
this.rollSize = options.rollSize;
this.rollCount = options.rollCount;
this.batchSize = options.batchSize;
this.filePath = options.filePath;
this.fileName = options.fileName;
this.inUsePrefix = options.inUsePrefix;
this.inUseSuffix = options.inUseSuffix;
this.fileSuffix = options.fileSuffix;
this.codeC = options.compressionCodec;
this.compType = options.compressionType;
this.writer = options.hdfsWriter;
this.timedRollerPool = options.timedRollerPool;
this.proxyUser = options.proxyUser;
this.sinkCounter = options.sinkCounter;
this.idleTimeout = options.idleTimeout;
this.onCloseCallback = options.onCloseCallback;
this.onCloseCallbackPath = options.onCloseCallbackPath;
this.callTimeout = options.callTimeout;
this.callTimeoutPool = options.callTimeoutPool;
fileExtensionCounter = new AtomicLong(options.clock.currentTimeMillis());

this.retryInterval = options.retryInterval;
this.maxRenameTries = options.maxCloseTries;
isOpen = false;
isUnderReplicated = false;
this.writer.configure(context);
this.writer.configure(options.context);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.sink.hdfs;

import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.SystemClock;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* Wrapper class for {@link BucketWriter}'s constructor parameters.
*/
class BucketWriterOptions {

long rollInterval;
long rollSize;
long rollCount;
long batchSize;
Context context;
String filePath;
String fileName;
String inUsePrefix;
String inUseSuffix;
String fileSuffix;
CompressionCodec compressionCodec;
SequenceFile.CompressionType compressionType;
HDFSWriter hdfsWriter;
ScheduledExecutorService timedRollerPool;
PrivilegedExecutor proxyUser;
SinkCounter sinkCounter;
int idleTimeout;
HDFSEventSink.WriterCallback onCloseCallback;
String onCloseCallbackPath;
long callTimeout;
ExecutorService callTimeoutPool;
long retryInterval;
int maxCloseTries;
Clock clock = new SystemClock();

BucketWriterOptions rollInterval(long rollInterval) {
this.rollInterval = rollInterval;
return this;
}

BucketWriterOptions rollSize(long rollSize) {
this.rollSize = rollSize;
return this;
}

BucketWriterOptions rollCount(long rollCount) {
this.rollCount = rollCount;
return this;
}

BucketWriterOptions batchSize(long batchSize) {
this.batchSize = batchSize;
return this;
}

BucketWriterOptions context(Context context) {
this.context = context;
return this;
}

BucketWriterOptions filePath(String filePath) {
this.filePath = filePath;
return this;
}

BucketWriterOptions fileName(String fileName) {
this.fileName = fileName;
return this;
}

BucketWriterOptions inUsePrefix(String inUsePrefix) {
this.inUsePrefix = inUsePrefix;
return this;
}

BucketWriterOptions inUseSuffix(String inUseSuffix) {
this.inUseSuffix = inUseSuffix;
return this;
}

BucketWriterOptions fileSuffix(String fileSuffix) {
this.fileSuffix = fileSuffix;
return this;
}

BucketWriterOptions compressionCodec(CompressionCodec compressionCodec) {
this.compressionCodec = compressionCodec;
return this;
}

BucketWriterOptions compressionType(SequenceFile.CompressionType compressionType) {
this.compressionType = compressionType;
return this;
}

BucketWriterOptions hdfsWriter(HDFSWriter hdfsWriter) {
this.hdfsWriter = hdfsWriter;
return this;
}

BucketWriterOptions timedRollerPool(ScheduledExecutorService timedRollerPool) {
this.timedRollerPool = timedRollerPool;
return this;
}

BucketWriterOptions proxyUser(PrivilegedExecutor proxyUser) {
this.proxyUser = proxyUser;
return this;
}

BucketWriterOptions sinkCounter(SinkCounter sinkCounter) {
this.sinkCounter = sinkCounter;
return this;
}

BucketWriterOptions idleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
return this;
}

BucketWriterOptions onCloseCallback(HDFSEventSink.WriterCallback onCloseCallback) {
this.onCloseCallback = onCloseCallback;
return this;
}

BucketWriterOptions onCloseCallbackPath(String onCloseCallbackPath) {
this.onCloseCallbackPath = onCloseCallbackPath;
return this;
}

BucketWriterOptions callTimeout(long callTimeout) {
this.callTimeout = callTimeout;
return this;
}

BucketWriterOptions callTimeoutPool(ExecutorService callTimeoutPool) {
this.callTimeoutPool = callTimeoutPool;
return this;
}

BucketWriterOptions retryInterval(long retryInterval) {
this.retryInterval = retryInterval;
return this;
}

BucketWriterOptions maxCloseTries(int maxCloseTries) {
this.maxCloseTries = maxCloseTries;
return this;
}

BucketWriterOptions clock(Clock clock) {
this.clock = clock;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,31 @@ public void run(String bucketPath) {
private BucketWriter initializeBucketWriter(String realPath,
String realName, String lookupPath, HDFSWriter hdfsWriter,
WriterCallback closeCallback) {
BucketWriter bucketWriter = new BucketWriter(rollInterval,
rollSize, rollCount,
batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType, hdfsWriter, timedRollerPool,
privExecutor, sinkCounter, idleTimeout, closeCallback,
lookupPath, callTimeout, callTimeoutPool, retryInterval,
tryCount);
BucketWriterOptions options = new BucketWriterOptions()
.rollInterval(rollInterval)
.rollSize(rollSize)
.rollCount(rollCount)
.batchSize(batchSize)
.context(context)
.filePath(realPath)
.fileName(realName)
.inUsePrefix(inUsePrefix)
.inUseSuffix(inUseSuffix)
.fileSuffix(suffix)
.compressionCodec(codeC)
.compressionType(compType)
.hdfsWriter(hdfsWriter)
.timedRollerPool(timedRollerPool)
.proxyUser(privExecutor)
.sinkCounter(sinkCounter)
.idleTimeout(idleTimeout)
.onCloseCallback(closeCallback)
.onCloseCallbackPath(lookupPath)
.callTimeout(callTimeout)
.callTimeoutPool(callTimeoutPool)
.retryInterval(retryInterval)
.maxCloseTries(tryCount);
BucketWriter bucketWriter = new BucketWriter(options);
if (mockFs != null) {
bucketWriter.setFileSystem(mockFs);
bucketWriter.setMockStream(mockWriter);
Expand Down
Loading