Skip to content

Commit 66319bc

Browse files
author
sewen
committed
Merge branch 'acb' into v2cbm
Conflicts: sopremo/sopremo-base/src/main/java/eu/stratosphere/sopremo/base/Projection.java sopremo/sopremo-base/src/main/java/eu/stratosphere/sopremo/base/UnionAll.java sopremo/sopremo-common/src/test/java/eu/stratosphere/sopremo/pact/CsvInputFormatTest.java sopremo/sopremo-common/src/test/java/eu/stratosphere/sopremo/pact/JsonInputFormatTest.java
2 parents 1221ddb + 635f6ba commit 66319bc

File tree

26 files changed

+898
-361
lines changed

26 files changed

+898
-361
lines changed

pact/pact-clients/src/main/java/eu/stratosphere/pact/testing/TestPlan.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -818,9 +818,9 @@ private OptimizedPlan compile(final Plan plan) {
818818
*/
819819
private void configureSinksAndSources() {
820820
for (FileDataSink sink : this.sinks)
821-
sink.getParameters().setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT, 0);
821+
sink.getParameters().setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT_KEY, 0);
822822
for (FileDataSource source : this.sources)
823-
source.getParameters().setLong(FileInputFormat.INPUT_STREAM_OPEN_TIMEOUT, 0);
823+
source.getParameters().setLong(FileInputFormat.INPUT_STREAM_OPEN_TIMEOUT_KEY, 0);
824824
}
825825

826826
/**

pact/pact-common/src/main/java/eu/stratosphere/pact/common/io/DelimitedInputFormat.java

Lines changed: 145 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package eu.stratosphere.pact.common.io;
1717

1818
import java.io.IOException;
19+
import java.io.UnsupportedEncodingException;
1920
import java.net.URI;
2021
import java.util.ArrayList;
2122
import java.util.List;
@@ -30,6 +31,7 @@
3031
import eu.stratosphere.nephele.fs.FileSystem;
3132
import eu.stratosphere.nephele.fs.LineReader;
3233
import eu.stratosphere.nephele.fs.Path;
34+
import eu.stratosphere.pact.common.contract.FileDataSource;
3335
import eu.stratosphere.pact.common.io.statistics.BaseStatistics;
3436
import eu.stratosphere.pact.common.type.PactRecord;
3537

@@ -43,15 +45,7 @@
4345
*/
4446
public abstract class DelimitedInputFormat extends FileInputFormat
4547
{
46-
/**
47-
* The configuration key to set the record delimiter.
48-
*/
49-
public static final String RECORD_DELIMITER = "delimited-format.delimiter";
50-
51-
/**
52-
* The configuration key to set the number of samples to take for the statistics.
53-
*/
54-
public static final String NUM_STATISTICS_SAMPLES = "delimited-format.numSamples";
48+
// -------------------------------------- Constants -------------------------------------------
5549

5650
/**
5751
* The log.
@@ -68,6 +62,23 @@ public abstract class DelimitedInputFormat extends FileInputFormat
6862
*/
6963
private static final int DEFAULT_NUM_SAMPLES = 10;
7064

65+
// ------------------------------------- Config Keys ------------------------------------------
66+
67+
/**
68+
* The configuration key to set the record delimiter.
69+
*/
70+
public static final String RECORD_DELIMITER = "delimited-format.delimiter";
71+
72+
/**
73+
* The configuration key to set the record delimiter encoding.
74+
*/
75+
private static final String RECORD_DELIMITER_ENCODING = "delimited-format.delimiter-encoding";
76+
77+
/**
78+
* The configuration key to set the number of samples to take for the statistics.
79+
*/
80+
private static final String NUM_STATISTICS_SAMPLES = "delimited-format.numSamples";
81+
7182
// --------------------------------------------------------------------------------------------
7283

7384
protected byte[] readBuffer;
@@ -78,7 +89,7 @@ public abstract class DelimitedInputFormat extends FileInputFormat
7889

7990
protected int limit;
8091

81-
protected byte[] delimiter = new byte[] { '\n' };
92+
protected byte[] delimiter = new byte[] {'\n'};
8293

8394
private byte[] currBuffer;
8495
private int currOffset;
@@ -151,12 +162,18 @@ public void configure(Configuration parameters)
151162
{
152163
super.configure(parameters);
153164

154-
String delimString = parameters.getString(RECORD_DELIMITER, "\n");
165+
final String delimString = parameters.getString(RECORD_DELIMITER, AbstractConfigBuilder.NEWLINE_DELIMITER);
155166
if (delimString == null) {
156167
throw new IllegalArgumentException("The delimiter not be null.");
157168
}
169+
final String charsetName = parameters.getString(RECORD_DELIMITER_ENCODING, null);
158170

159-
this.delimiter = delimString.getBytes();
171+
try {
172+
this.delimiter = charsetName == null ? delimString.getBytes() : delimString.getBytes(charsetName);
173+
} catch (UnsupportedEncodingException useex) {
174+
throw new IllegalArgumentException("The charset with the name '" + charsetName +
175+
"' is not supported on this TaskManager instance.", useex);
176+
}
160177

161178
// set the number of samples
162179
this.numLineSamples = DEFAULT_NUM_SAMPLES;
@@ -374,6 +391,9 @@ public boolean reachedEnd()
374391
return this.end;
375392
}
376393

394+
/* (non-Javadoc)
395+
* @see eu.stratosphere.pact.common.generic.io.InputFormat#nextRecord(java.lang.Object)
396+
*/
377397
@Override
378398
public boolean nextRecord(PactRecord record) throws IOException
379399
{
@@ -501,4 +521,117 @@ private final boolean fillBuffer() throws IOException {
501521
return true;
502522
}
503523
}
524+
525+
// ============================================================================================
526+
527+
/**
528+
* Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent
529+
* fashion.
530+
*
531+
* @return A config builder for setting parameters.
532+
*/
533+
public static ConfigBuilder configureDelimitedFormat(FileDataSource target) {
534+
return new ConfigBuilder(target.getParameters());
535+
}
536+
537+
/**
538+
* Abstract builder used to set parameters to the input format's configuration in a fluent way.
539+
*/
540+
protected static class AbstractConfigBuilder<T> extends FileInputFormat.AbstractConfigBuilder<T>
541+
{
542+
private static final String NEWLINE_DELIMITER = "\n";
543+
544+
// --------------------------------------------------------------------
545+
546+
/**
547+
* Creates a new builder for the given configuration.
548+
*
549+
* @param targetConfig The configuration into which the parameters will be written.
550+
*/
551+
protected AbstractConfigBuilder(Configuration config) {
552+
super(config);
553+
}
554+
555+
// --------------------------------------------------------------------
556+
557+
/**
558+
* Sets the delimiter to be a single character, namely the given one. The character must be within
559+
* the value range <code>0</code> to <code>127</code>.
560+
*
561+
* @param delimiter The delimiter character.
562+
* @return The builder itself.
563+
*/
564+
public T recordDelimiter(char delimiter) {
565+
if (delimiter == '\n') {
566+
this.config.setString(RECORD_DELIMITER, NEWLINE_DELIMITER);
567+
} else {
568+
this.config.setString(RECORD_DELIMITER, String.valueOf(delimiter));
569+
}
570+
@SuppressWarnings("unchecked")
571+
T ret = (T) this;
572+
return ret;
573+
}
574+
575+
/**
576+
* Sets the delimiter to be the given string. The string will be converted to bytes for more efficient
577+
* comparison during input parsing. The conversion will be done using the platforms default charset.
578+
*
579+
* @param delimiter The delimiter string.
580+
* @return The builder itself.
581+
*/
582+
public T recordDelimiter(String delimiter) {
583+
this.config.setString(RECORD_DELIMITER, delimiter);
584+
@SuppressWarnings("unchecked")
585+
T ret = (T) this;
586+
return ret;
587+
}
588+
589+
/**
590+
* Sets the delimiter to be the given string. The string will be converted to bytes for more efficient
591+
* comparison during input parsing. The conversion will be done using the charset with the given name.
592+
* The charset must be available on the processing nodes, otherwise an exception will be raised at
593+
* runtime.
594+
*
595+
* @param delimiter The delimiter string.
596+
* @param charsetName The name of the encoding character set.
597+
* @return The builder itself.
598+
*/
599+
public T recordDelimiter(String delimiter, String charsetName) {
600+
this.config.setString(RECORD_DELIMITER, delimiter);
601+
this.config.setString(RECORD_DELIMITER_ENCODING, charsetName);
602+
@SuppressWarnings("unchecked")
603+
T ret = (T) this;
604+
return ret;
605+
}
606+
607+
/**
608+
* Sets the number of line samples to take in order to estimate the base statistics for the
609+
* input format.
610+
*
611+
* @param numSamples The number of line samples to take.
612+
* @return The builder itself.
613+
*/
614+
public T numSamplesForStatistics(int numSamples) {
615+
this.config.setInteger(NUM_STATISTICS_SAMPLES, numSamples);
616+
@SuppressWarnings("unchecked")
617+
T ret = (T) this;
618+
return ret;
619+
}
620+
}
621+
622+
/**
623+
* A builder used to set parameters to the input format's configuration in a fluent way.
624+
*/
625+
public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder>
626+
{
627+
/**
628+
* Creates a new builder for the given configuration.
629+
*
630+
* @param targetConfig The configuration into which the parameters will be written.
631+
*/
632+
protected ConfigBuilder(Configuration targetConfig) {
633+
super(targetConfig);
634+
}
635+
636+
}
504637
}

0 commit comments

Comments
 (0)