Skip to content

Commit afd49e2

Browse files
author
Aljoscha Krettek
committed
Forward fit new config API to all examples
1 parent 8806498 commit afd49e2

File tree

5 files changed

+62
-93
lines changed

5 files changed

+62
-93
lines changed

pact/pact-examples/src/main/java/eu/stratosphere/pact/example/graph/EnumTriangles.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,12 @@ public Plan getPlan(String... args) {
272272
closeTriads.setParameter("LOCAL_STRATEGY", "LOCAL_STRATEGY_HASH_BUILD_SECOND");
273273

274274
FileDataSink triangles = new FileDataSink(RecordOutputFormat.class, output, "Output");
275-
triangles.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
276-
triangles.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, " ");
277-
triangles.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 3);
278-
triangles.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactString.class);
279-
triangles.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactString.class);
280-
triangles.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, PactString.class);
275+
RecordOutputFormat.configureRecordFormat(triangles)
276+
.recordDelimiter('\n')
277+
.fieldDelimiter(' ')
278+
.field(PactString.class, 0)
279+
.field(PactString.class, 1)
280+
.field(PactString.class, 2);
281281

282282
triangles.setInput(closeTriads);
283283
closeTriads.setSecondInput(edges);

pact/pact-examples/src/main/java/eu/stratosphere/pact/example/relational/TPCHQueryAsterix.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,10 @@ public Plan getPlan(final String... args) {
148148
// create DataSourceContract for Orders input
149149
FileDataSource orders = new FileDataSource(RecordInputFormat.class, ordersPath, "Orders");
150150
orders.setDegreeOfParallelism(noSubtasks);
151-
orders.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
152-
orders.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
153-
orders.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 1);
154-
// cust id
155-
orders.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class);
156-
orders.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 1);
151+
RecordInputFormat.configureRecordFormat(orders)
152+
.recordDelimiter('\n')
153+
.fieldDelimiter('|')
154+
.field(DecimalTextIntParser.class, 1);
157155
// compiler hints
158156
orders.getCompilerHints().setAvgBytesPerRecord(5);
159157
orders.getCompilerHints().setAvgNumRecordsPerDistinctFields(new FieldSet(new int[]{0}), 10);
@@ -166,15 +164,11 @@ public Plan getPlan(final String... args) {
166164
// create DataSourceContract for Customer input
167165
FileDataSource customers = new FileDataSource(RecordInputFormat.class, customerPath, "Customers");
168166
customers.setDegreeOfParallelism(noSubtasks);
169-
customers.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
170-
customers.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
171-
customers.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);
172-
// cust id
173-
customers.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class);
174-
customers.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0);
175-
// market segment
176-
customers.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, VarLengthStringParser.class);
177-
customers.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 6);
167+
RecordInputFormat.configureRecordFormat(customers)
168+
.recordDelimiter('\n')
169+
.fieldDelimiter('|')
170+
.field(DecimalTextIntParser.class, 0)
171+
.field(VarLengthStringParser.class, 6);
178172
// compiler hints
179173
customers.getCompilerHints().setAvgNumRecordsPerDistinctFields(new FieldSet(new int[]{0}), 1);
180174
customers.getCompilerHints().setAvgBytesPerRecord(20);
@@ -199,11 +193,14 @@ public Plan getPlan(final String... args) {
199193
// create DataSinkContract for writing the result
200194
FileDataSink result = new FileDataSink(RecordOutputFormat.class, output, "Output");
201195
result.setDegreeOfParallelism(noSubtasks);
202-
result.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
203-
result.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
204-
result.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 2);
205-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
206-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactString.class);
196+
RecordOutputFormat.configureRecordFormat(result)
197+
.recordDelimiter('\n')
198+
.fieldDelimiter('|')
199+
.field(PactInteger.class, 0)
200+
.field(PactString.class, 1);
201+
// TODO positions are missing here from original code, assume 0 and 1
202+
//result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
203+
//result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactString.class);
207204

208205
// assemble the PACT plan
209206
result.addInput(aggCO);

pact/pact-examples/src/main/java/eu/stratosphere/pact/example/relational/WebLogAnalysis.java

Lines changed: 23 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -270,16 +270,11 @@ public Plan getPlan(String... args) {
270270
FileDataSource docs = new FileDataSource(RecordInputFormat.class, docsInput, "Docs Input");
271271
docs.setDegreeOfParallelism(noSubTasks);
272272
docs.getCompilerHints().setUniqueField(new FieldSet(0));
273-
274-
docs.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
275-
docs.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
276-
docs.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);
277-
// url
278-
docs.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, VarLengthStringParser.class);
279-
docs.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0);
280-
// doctext
281-
docs.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, VarLengthStringParser.class);
282-
docs.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1);
273+
RecordInputFormat.configureRecordFormat(docs)
274+
.recordDelimiter('\n')
275+
.fieldDelimiter('|')
276+
.field(VarLengthStringParser.class, 0)
277+
.field(VarLengthStringParser.class, 1);
283278

284279
/*
285280
* Output Format:
@@ -290,19 +285,12 @@ public Plan getPlan(String... args) {
290285
// Create DataSourceContract for ranks relation
291286
FileDataSource ranks = new FileDataSource(RecordInputFormat.class, ranksInput, "Ranks input");
292287
ranks.setDegreeOfParallelism(noSubTasks);
293-
294-
ranks.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
295-
ranks.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
296-
ranks.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 3);
297-
// url
298-
ranks.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, VarLengthStringParser.class);
299-
ranks.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 1);
300-
// rank
301-
ranks.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class);
302-
ranks.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 0);
303-
// avgDuration
304-
ranks.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+2, DecimalTextIntParser.class);
305-
ranks.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+2, 2);
288+
RecordInputFormat.configureRecordFormat(ranks)
289+
.recordDelimiter('\n')
290+
.fieldDelimiter('|')
291+
.field(VarLengthStringParser.class, 1)
292+
.field(DecimalTextIntParser.class, 0)
293+
.field(DecimalTextIntParser.class, 2);
306294

307295
/*
308296
* Output Format:
@@ -312,17 +300,11 @@ public Plan getPlan(String... args) {
312300
// Create DataSourceContract for visits relation
313301
FileDataSource visits = new FileDataSource(RecordInputFormat.class, visitsInput, "Visits input:q");
314302
visits.setDegreeOfParallelism(noSubTasks);
315-
316-
visits.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
317-
visits.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
318-
visits.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);
319-
// url
320-
visits.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, VarLengthStringParser.class);
321-
visits.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 1);
322-
// date
323-
visits.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, VarLengthStringParser.class);
324-
visits.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 2);
325-
303+
RecordInputFormat.configureRecordFormat(visits)
304+
.recordDelimiter('\n')
305+
.fieldDelimiter('|')
306+
.field(VarLengthStringParser.class, 1)
307+
.field(VarLengthStringParser.class, 2);
326308

327309
// Create MapContract for filtering the entries from the documents
328310
// relation
@@ -375,16 +357,13 @@ public Plan getPlan(String... args) {
375357
// Create DataSinkContract for writing the result of the OLAP query
376358
FileDataSink result = new FileDataSink(RecordOutputFormat.class, output, antiJoinVisits, "Result");
377359
result.setDegreeOfParallelism(noSubTasks);
378-
result.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
379-
result.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
380-
result.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
381-
result.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 3);
382-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
383-
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 1);
384-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactString.class);
385-
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
386-
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, PactInteger.class);
387-
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 2);
360+
RecordOutputFormat.configureRecordFormat(result)
361+
.recordDelimiter('\n')
362+
.fieldDelimiter('|')
363+
.lenient(true)
364+
.field(PactInteger.class, 1)
365+
.field(PactString.class, 0)
366+
.field(PactInteger.class, 2);
388367

389368
// Return the PACT plan
390369
return new Plan(result, "Weblog Analysis");

pact/pact-examples/src/main/java/eu/stratosphere/pact/example/sort/ReduceGroupSort.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,11 @@ public Plan getPlan(String... args) {
8686
String output = (args.length > 2 ? args[2] : "");
8787

8888
FileDataSource input = new FileDataSource(RecordInputFormat.class, dataInput, "Input");
89-
input.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
90-
input.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, " ");
91-
input.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);
92-
input.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class);
93-
input.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0);
94-
input.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class);
95-
input.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1);
89+
RecordInputFormat.configureRecordFormat(input)
90+
.recordDelimiter('\n')
91+
.fieldDelimiter(' ')
92+
.field(DecimalTextIntParser.class, 0)
93+
.field(DecimalTextIntParser.class, 1);
9694

9795
// create the reduce contract and sets the key to the first field
9896
ReduceContract sorter = new ReduceContract.Builder(IdentityReducer.class, PactInteger.class, 0)
@@ -104,14 +102,11 @@ public Plan getPlan(String... args) {
104102

105103
// create and configure the output format
106104
FileDataSink out = new FileDataSink(RecordOutputFormat.class, output, sorter, "Sorted Output");
107-
out.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
108-
out.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, " ");
109-
out.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
110-
out.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 2);
111-
out.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
112-
out.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
113-
out.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactInteger.class);
114-
out.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
105+
RecordOutputFormat.configureRecordFormat(out)
106+
.recordDelimiter('\n')
107+
.fieldDelimiter(' ')
108+
.field(PactInteger.class, 0)
109+
.field(PactInteger.class, 1);
115110

116111
Plan plan = new Plan(out, "SecondarySort Example");
117112
plan.setDefaultParallelism(noSubTasks);

pact/pact-examples/src/main/java/eu/stratosphere/pact/example/wordcount/WordCount.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,12 @@ public Plan getPlan(String... args)
145145
.name("Count Words")
146146
.build();
147147
FileDataSink out = new FileDataSink(RecordOutputFormat.class, output, reducer, "Word Counts");
148-
out.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
149-
out.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, " ");
150-
out.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
151-
out.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 2);
152-
out.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactString.class);
153-
out.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
154-
out.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactInteger.class);
155-
out.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
148+
RecordOutputFormat.configureRecordFormat(out)
149+
.recordDelimiter('\n')
150+
.fieldDelimiter(' ')
151+
.lenient(true)
152+
.field(PactString.class, 0)
153+
.field(PactInteger.class, 1);
156154

157155
Plan plan = new Plan(out, "WordCount Example");
158156
plan.setDefaultParallelism(noSubTasks);

0 commit comments

Comments
 (0)