Skip to content

Flume 2866 #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
private final Charset inputCharset;
private final DecodeErrorPolicy decodeErrorPolicy;
private final ConsumeOrder consumeOrder;
private final int fileTimeMinOffsetSeconds;

private Optional<FileInfo> currentFile = Optional.absent();
/** Always contains the last file from which lines have been read. **/
Expand All @@ -115,7 +116,7 @@ private ReliableSpoolingFileEventReader(File spoolDirectory,
String deserializerType, Context deserializerContext,
String deletePolicy, String inputCharset,
DecodeErrorPolicy decodeErrorPolicy,
ConsumeOrder consumeOrder) throws IOException {
ConsumeOrder consumeOrder, int fileTimeMinOffsetSeconds) throws IOException {

// Sanity checks
Preconditions.checkNotNull(spoolDirectory);
Expand Down Expand Up @@ -176,6 +177,7 @@ private ReliableSpoolingFileEventReader(File spoolDirectory,
this.inputCharset = Charset.forName(inputCharset);
this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);
this.consumeOrder = Preconditions.checkNotNull(consumeOrder);
this.fileTimeMinOffsetSeconds = fileTimeMinOffsetSeconds;

File trackerDirectory = new File(trackerDirPath);

Expand Down Expand Up @@ -427,21 +429,26 @@ private void deleteCurrentFile(File fileToDelete) throws IOException {
* If the {@link #consumeOrder} variable is {@link ConsumeOrder#RANDOM}
* then cache the directory listing to amortize retrieval cost, and return
* any arbitrary file from the directory.
* A file whose last modified time lies within fileTimeMinOffsetSeconds seconds
* of the current time (before or after it) will not be processed.
*/
private Optional<FileInfo> getNextFile() {
List<File> candidateFiles = Collections.emptyList();
final long currentTime = System.currentTimeMillis();
final long olderThanTime = currentTime - (fileTimeMinOffsetSeconds * 1000);
final long newerThanTime = currentTime + (fileTimeMinOffsetSeconds * 1000);

if (consumeOrder != ConsumeOrder.RANDOM ||
candidateFileIter == null ||
!candidateFileIter.hasNext()) {
/* Filter to exclude finished or hidden files */
/* Filter to exclude finished or hidden files or files not meeting the minimum time offset */
FileFilter filter = new FileFilter() {
public boolean accept(File candidate) {
String fileName = candidate.getName();
if ((candidate.isDirectory()) ||
(fileName.endsWith(completedSuffix)) ||
(fileName.startsWith(".")) ||
ignorePattern.matcher(fileName).matches()) {
ignorePattern.matcher(fileName).matches() ||
(candidate.lastModified() >= olderThanTime && candidate.lastModified() <= newerThanTime)) {
return false;
}
return true;
Expand Down Expand Up @@ -596,6 +603,8 @@ public static class Builder {
.toUpperCase(Locale.ENGLISH));
private ConsumeOrder consumeOrder =
SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER;
private int fileTimeMinOffsetSeconds =
SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_TIME_MIN_OFFSET_SECONDS;

public Builder spoolDirectory(File directory) {
this.spoolDirectory = directory;
Expand Down Expand Up @@ -666,13 +675,18 @@ public Builder consumeOrder(ConsumeOrder consumeOrder) {
this.consumeOrder = consumeOrder;
return this;
}

/**
 * Sets the minimum time offset, in seconds, that the built reader applies
 * when filtering candidate files by their last modified time.
 *
 * @param fileTimeMinOffsetSeconds offset in seconds; stored as given, no
 *        validation is performed here
 * @return this builder, to allow call chaining
 */
public Builder fileTimeMinOffsetSeconds(int fileTimeMinOffsetSeconds) {
this.fileTimeMinOffsetSeconds = fileTimeMinOffsetSeconds;
return this;
}

/**
 * Creates a {@link ReliableSpoolingFileEventReader} by handing the values
 * currently held by this builder to the reader's private constructor.
 *
 * @return the newly constructed reader
 * @throws IOException if the reader's constructor throws it
 */
public ReliableSpoolingFileEventReader build() throws IOException {
return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
annotateBaseName, baseNameHeader, deserializerType,
deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy,
consumeOrder);
consumeOrder, fileTimeMinOffsetSeconds);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class SpoolDirectorySource extends AbstractSource implements
private int maxBackoff;
private ConsumeOrder consumeOrder;
private int pollDelay;
private int fileTimeMinOffsetSeconds;

@Override
public synchronized void start() {
Expand All @@ -95,6 +96,7 @@ public synchronized void start() {
.inputCharset(inputCharset)
.decodeErrorPolicy(decodeErrorPolicy)
.consumeOrder(consumeOrder)
.fileTimeMinOffsetSeconds(fileTimeMinOffsetSeconds)
.build();
} catch (IOException ioe) {
throw new FlumeException("Error instantiating spooling event parser",
Expand Down Expand Up @@ -168,6 +170,9 @@ public synchronized void configure(Context context) {

pollDelay = context.getInteger(POLL_DELAY, DEFAULT_POLL_DELAY);

fileTimeMinOffsetSeconds = context.getInteger(FILE_TIME_MIN_OFFSET_SECONDS,
DEFAULT_FILE_TIME_MIN_OFFSET_SECONDS);

// "Hack" to support backwards compatibility with previous generation of
// spooling directory source, which did not support deserializers
Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ public enum ConsumeOrder {
/** Delay(in milliseconds) used when polling for new files. The default is 500ms */
public static final String POLL_DELAY = "pollDelay";
public static final int DEFAULT_POLL_DELAY = 500;

/** Minimum time offset (in seconds) from current time before picking up a file. The default is 0 */
public static final String FILE_TIME_MIN_OFFSET_SECONDS = "fileTimeMinOffsetSeconds";
public static final int DEFAULT_FILE_TIME_MIN_OFFSET_SECONDS = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,31 @@ public void testZeroByteTrackerFile() throws IOException {
Assert.assertEquals(expectedLines, seenLines);
}

/**
 * Checks that a reader built with {@code fileTimeMinOffsetSeconds(1)} yields
 * the line of a file written about two seconds before the read, and does not
 * yield the line of a file written immediately before the read.
 *
 * NOTE(review): the result depends on wall-clock sleeps and on file-system
 * modification timestamps — confirm it is stable where mtime resolution is coarse.
 */
@Test
public void testConsumeFileOlderThan()
throws IOException, InterruptedException {
// Reader under test, configured with a one-second minimum time offset.
ReliableEventReader reader
= new ReliableSpoolingFileEventReader.Builder()
.spoolDirectory(WORK_DIR)
.fileTimeMinOffsetSeconds(1)
.build();
File file1 = new File(WORK_DIR, "new-file1");
File file2 = new File(WORK_DIR, "new-file2");
// NOTE(review): presumably lets the files created in setup age before the
// new files are written — confirm this sleep is needed.
Thread.sleep(1000L);
FileUtils.write(file1, "New file1 created that will be 2s old when read.\n");
// Let file1 age past the one-second offset before file2 is written.
Thread.sleep(2000L);
// file2 is written right before reading; its line is absent from `expected` below.
FileUtils.write(file2, "New file2 created.\n");
Set<String> actual = Sets.newHashSet();
// Presumably collects the body of every event the reader returns into `actual`;
// verify against the helper.
readEventsForFilesInDir(WORK_DIR, reader, actual);
Set<String> expected = Sets.newHashSet();
createExpectedFromFilesInSetup(expected);
// The last file added in setup holds an empty line, so an empty string is expected too.
expected.add("");
expected.add("New file1 created that will be 2s old when read.");

// Set equality: file2's line appearing in `actual` would fail the test.
Assert.assertEquals(expected, actual);
}

private void templateTestForLargeNumberOfFiles(ConsumeOrder order,
Comparator<Long> comparator,
int N) throws IOException {
Expand Down