Skip to content

Commit 8806498

Browse files
author
sewen
committed
Forward fitted changes to Sopremo Packages.
1 parent 02cf55d commit 8806498

File tree

7 files changed

+44
-22
lines changed

7 files changed

+44
-22
lines changed

sopremo/sopremo-base/src/main/java/eu/stratosphere/sopremo/base/Projection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import eu.stratosphere.sopremo.InputCardinality;
77
import eu.stratosphere.sopremo.Name;
88
import eu.stratosphere.sopremo.OutputCardinality;
9-
import eu.stratosphere.sopremo.expressions.CachingExpression;
109
import eu.stratosphere.sopremo.expressions.EvaluationExpression;
1110
import eu.stratosphere.sopremo.pact.JsonCollector;
1211
import eu.stratosphere.sopremo.pact.SopremoMap;

sopremo/sopremo-base/src/main/java/eu/stratosphere/sopremo/base/UnionAll.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public PactModule asPactModule(EvaluationContext context) {
3434
final List<JsonStream> inputs = this.getInputs();
3535
final PactModule module = new PactModule(this.getName(), inputs.size(), 1);
3636
// TODO: remove identity map, when Nephele can deal with direct source->sink connections
37-
MapContract identityContract = new MapContract.Builder(IdentityMap.class).build();
37+
MapContract identityContract = MapContract.builder(IdentityMap.class).build();
3838
for (Contract input : module.getInputs())
3939
identityContract.addInput(input);
4040
module.getOutput(0).setInput(identityContract);

sopremo/sopremo-common/src/main/java/eu/stratosphere/sopremo/ElementaryOperator.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public PactModule asPactModule(final EvaluationContext context) {
224224
for (final List<Contract> inputs : inputLists) {
225225
// assume at least one input for each contract input slot
226226
if (inputs.isEmpty())
227-
inputs.add(new MapContract.Builder(IdentityMap.class).build());
227+
inputs.add(MapContract.builder(IdentityMap.class).build());
228228
for (final Contract input : inputs)
229229
if (!distinctInputs.contains(input))
230230
distinctInputs.add(input);
@@ -318,27 +318,31 @@ protected Contract getContract(final Schema globalSchema) {
318318
try {
319319
if (contractClass == ReduceContract.class) {
320320
int[] keyIndices = this.getKeyIndices(globalSchema, this.getKeyExpressions(0));
321-
ReduceContract.Builder builder = new ReduceContract.Builder((Class<? extends ReduceStub>) stubClass);
321+
ReduceContract.Builder builder = ReduceContract.builder((Class<? extends ReduceStub>) stubClass);
322322
builder.name(this.toString());
323323
PactBuilderUtil.addKeys(builder, this.getKeyClasses(globalSchema, keyIndices), keyIndices);
324324
return builder.build();
325325
}
326326
else if (contractClass == CoGroupContract.class) {
327327
int[] keyIndices1 = this.getKeyIndices(globalSchema, this.getKeyExpressions(0));
328328
int[] keyIndices2 = this.getKeyIndices(globalSchema, this.getKeyExpressions(1));
329-
CoGroupContract.Builder builder = new CoGroupContract.Builder((Class<? extends CoGroupStub>) stubClass);
329+
Class<? extends Key>[] keyTypes = this.getCommonKeyClasses(globalSchema, keyIndices1, keyIndices2);
330+
331+
CoGroupContract.Builder builder = CoGroupContract.builder((Class<? extends CoGroupStub>) stubClass,
332+
keyTypes[0], keyIndices1[0], keyIndices2[0]);
330333
builder.name(this.toString());
331-
PactBuilderUtil.addKeys(builder, this.getCommonKeyClasses(globalSchema, keyIndices1, keyIndices2),
332-
keyIndices1, keyIndices2);
334+
PactBuilderUtil.addKeysExceptFirst(builder, keyTypes, keyIndices1, keyIndices2);
333335
return builder.build();
334336
}
335337
else if (contractClass == MatchContract.class) {
336338
int[] keyIndices1 = this.getKeyIndices(globalSchema, this.getKeyExpressions(0));
337339
int[] keyIndices2 = this.getKeyIndices(globalSchema, this.getKeyExpressions(1));
338-
MatchContract.Builder builder = new MatchContract.Builder((Class<? extends MatchStub>) stubClass);
340+
Class<? extends Key>[] keyTypes = this.getCommonKeyClasses(globalSchema, keyIndices1, keyIndices2);
341+
342+
MatchContract.Builder builder = MatchContract.builder((Class<? extends MatchStub>) stubClass,
343+
keyTypes[0], keyIndices1[0], keyIndices2[0]);
339344
builder.name(this.toString());
340-
PactBuilderUtil.addKeys(builder, this.getCommonKeyClasses(globalSchema, keyIndices1, keyIndices2),
341-
keyIndices1, keyIndices2);
345+
PactBuilderUtil.addKeysExceptFirst(builder, keyTypes, keyIndices1, keyIndices2);
342346
return builder.build();
343347
}
344348
return ReflectUtil.newInstance(contractClass, stubClass, this.toString());

sopremo/sopremo-common/src/main/java/eu/stratosphere/sopremo/PactBuilderUtil.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
*
2727
* @author Aljoscha Krettek
2828
*/
29-
public class PactBuilderUtil {
29+
public class PactBuilderUtil
30+
{
3031
public static void addKeys(ReduceContract.Builder builder, Class<? extends Key>[] keyClasses, int[] keyIndices) {
3132
Preconditions.checkArgument(keyClasses.length == keyIndices.length,
3233
"Lenght of keyClasses and keyIndices must match.");
@@ -35,21 +36,39 @@ public static void addKeys(ReduceContract.Builder builder, Class<? extends Key>[
3536
}
3637
}
3738

38-
public static void addKeys(CoGroupContract.Builder builder, Class<? extends Key>[] keyClasses, int[] keyIndices,
39+
public static void addKeys(CoGroupContract.Builder builder, Class<? extends Key>[] keyClasses, int[] keyIndices1,
3940
int[] keyIndices2) {
40-
Preconditions.checkArgument(keyClasses.length == keyIndices.length,
41+
Preconditions.checkArgument(keyClasses.length == keyIndices1.length && keyClasses.length == keyIndices2.length,
4142
"Lenght of keyClasses and keyIndices must match.");
4243
for (int i = 0; i < keyClasses.length; ++i) {
43-
builder.keyField(keyClasses[i], keyIndices[i], keyIndices2[i]);
44+
builder.keyField(keyClasses[i], keyIndices1[i], keyIndices2[i]);
45+
}
46+
}
47+
48+
public static void addKeysExceptFirst(CoGroupContract.Builder builder, Class<? extends Key>[] keyClasses, int[] keyIndices1,
49+
int[] keyIndices2) {
50+
Preconditions.checkArgument(keyClasses.length == keyIndices1.length && keyClasses.length == keyIndices2.length,
51+
"Lenght of keyClasses and keyIndices must match.");
52+
for (int i = 1; i < keyClasses.length; ++i) {
53+
builder.keyField(keyClasses[i], keyIndices1[i], keyIndices2[i]);
4454
}
4555
}
4656

47-
public static void addKeys(MatchContract.Builder builder, Class<? extends Key>[] keyClasses, int[] keyIndices,
57+
public static void addKeys(MatchContract.Builder builder, Class<? extends Key>[] keyClasses, int[] keyIndices1,
4858
int[] keyIndices2) {
49-
Preconditions.checkArgument(keyClasses.length == keyIndices.length,
59+
Preconditions.checkArgument(keyClasses.length == keyIndices1.length && keyClasses.length == keyIndices2.length,
5060
"Lenght of keyClasses and keyIndices must match.");
5161
for (int i = 0; i < keyClasses.length; ++i) {
52-
builder.keyField(keyClasses[i], keyIndices[i], keyIndices2[i]);
62+
builder.keyField(keyClasses[i], keyIndices1[i], keyIndices2[i]);
63+
}
64+
}
65+
66+
public static void addKeysExceptFirst(MatchContract.Builder builder, Class<? extends Key>[] keyClasses, int[] keyIndices1,
67+
int[] keyIndices2) {
68+
Preconditions.checkArgument(keyClasses.length == keyIndices1.length && keyClasses.length == keyIndices2.length,
69+
"Lenght of keyClasses and keyIndices must match.");
70+
for (int i = 1; i < keyClasses.length; ++i) {
71+
builder.keyField(keyClasses[i], keyIndices1[i], keyIndices2[i]);
5372
}
5473
}
5574
}

sopremo/sopremo-common/src/main/java/eu/stratosphere/sopremo/io/JsonToCsv.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public static FileDataSink convert(final String sourceFile, final JsonToCsv json
9999
final PactModule sourceModule = new Source(sourceFile).asPactModule(jsonToCsv.getContext());
100100
final Contract source = sourceModule.getOutput(0).getInputs().get(0);
101101

102-
final MapContract jsonToString = new MapContract.Builder(JsonToString.class).build();
102+
final MapContract jsonToString = MapContract.builder(JsonToString.class).build();
103103
jsonToString.getParameters().setString("separator", jsonToCsv.getSeparator());
104104
SopremoUtil.serialize(jsonToString.getParameters(), SopremoUtil.CONTEXT, jsonToCsv.getContext());
105105
SopremoUtil.serialize(jsonToString.getParameters(), "extractionExpressions",
@@ -176,7 +176,7 @@ public void open(final Configuration parameters) throws Exception {
176176
* eu.stratosphere.pact.common.stubs.Collector)
177177
*/
178178
@Override
179-
public void map(final PactRecord record, final Collector out) throws Exception {
179+
public void map(final PactRecord record, final Collector<PactRecord> out) throws Exception {
180180
final StringBuilder string = new StringBuilder();
181181

182182
this.node = this.schema.recordToJson(record, this.node);

sopremo/sopremo-common/src/test/java/eu/stratosphere/sopremo/pact/CsvInputFormatTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void completeTestPassesWithExpectedValues() throws IOException {
3535
final FileDataSource read = new FileDataSource(
3636
CsvInputFormat.class, this.getResource("SopremoTestPlan/restaurant_short.csv"), "Input");
3737

38-
final MapContract map = new MapContract.Builder(IdentityMap.class).name("Map").build();
38+
final MapContract map = MapContract.builder(IdentityMap.class).name("Map").build();
3939
map.setInput(read);
4040

4141
final FileDataSink output = this.createOutput(map, SequentialOutputFormat.class);

sopremo/sopremo-common/src/test/java/eu/stratosphere/sopremo/pact/JsonInputFormatTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void completeTestPasses() throws IOException {
5454
SopremoUtil.serialize(read.getParameters(), IOConstants.SCHEMA, SCHEMA);
5555

5656
final MapContract map =
57-
new MapContract.Builder(IdentityMap.class).name("Map").build();
57+
MapContract.builder(IdentityMap.class).name("Map").build();
5858
map.setInput(read);
5959

6060
final FileDataSink output = this.createOutput(map, SequentialOutputFormat.class);
@@ -80,7 +80,7 @@ public void completeTestPassesWithExpectedValues() throws IOException {
8080
JsonInputFormat.class, this.getResource("SopremoTestPlan/test.json"), "Input");
8181
SopremoUtil.serialize(read.getParameters(), IOConstants.SCHEMA, SCHEMA);
8282

83-
final MapContract map = new MapContract.Builder(IdentityMap.class).name("Map").build();
83+
final MapContract map = MapContract.builder(IdentityMap.class).name("Map").build();
8484
map.setInput(read);
8585

8686
final FileDataSink output = this.createOutput(map,

0 commit comments

Comments
 (0)