Skip to content

Commit 635f6ba

Browse files
author
Aljoscha Krettek
committed
Forward fit new config API to plans in pact-tests
1 parent afd49e2 commit 635f6ba

File tree

11 files changed

+95
-143
lines changed

11 files changed

+95
-143
lines changed

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/CoGroupITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,13 @@ protected JobGraph getJobGraph() throws Exception {
181181
String pathPrefix = getFilesystemProvider().getURIPrefix() + getFilesystemProvider().getTempDirPath();
182182

183183
FileDataSource input_left = new FileDataSource(CoGroupTestInFormat.class, pathPrefix + "/cogroup_left");
184-
input_left.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
184+
DelimitedInputFormat.configureDelimitedFormat(input_left)
185+
.recordDelimiter('\n');
185186
input_left.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
186187

187188
FileDataSource input_right = new FileDataSource(CoGroupTestInFormat.class, pathPrefix + "/cogroup_right");
188-
input_right.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
189+
DelimitedInputFormat.configureDelimitedFormat(input_right)
190+
.recordDelimiter('\n');
189191
input_right.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
190192

191193
CoGroupContract testCoGrouper = CoGroupContract.builder(TestCoGrouper.class, PactString.class, 0, 0)

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/CrossITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,14 @@ protected JobGraph getJobGraph() throws Exception {
140140

141141
FileDataSource input_left = new FileDataSource(
142142
ContractITCaseInputFormat.class, pathPrefix + "/cross_left");
143-
input_left.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
143+
DelimitedInputFormat.configureDelimitedFormat(input_left)
144+
.recordDelimiter('\n');
144145
input_left.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
145146

146147
FileDataSource input_right = new FileDataSource(
147148
ContractITCaseInputFormat.class, pathPrefix + "/cross_right");
148-
input_right.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
149+
DelimitedInputFormat.configureDelimitedFormat(input_right)
150+
.recordDelimiter('\n');
149151
input_right.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
150152

151153
CrossContract testCross = CrossContract.builder(TestCross.class).build();

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/MapITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ protected JobGraph getJobGraph() throws Exception {
109109

110110
FileDataSource input = new FileDataSource(
111111
ContractITCaseInputFormat.class, pathPrefix+"/mapInput");
112-
input.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
112+
DelimitedInputFormat.configureDelimitedFormat(input)
113+
.recordDelimiter('\n');
113114
input.setDegreeOfParallelism(config.getInteger("MapTest#NoSubtasks", 1));
114115

115116
MapContract testMapper = MapContract.builder(TestMapper.class).build();

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/MatchITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,14 @@ protected JobGraph getJobGraph() throws Exception {
135135

136136
FileDataSource input_left = new FileDataSource(
137137
ContractITCaseInputFormat.class, pathPrefix + "/match_left");
138-
input_left.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
138+
DelimitedInputFormat.configureDelimitedFormat(input_left)
139+
.recordDelimiter('\n');
139140
input_left.setDegreeOfParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
140141

141142
FileDataSource input_right = new FileDataSource(
142143
ContractITCaseInputFormat.class, pathPrefix + "/match_right");
143-
input_right.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
144+
DelimitedInputFormat.configureDelimitedFormat(input_right)
145+
.recordDelimiter('\n');
144146
input_right.setDegreeOfParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
145147

146148
MatchContract testMatcher = MatchContract.builder(TestMatcher.class, PactString.class, 0, 0)

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/ReduceITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ protected JobGraph getJobGraph() throws Exception {
131131

132132
FileDataSource input = new FileDataSource(
133133
ContractITCaseInputFormat.class, pathPrefix + "/reduceInput");
134-
input.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
134+
DelimitedInputFormat.configureDelimitedFormat(input)
135+
.recordDelimiter('\n');
135136
input.setDegreeOfParallelism(config.getInteger("ReduceTest#NoSubtasks", 1));
136137

137138
ReduceContract testReducer = new ReduceContract.Builder(TestReducer.class, PactString.class, 0)

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/contracts/UnionITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,14 @@ protected JobGraph getJobGraph() throws Exception {
126126

127127
FileDataSource input1 = new FileDataSource(
128128
ContractITCaseInputFormat.class, pathPrefix + config.getString("UnionTest#Input1Path", ""));
129-
input1.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
129+
DelimitedInputFormat.configureDelimitedFormat(input1)
130+
.recordDelimiter('\n');
130131
input1.setDegreeOfParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
131132

132133
FileDataSource input2 = new FileDataSource(
133134
ContractITCaseInputFormat.class, pathPrefix + config.getString("UnionTest#Input2Path", ""));
134-
input2.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
135+
DelimitedInputFormat.configureDelimitedFormat(input2)
136+
.recordDelimiter('\n');
135137
input2.setDegreeOfParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
136138

137139
MapContract testMapper = MapContract.builder(TestMapper.class).build();

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/failingPrograms/TaskFailureITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ protected JobGraph getFailingJobGraph() throws Exception {
103103
// init data source
104104
FileDataSource input = new FileDataSource(
105105
ContractITCaseInputFormat.class, pathPrefix+"/mapInput");
106-
input.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
106+
DelimitedInputFormat.configureDelimitedFormat(input)
107+
.recordDelimiter('\n');
107108
input.setDegreeOfParallelism(config.getInteger("MapTest#NoSubtasks", 1));
108109

109110
// init failing map task
@@ -143,7 +144,8 @@ protected JobGraph getJobGraph() throws Exception {
143144
// init data source
144145
FileDataSource input = new FileDataSource(
145146
ContractITCaseInputFormat.class, pathPrefix+"/mapInput");
146-
input.setParameter(DelimitedInputFormat.RECORD_DELIMITER, "\n");
147+
DelimitedInputFormat.configureDelimitedFormat(input)
148+
.recordDelimiter('\n');
147149
input.setDegreeOfParallelism(config.getInteger("MapTest#NoSubtasks", 1));
148150

149151
// init (working) map task

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/GlobalSortingITCase.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,11 @@ public Plan getPlan(String... args) throws IllegalArgumentException {
180180
FileDataSink sink =
181181
new FileDataSink(RecordOutputFormat.class, output);
182182
sink.setDegreeOfParallelism(noSubtasks);
183-
sink.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
184-
sink.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
185-
sink.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
186-
sink.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 1);
187-
sink.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
188-
sink.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
183+
RecordOutputFormat.configureRecordFormat(sink)
184+
.recordDelimiter('\n')
185+
.fieldDelimiter('|')
186+
.lenient(true)
187+
.field(PactInteger.class, 0);
189188

190189
sink.setGlobalOrder(new Ordering(0, PactInteger.class, Order.ASCENDING), new UniformDistribution());
191190
sink.setInput(source);

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/GlobalSortingMixedOrderITCase.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -205,16 +205,13 @@ public Plan getPlan(String... args) throws IllegalArgumentException {
205205
.field(DecimalTextIntParser.class, 2);
206206

207207
FileDataSink sink = new FileDataSink(RecordOutputFormat.class, output);
208-
sink.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
209-
sink.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, ",");
210-
sink.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
211-
sink.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 3);
212-
sink.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
213-
sink.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactInteger.class);
214-
sink.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, PactInteger.class);
215-
sink.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
216-
sink.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
217-
sink.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 2);
208+
RecordOutputFormat.configureRecordFormat(sink)
209+
.recordDelimiter('\n')
210+
.fieldDelimiter(',')
211+
.lenient(true)
212+
.field(PactInteger.class, 0)
213+
.field(PactInteger.class, 1)
214+
.field(PactInteger.class, 2);
218215

219216
sink.setGlobalOrder(
220217
new Ordering(0, PactInteger.class, Order.DESCENDING)

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/testPrograms/mergeOnlyJoin/MergeOnlyJoin.java

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,11 @@ public Plan getPlan(final String... args)
8080
// create DataSourceContract for Orders input
8181
FileDataSource input1 = new FileDataSource(RecordInputFormat.class, input1Path, "Input 1");
8282
input1.setDegreeOfParallelism(noSubtasks);
83-
84-
input1.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
85-
input1.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
86-
input1.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);
87-
88-
input1.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class);
89-
input1.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0);
90-
91-
input1.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class);
92-
input1.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1);
93-
83+
RecordInputFormat.configureRecordFormat(input1)
84+
.recordDelimiter('\n')
85+
.fieldDelimiter('|')
86+
.field(DecimalTextIntParser.class, 0)
87+
.field(DecimalTextIntParser.class, 1);
9488

9589
ReduceContract aggInput1 = new ReduceContract.Builder(DummyReduce.class, PactInteger.class, 0)
9690
.input(input1)
@@ -102,16 +96,11 @@ public Plan getPlan(final String... args)
10296
// create DataSourceContract for Orders input
10397
FileDataSource input2 = new FileDataSource(RecordInputFormat.class, input2Path, "Input 2");
10498
input2.setDegreeOfParallelism(noSubtasksInput2);
105-
106-
input2.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
107-
input2.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
108-
input2.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);
109-
// order id
110-
input2.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class);
111-
input2.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0);
112-
// ship prio
113-
input2.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class);
114-
input2.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1);
99+
RecordInputFormat.configureRecordFormat(input2)
100+
.recordDelimiter('\n')
101+
.fieldDelimiter('|')
102+
.field(DecimalTextIntParser.class, 0)
103+
.field(DecimalTextIntParser.class, 1);
115104

116105
ReduceContract aggInput2 = new ReduceContract.Builder(DummyReduce.class, PactInteger.class, 0)
117106
.input(input2)
@@ -132,16 +121,13 @@ public Plan getPlan(final String... args)
132121
// create DataSinkContract for writing the result
133122
FileDataSink result = new FileDataSink(RecordOutputFormat.class, output, joinLiO, "Output");
134123
result.setDegreeOfParallelism(noSubtasks);
135-
result.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
136-
result.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
137-
result.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
138-
result.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 3);
139-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
140-
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
141-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactInteger.class);
142-
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
143-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, PactInteger.class);
144-
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 2);
124+
RecordOutputFormat.configureRecordFormat(result)
125+
.recordDelimiter('\n')
126+
.fieldDelimiter('|')
127+
.lenient(true)
128+
.field(PactInteger.class, 0)
129+
.field(PactInteger.class, 1)
130+
.field(PactInteger.class, 2);
145131

146132
// assemble the PACT plan
147133
Plan plan = new Plan(result, "Merge Only Join");

0 commit comments

Comments
 (0)