|
17 | 17 | import eu.stratosphere.pact.common.IdentityMap;
|
18 | 18 | import eu.stratosphere.pact.common.contract.CoGroupContract;
|
19 | 19 | import eu.stratosphere.pact.common.contract.Contract;
|
| 20 | +import eu.stratosphere.pact.common.contract.CrossContract; |
20 | 21 | import eu.stratosphere.pact.common.contract.MapContract;
|
21 | 22 | import eu.stratosphere.pact.common.contract.MatchContract;
|
22 | 23 | import eu.stratosphere.pact.common.contract.ReduceContract;
|
23 | 24 | import eu.stratosphere.pact.common.plan.ContractUtil;
|
24 | 25 | import eu.stratosphere.pact.common.plan.PactModule;
|
25 | 26 | import eu.stratosphere.pact.common.stubs.CoGroupStub;
|
| 27 | +import eu.stratosphere.pact.common.stubs.CrossStub; |
| 28 | +import eu.stratosphere.pact.common.stubs.MapStub; |
26 | 29 | import eu.stratosphere.pact.common.stubs.MatchStub;
|
27 | 30 | import eu.stratosphere.pact.common.stubs.ReduceStub;
|
28 | 31 | import eu.stratosphere.pact.common.stubs.Stub;
|
@@ -380,9 +383,14 @@ else if (contractClass == MatchContract.class) {
|
380 | 383 | builder.name(this.toString());
|
381 | 384 | PactBuilderUtil.addKeysExceptFirst(builder, keyTypes, keyIndices1, keyIndices2);
|
382 | 385 | return builder.build();
|
383 |
| - } |
384 |
| - return ReflectUtil.newInstance(contractClass, stubClass, |
385 |
| - this.toString()); |
| 386 | + } else if(contractClass == MapContract.class) |
| 387 | + return MapContract.builder((Class<? extends MapStub>) stubClass). |
| 388 | + name(this.toString()).build(); |
| 389 | + else if(contractClass == CrossContract.class) |
| 390 | + return CrossContract.builder((Class<? extends CrossStub>) stubClass). |
| 391 | + name(this.toString()).build(); |
| 392 | + else throw new UnsupportedOperationException("Unknown contract type"); |
| 393 | + |
386 | 394 | } catch (final Exception e) {
|
387 | 395 | throw new IllegalStateException("Cannot create contract from stub "
|
388 | 396 | + stubClass, e);
|
|
0 commit comments