Skip to content

FLUME-3023 {variable} substitution doesn't work for property 'fileSuffix' #110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ public Status process() throws EventDeliveryException {
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);

String realSuffix = BucketPath.escapeString(suffix, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);

String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
BucketWriter bucketWriter;
HDFSWriter hdfsWriter = null;
Expand All @@ -390,7 +392,7 @@ public void run(String bucketPath) {
// we haven't seen this file yet, so open it and cache the handle
if (bucketWriter == null) {
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
bucketWriter = initializeBucketWriter(realPath, realName, realSuffix,
lookupPath, hdfsWriter, closeCallback);
sfWriters.put(lookupPath, bucketWriter);
}
Expand All @@ -408,7 +410,7 @@ public void run(String bucketPath) {
LOG.info("Bucket was closed while trying to append, " +
"reinitializing bucket and writing event.");
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
bucketWriter = initializeBucketWriter(realPath, realName, realSuffix,
lookupPath, hdfsWriter, closeCallback);
synchronized (sfWritersLock) {
sfWriters.put(lookupPath, bucketWriter);
Expand Down Expand Up @@ -456,12 +458,12 @@ public void run(String bucketPath) {
}

private BucketWriter initializeBucketWriter(String realPath,
String realName, String lookupPath, HDFSWriter hdfsWriter,
String realName, String realSuffix, String lookupPath, HDFSWriter hdfsWriter,
WriterCallback closeCallback) {
BucketWriter bucketWriter = new BucketWriter(rollInterval,
rollSize, rollCount,
batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType, hdfsWriter, timedRollerPool,
realSuffix, codeC, compType, hdfsWriter, timedRollerPool,
privExecutor, sinkCounter, idleTimeout, closeCallback,
lookupPath, callTimeout, callTimeoutPool, retryInterval,
tryCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.flume.sink.hdfs;

import com.google.common.base.Charsets;

import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -373,6 +375,7 @@ public void testInUseSuffix() throws IOException, InterruptedException {
Assert.assertTrue("Incorrect in use suffix", hdfsWriter.getOpenedFilePath().contains(SUFFIX));
}


@Test
public void testCallbackOnClose() throws IOException, InterruptedException {
final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
Expand Down
Loading