diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 741f01e7b8..60c57ef904 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -369,7 +369,9 @@ public Status process() throws EventDeliveryException {
             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
-
+        String realSuffix = BucketPath.escapeString(suffix, event.getHeaders(),
+            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
+
         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
         BucketWriter bucketWriter;
         HDFSWriter hdfsWriter = null;
@@ -390,7 +392,7 @@ public void run(String bucketPath) {
           // we haven't seen this file yet, so open it and cache the handle
           if (bucketWriter == null) {
             hdfsWriter = writerFactory.getWriter(fileType);
-            bucketWriter = initializeBucketWriter(realPath, realName,
+            bucketWriter = initializeBucketWriter(realPath, realName, realSuffix,
                 lookupPath, hdfsWriter, closeCallback);
             sfWriters.put(lookupPath, bucketWriter);
           }
@@ -408,7 +410,7 @@ public void run(String bucketPath) {
             LOG.info("Bucket was closed while trying to append, " +
                      "reinitializing bucket and writing event.");
             hdfsWriter = writerFactory.getWriter(fileType);
-            bucketWriter = initializeBucketWriter(realPath, realName,
+            bucketWriter = initializeBucketWriter(realPath, realName, realSuffix,
                 lookupPath, hdfsWriter, closeCallback);
             synchronized (sfWritersLock) {
               sfWriters.put(lookupPath, bucketWriter);
@@ -456,12 +458,12 @@ public void run(String bucketPath) {
   }
 
   private BucketWriter initializeBucketWriter(String realPath,
-      String realName, String lookupPath, HDFSWriter hdfsWriter,
+      String realName, String realSuffix, String lookupPath, HDFSWriter hdfsWriter,
       WriterCallback closeCallback) {
     BucketWriter bucketWriter = new BucketWriter(rollInterval,
         rollSize, rollCount,
         batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
-        suffix, codeC, compType, hdfsWriter, timedRollerPool,
+        realSuffix, codeC, compType, hdfsWriter, timedRollerPool,
         privExecutor, sinkCounter, idleTimeout, closeCallback,
         lookupPath, callTimeout, callTimeoutPool, retryInterval,
         tryCount);
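Reviewer note (not part of the patch): the hunks above thread a per-event `realSuffix` through to `BucketWriter`, so escape sequences in the configured `hdfs.fileSuffix` are now resolved against each event's headers, exactly as already happens for the path and file prefix. A minimal sketch of the resulting behavior, using the two-argument `BucketPath.escapeString` that the new test below also calls; the header values and suffix literal here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.formatter.output.BucketPath;

public class SuffixEscapeSketch {
  public static void main(String[] args) {
    // The HDFS sink resolves date escapes from the event's timestamp header.
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

    // Before this patch the configured suffix was handed to BucketWriter
    // verbatim; with it, the suffix goes through the same escaping as the
    // path and prefix, e.g. "_%Y%m%d.avro" -> "_20170201.avro".
    String realSuffix = BucketPath.escapeString("_%Y%m%d.avro", headers);
    System.out.println(realSuffix);
  }
}
```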
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 4221a5d2b0..b89cbe3901 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -19,12 +19,14 @@
 package org.apache.flume.sink.hdfs;
 
 import com.google.common.base.Charsets;
+
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.auth.FlumeAuthenticationUtil;
 import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -373,6 +375,7 @@ public void testInUseSuffix() throws IOException, InterruptedException {
     Assert.assertTrue("Incorrect in use suffix",
         hdfsWriter.getOpenedFilePath().contains(SUFFIX));
   }
+
   @Test
   public void testCallbackOnClose() throws IOException, InterruptedException {
     final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 782cf47cc6..29d76254c6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Maps;
+
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -51,6 +52,7 @@
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -119,7 +121,9 @@ public void setUp() {
 
   @After
   public void tearDown() {
-    if (System.getenv("hdfs_keepFiles") == null) dirCleanup();
+    if (System.getenv("hdfs_keepFiles") == null) {
+      dirCleanup();
+    }
   }
 
   @Test
@@ -204,9 +208,11 @@ public void doTestTextBatchAppend(boolean useRawLocalFileSystem)
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
-    if (totalEvents % rollCount > 0) expectedFiles++;
-    Assert.assertEquals("num files wrong, found: " +
-        Lists.newArrayList(fList), expectedFiles, fList.length);
+    if (totalEvents % rollCount > 0) {
+      expectedFiles++;
+    }
+    Assert.assertEquals("num files wrong, found: " + Lists.newArrayList(fList),
+        expectedFiles, fList.length);
     // check the contents of the all files
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
@@ -257,10 +263,9 @@ public void testKerbFileAccess() throws InterruptedException,
     String kerbConfPrincipal = "user1/localhost@EXAMPLE.COM";
     String kerbKeytab = "/usr/lib/flume/nonexistkeytabfile";
 
-    //turn security on
+    // turn security on
     Configuration conf = new Configuration();
-    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
-        "kerberos");
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
     UserGroupInformation.setConfiguration(conf);
 
     Context context = new Context();
@@ -278,9 +283,8 @@ public void testKerbFileAccess() throws InterruptedException,
       Assert.assertTrue(expected.getMessage().contains(
           "Keytab is not a readable file"));
     } finally {
-      //turn security off
-      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
-          "simple");
+      // turn security off
+      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "simple");
       UserGroupInformation.setConfiguration(conf);
     }
   }
@@ -357,9 +361,11 @@ public void testTextAppend() throws InterruptedException, LifecycleException,
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
-    if (totalEvents % rollCount > 0) expectedFiles++;
-    Assert.assertEquals("num files wrong, found: " +
-        Lists.newArrayList(fList), expectedFiles, fList.length);
+    if (totalEvents % rollCount > 0) {
+      expectedFiles++;
+    }
+    Assert.assertEquals("num files wrong, found: " + Lists.newArrayList(fList),
+        expectedFiles, fList.length);
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -436,12 +442,88 @@ public void testAvroAppend() throws InterruptedException, LifecycleException,
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
-    if (totalEvents % rollCount > 0) expectedFiles++;
-    Assert.assertEquals("num files wrong, found: " +
-        Lists.newArrayList(fList), expectedFiles, fList.length);
+    if (totalEvents % rollCount > 0) {
+      expectedFiles++;
+    }
+    Assert.assertEquals("num files wrong, found: " + Lists.newArrayList(fList),
+        expectedFiles, fList.length);
     verifyOutputAvroFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
+  @Test
+  public void testFileSuffixTranslation() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
+
+    LOG.debug("Starting...");
+    final String fileName = "FlumeData";
+    String newPath = testPath + "/singleBucket";
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.fileSuffix", "_%Y%m%d.avro");
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    // Need to override system time use for test so we know what to expect
+    final long testTime = System.currentTimeMillis();
+
+    Clock testClock = new Clock() {
+      public long currentTimeMillis() {
+        return testTime;
+      }
+    };
+    Clock prevClock = BucketPath.getClock();
+
+    BucketPath.setClock(testClock);
+
+    Calendar eventDate = Calendar.getInstance();
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event event = new SimpleEvent();
+    eventDate.set(2017, 1, 1, 1, 0); // year, month (0-based), day, hour, minute
+    event.getHeaders().put("timestamp", String.valueOf(eventDate.getTimeInMillis()));
+    String body = "Test";
+    event.setBody(body.getBytes());
+
+    channel.put(event);
+    txn.commit();
+    txn.close();
+
+    // execute sink to process the events
+    sink.process();
+
+    sink.stop();
+
+    String translatedSuffixString =
+        BucketPath.escapeString(context.getString("hdfs.fileSuffix"), event.getHeaders());
+
+    BucketPath.setClock(prevClock);
+
+    // loop through all the files generated and check their contents
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
+
+    // check that the file name contains the translated suffix
+    Assert.assertTrue("File suffix translation failed: " + fList[0].toUri(),
+        fList[0].getName().indexOf(translatedSuffixString) != -1);
+  }
+
   @Test
   public void testSimpleAppend() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
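Reviewer note on `testFileSuffixTranslation` above: pinning `BucketPath`'s static clock makes the date escapes deterministic, and restoring the previous clock afterwards keeps shared static state from leaking into other tests. A stripped-down sketch of that save/override/restore pattern, using the same `BucketPath` clock hooks the test itself calls; the fixed instant is illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Clock;
import org.apache.flume.formatter.output.BucketPath;

public class FixedClockSketch {
  public static void main(String[] args) {
    final long fixedTime = 1485910800000L; // arbitrary fixed instant

    Clock testClock = new Clock() {
      public long currentTimeMillis() {
        return fixedTime;
      }
    };

    // Save and override the shared static clock, as the test does.
    Clock prevClock = BucketPath.getClock();
    BucketPath.setClock(testClock);
    try {
      Map<String, String> headers = new HashMap<String, String>();
      headers.put("timestamp", String.valueOf(fixedTime));
      // Deterministic because both the header and the clock are pinned.
      System.out.println(BucketPath.escapeString("_%Y%m%d.avro", headers));
    } finally {
      BucketPath.setClock(prevClock); // always restore
    }
  }
}
```

The test restores `prevClock` after computing `translatedSuffixString`; wrapping the override in try/finally as above would make the restore robust to assertion failures, though that is a suggestion rather than something the patch does.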
@@ -512,15 +594,18 @@ public void testSimpleAppend() throws InterruptedException,
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
-    if (totalEvents % rollCount > 0) expectedFiles++;
-    Assert.assertEquals("num files wrong, found: " +
-        Lists.newArrayList(fList), expectedFiles, fList.length);
-    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    if (totalEvents % rollCount > 0) {
+      expectedFiles++;
+    }
+    Assert.assertEquals("num files wrong, found: " + Lists.newArrayList(fList),
+        expectedFiles, fList.length);
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
   }
 
   @Test
-  public void testSimpleAppendLocalTime()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+  public void testSimpleAppendLocalTime() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
     final long currentTime = System.currentTimeMillis();
     Clock clk = new Clock() {
       @Override
@@ -534,9 +619,9 @@ public long currentTimeMillis() {
     final long rollCount = 5;
     final long batchSize = 2;
     final int numBatches = 4;
-    String newPath = testPath + "/singleBucket/%s" ;
-    String expectedPath = testPath + "/singleBucket/" +
-        String.valueOf(currentTime / 1000);
+    String newPath = testPath + "/singleBucket/%s";
+    String expectedPath = testPath + "/singleBucket/" +
+        String.valueOf(currentTime / 1000);
     int totalEvents = 0;
     int i = 1, j = 1;
 
@@ -599,10 +684,13 @@ public long currentTimeMillis() {
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
-    if (totalEvents % rollCount > 0) expectedFiles++;
-    Assert.assertEquals("num files wrong, found: " +
-        Lists.newArrayList(fList), expectedFiles, fList.length);
-    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    if (totalEvents % rollCount > 0) {
+      expectedFiles++;
+    }
+    Assert.assertEquals("num files wrong, found: " + Lists.newArrayList(fList),
+        expectedFiles, fList.length);
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
     // The clock in bucketpath is static, so restore the real clock
     sink.setBucketClock(new SystemClock());
   }
@@ -665,7 +753,8 @@ public void testAppend() throws InterruptedException, LifecycleException,
     }
 
     sink.stop();
-    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
   }
 
   // inject fault and make sure that the txn is rolled back and retried
@@ -742,11 +831,11 @@ public void testBadSimpleAppend() throws InterruptedException,
     LOG.info("Process events to injected fault: " + sink.process());
     LOG.info("Process events remaining events: " + sink.process());
     sink.stop();
-    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
   }
 
-
   private List<String> getAllFiles(String input) {
     List<String> output = Lists.newArrayList();
     File dir = new File(input);
@@ -761,14 +850,15 @@ private List<String> getAllFiles(String input) {
     return output;
   }
 
-  private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir,
-      String prefix, List<String> bodies) throws IOException {
+  private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf,
+      String dir, String prefix, List<String> bodies) throws IOException {
     int found = 0;
     int expected = bodies.size();
     for (String outputFile : getAllFiles(dir)) {
       String name = (new File(outputFile)).getName();
       if (name.startsWith(prefix)) {
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(outputFile), conf);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(
+            outputFile), conf);
         LongWritable key = new LongWritable();
         BytesWritable value = new BytesWritable();
         while (reader.next(key, value)) {
@@ -787,14 +877,13 @@ private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String
         LOG.error("Never found event body: {}", body);
       }
     }
-    Assert.assertTrue("Found = " + found + ", Expected = " +
-        expected + ", Left = " + bodies.size() + " " + bodies,
-        bodies.size() == 0);
+    Assert.assertTrue("Found = " + found + ", Expected = " + expected
+        + ", Left = " + bodies.size() + " " + bodies, bodies.size() == 0);
   }
 
-  private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix,
-      List<String> bodies) throws IOException {
+  private void verifyOutputTextFiles(FileSystem fs, Configuration conf,
+      String dir, String prefix, List<String> bodies) throws IOException {
     int found = 0;
     int expected = bodies.size();
     for (String outputFile : getAllFiles(dir)) {
@@ -810,14 +899,13 @@ private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir
         reader.close();
       }
     }
-    Assert.assertTrue("Found = " + found + ", Expected = " +
-        expected + ", Left = " + bodies.size() + " " + bodies,
-        bodies.size() == 0);
+    Assert.assertTrue("Found = " + found + ", Expected = " + expected
+        + ", Left = " + bodies.size() + " " + bodies, bodies.size() == 0);
   }
 
-  private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir, String prefix,
-      List<String> bodies) throws IOException {
+  private void verifyOutputAvroFiles(FileSystem fs, Configuration conf,
+      String dir, String prefix, List<String> bodies) throws IOException {
     int found = 0;
     int expected = bodies.size();
     for (String outputFile : getAllFiles(dir)) {
@@ -825,8 +913,8 @@ private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir
       if (name.startsWith(prefix)) {
         FSDataInputStream input = fs.open(new Path(outputFile));
         DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
-        DataFileStream<GenericRecord> avroStream =
-            new DataFileStream<GenericRecord>(input, reader);
+        DataFileStream<GenericRecord> avroStream = new DataFileStream<GenericRecord>(
+            input, reader);
         GenericRecord record = new GenericData.Record(avroStream.getSchema());
         while (avroStream.hasNext()) {
           avroStream.next(record);
@@ -841,20 +929,19 @@ private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir
       input.close();
     }
   }
-    Assert.assertTrue("Found = " + found + ", Expected = " +
-        expected + ", Left = " + bodies.size() + " " + bodies,
-        bodies.size() == 0);
+    Assert.assertTrue("Found = " + found + ", Expected = " + expected
+        + ", Left = " + bodies.size() + " " + bodies, bodies.size() == 0);
   }
 
   /**
-   * Ensure that when a write throws an IOException we are
-   * able to continue to progress in the next process() call.
-   * This relies on Transactional rollback semantics for durability and
-   * the behavior of the BucketWriter class of close()ing upon IOException.
+   * Ensure that when a write throws an IOException we are able to continue to
+   * progress in the next process() call. This relies on Transactional rollback
+   * semantics for durability and the behavior of the BucketWriter class of
+   * close()ing upon IOException.
    */
   @Test
-  public void testCloseReopen()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+  public void testCloseReopen() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
     final int numBatches = 4;
@@ -919,16 +1006,17 @@ public void testCloseReopen()
     LOG.info("clear any events pending due to errors: " + sink.process());
     sink.stop();
 
-    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
   }
 
   /**
-   * Test that the old bucket writer is closed at the end of rollInterval and
-   * a new one is used for the next set of events.
+   * Test that the old bucket writer is closed at the end of rollInterval and a
+   * new one is used for the next set of events.
    */
   @Test
-  public void testCloseReopenOnRollTime()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+  public void testCloseReopenOnRollTime() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
     final int numBatches = 4;
@@ -1010,8 +1098,8 @@ public void testCloseReopenOnRollTime()
    * sfWriters map.
    */
   @Test
-  public void testCloseRemovesFromSFWriters()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+  public void testCloseRemovesFromSFWriters() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
     final String fileName = "FlumeData";
@@ -1088,11 +1176,9 @@ public void testCloseRemovesFromSFWriters()
         bodies);
   }
 
-
-
   /*
-   * append using slow sink writer.
-   * verify that the process returns backoff due to timeout
+   * append using slow sink writer. verify that the process returns backoff due
+   * to timeout
    */
   @Test
   public void testSlowAppendFailure() throws InterruptedException,
@@ -1163,12 +1249,12 @@ public void testSlowAppendFailure() throws InterruptedException,
   }
 
   /*
-   * append using slow sink writer with specified append timeout
-   * verify that the data is written correctly to files
+   * append using slow sink writer with specified append timeout verify that the
+   * data is written correctly to files
    */
   private void slowAppendTestHelper(long appendTimeout)
-      throws InterruptedException, IOException, LifecycleException, EventDeliveryException,
-      IOException {
+      throws InterruptedException, IOException, LifecycleException,
+      EventDeliveryException, IOException {
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -1239,15 +1325,18 @@ private void slowAppendTestHelper(long appendTimeout)
     // check that the roll happened correctly for the given data
     // Note that we'll end up with two files with only a head
     long expectedFiles = totalEvents / rollCount;
-    if (totalEvents % rollCount > 0) expectedFiles++;
-    Assert.assertEquals("num files wrong, found: " +
-        Lists.newArrayList(fList), expectedFiles, fList.length);
-    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    if (totalEvents % rollCount > 0) {
+      expectedFiles++;
+    }
+    Assert.assertEquals("num files wrong, found: " + Lists.newArrayList(fList),
+        expectedFiles, fList.length);
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
   }
 
   /*
-   * append using slow sink writer with long append timeout
-   * verify that the data is written correctly to files
+   * append using slow sink writer with long append timeout verify that the data
+   * is written correctly to files
   */
   @Test
   public void testSlowAppendWithLongTimeout() throws InterruptedException,
@@ -1257,8 +1346,8 @@ public void testSlowAppendWithLongTimeout() throws InterruptedException,
   }
 
   /*
-   * append using slow sink writer with no timeout to make append
-   * synchronous. Verify that the data is written correctly to files
+   * append using slow sink writer with no timeout to make append synchronous.
+   * Verify that the data is written correctly to files
   */
   @Test
   public void testSlowAppendWithoutTimeout() throws InterruptedException,
@@ -1266,8 +1355,10 @@ public void testSlowAppendWithoutTimeout() throws InterruptedException,
     LOG.debug("Starting...");
     slowAppendTestHelper(0);
   }
+
   @Test
-  public void testCloseOnIdle() throws IOException, EventDeliveryException, InterruptedException {
+  public void testCloseOnIdle() throws IOException, EventDeliveryException,
+      InterruptedException {
     String hdfsPath = testPath + "/idleClose";
 
     Configuration conf = new Configuration();
@@ -1278,8 +1369,8 @@ public void testCloseOnIdle() throws IOException, EventDeliveryException, Interr
     Context context = new Context();
     context.put("hdfs.path", hdfsPath);
     /*
-     * All three rolling methods are disabled so the only
-     * way a file can roll is through the idle timeout.
+     * All three rolling methods are disabled so the only way a file can roll is
+     * through the idle timeout.
      */
     context.put("hdfs.rollCount", "0");
     context.put("hdfs.rollSize", "0");
@@ -1320,21 +1411,24 @@ public void testCloseOnIdle() throws IOException, EventDeliveryException, Interr
     sink.stop();
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path[] fList = FileUtil.stat2Paths(dirStat);
-    Assert.assertEquals("Incorrect content of the directory " + StringUtils.join(fList, ","),
-        2, fList.length);
-    Assert.assertTrue(!fList[0].getName().endsWith(".tmp") &&
-        !fList[1].getName().endsWith(".tmp"));
+    Assert.assertEquals(
+        "Incorrect content of the directory " + StringUtils.join(fList, ","),
+        2, fList.length);
+    Assert.assertTrue(!fList[0].getName().endsWith(".tmp")
+        && !fList[1].getName().endsWith(".tmp"));
     fs.close();
   }
 
   /**
-   * This test simulates what happens when a batch of events is written to a compressed sequence
-   * file (and thus hsync'd to hdfs) but the file is not yet closed.
+   * This test simulates what happens when a batch of events is written to a
+   * compressed sequence file (and thus hsync'd to hdfs) but the file is not yet
+   * closed.
    *
    * When this happens, the data that we wrote should still be readable.
    */
   @Test
-  public void testBlockCompressSequenceFileWriterSync() throws IOException, EventDeliveryException {
+  public void testBlockCompressSequenceFileWriterSync() throws IOException,
+      EventDeliveryException {
     String hdfsPath = testPath + "/sequenceFileWriterSync";
     FileSystem fs = FileSystem.get(new Configuration());
     // Since we are reading a partial file we don't want to use checksums
@@ -1342,28 +1436,23 @@ public void testBlockCompressSequenceFileWriterSync() throws IOException, EventD
     fs.setWriteChecksum(false);
 
     // Compression codecs that don't require native hadoop libraries
-    String [] codecs = {"BZip2Codec", "DeflateCodec"};
+    String[] codecs = { "BZip2Codec", "DeflateCodec" };
 
     for (String codec : codecs) {
-      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Collections.singletonList(
-          "single-event"
-      ));
+      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec,
+          Collections.singletonList("single-event"));
 
       sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Arrays.asList(
-          "multiple-events-1",
-          "multiple-events-2",
-          "multiple-events-3",
-          "multiple-events-4",
-          "multiple-events-5"
-      ));
+          "multiple-events-1", "multiple-events-2", "multiple-events-3",
+          "multiple-events-4", "multiple-events-5"));
     }
 
     fs.close();
   }
 
-  private void sequenceFileWriteAndVerifyEvents(FileSystem fs, String hdfsPath, String codec,
-                                                Collection<String> eventBodies)
-      throws IOException, EventDeliveryException {
+  private void sequenceFileWriteAndVerifyEvents(FileSystem fs, String hdfsPath,
+      String codec, Collection<String> eventBodies) throws IOException,
+      EventDeliveryException {
     Path dirPath = new Path(hdfsPath);
     fs.delete(dirPath, true);
     fs.mkdirs(dirPath);
@@ -1400,15 +1489,15 @@ private void sequenceFileWriteAndVerifyEvents(FileSystem fs, String hdfsPath, St
       sink.process();
     }
 
-    // Sink is _not_ closed.  The file should remain open but
+    // Sink is _not_ closed. The file should remain open but
     // the data written should be visible to readers via sync + hflush
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path[] paths = FileUtil.stat2Paths(dirStat);
 
     Assert.assertEquals(1, paths.length);
 
-    SequenceFile.Reader reader =
-        new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.stream(fs.open(paths[0])));
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(),
+        SequenceFile.Reader.stream(fs.open(paths[0])));
     LongWritable key = new LongWritable();
     BytesWritable value = new BytesWritable();
@@ -1428,7 +1517,7 @@ private Context getContextForRetryTests() {
     context.put("hdfs.batchSize", String.valueOf(100));
     context.put("hdfs.fileType", "DataStream");
     context.put("hdfs.serializer", "text");
-    context.put("hdfs.closeTries","3");
+    context.put("hdfs.closeTries", "3");
     context.put("hdfs.rollCount", "1");
     context.put("hdfs.retryInterval", "1");
     return context;
@@ -1455,7 +1544,7 @@ public void testBadConfigurationForRetryIntervalNegative() throws Exception {
   @Test
   public void testBadConfigurationForRetryCountZero() throws Exception {
     Context context = getContextForRetryTests();
-    context.put("hdfs.closeTries" ,"0");
+    context.put("hdfs.closeTries", "0");
     Configurables.configure(sink, context);
 
     Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
@@ -1464,21 +1553,22 @@ public void testBadConfigurationForRetryCountZero() throws Exception {
   @Test
   public void testBadConfigurationForRetryCountNegative() throws Exception {
     Context context = getContextForRetryTests();
-    context.put("hdfs.closeTries" ,"-4");
+    context.put("hdfs.closeTries", "-4");
     Configurables.configure(sink, context);
 
     Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
   }
 
   @Test
-  public void testRetryRename()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+  public void testRetryRename() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
     testRetryRename(true);
     testRetryRename(false);
   }
 
   private void testRetryRename(boolean closeSucceed)
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+      throws InterruptedException, LifecycleException, EventDeliveryException,
+      IOException {
     LOG.debug("Starting...");
     String newPath = testPath + "/retryBucket";
@@ -1530,7 +1620,7 @@ private void testRetryRename(boolean closeSucceed)
       sink.process();
     }
 
-    TimeUnit.SECONDS.sleep(5); //Sleep till all retries are done.
+    TimeUnit.SECONDS.sleep(5); // Sleep till all retries are done.
     Collection<BucketWriter> writers = sink.getSfWriters().values();
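With this change, `hdfs.fileSuffix` accepts the same escape sequences as `hdfs.path` and `hdfs.filePrefix`. A hedged sketch of configuring a sink that way, mirroring the new test; the path is illustrative, while `Context`, `Configurables`, and `HDFSEventSink` are the Flume classes the tests themselves use:

```java
import org.apache.flume.Context;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.hdfs.HDFSEventSink;

public class SuffixConfigSketch {
  public static void main(String[] args) {
    HDFSEventSink sink = new HDFSEventSink();
    Context context = new Context();
    context.put("hdfs.path", "/tmp/flume-suffix-demo");
    context.put("hdfs.filePrefix", "FlumeData");
    // Previously appended verbatim; now resolved per event, e.g. to
    // "_20170201.avro" for a Feb 1 2017 timestamp header.
    context.put("hdfs.fileSuffix", "_%Y%m%d.avro");
    Configurables.configure(sink, context);
  }
}
```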