Skip to content

Commit 15d4cdc

Browse files
author
sewen
committed
Fixed and extended tests for global ordering.
1 parent 644437a commit 15d4cdc

File tree

3 files changed

+336
-10
lines changed

3 files changed

+336
-10
lines changed

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/GlobalSortingITCase.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ protected void preSubmit() throws Exception {
8686
sb.append('\n');
8787
}
8888
getFilesystemProvider().createFile(recordsPath + "/part_" + i + ".txt", sb.toString());
89-
LOG.debug("Records Part " + (i + 1) + ":\n>" + sb.toString() + "<");
89+
90+
if (LOG.isDebugEnabled())
91+
LOG.debug("Records Part " + (i + 1) + ":\n>" + sb.toString() + "<");
9092
}
9193

9294
}
@@ -110,21 +112,17 @@ protected JobGraph getJobGraph() throws Exception {
110112
@Override
111113
protected void postSubmit() throws Exception {
112114
//Construct expected result
113-
Collections.sort(records);
114-
StringBuilder expectedResult = new StringBuilder();
115-
for (Integer number: records) {
116-
expectedResult.append(number);
117-
expectedResult.append('\n');
118-
}
115+
Collections.sort(this.records);
119116

120117
// Test results
121-
compareResultsByLinesInMemory(expectedResult.toString(), recordsPath);
118+
compareResultsByLinesInMemoryStrictOrder(this.records, this.resultPath);
122119

123120
}
124121

125122
@Override
126123
public void stopCluster() throws Exception {
127124
getFilesystemProvider().delete(recordsPath, true);
125+
getFilesystemProvider().delete(resultPath, true);
128126
super.stopCluster();
129127
}
130128

@@ -147,12 +145,10 @@ public UniformDistribution() { }
147145

148146
@Override
149147
public void write(DataOutput out) throws IOException {
150-
out.writeInt(0);
151148
}
152149

153150
@Override
154151
public void read(DataInput in) throws IOException {
155-
in.readInt();
156152
}
157153

158154
@Override
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/***********************************************************************************************************************
2+
*
3+
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* specific language governing permissions and limitations under the License.
13+
*
14+
**********************************************************************************************************************/
15+
16+
package eu.stratosphere.pact.test.pactPrograms;
17+
18+
import java.io.DataInput;
19+
import java.io.DataOutput;
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.Collections;
24+
import java.util.LinkedList;
25+
import java.util.Random;
26+
27+
import org.apache.commons.logging.Log;
28+
import org.apache.commons.logging.LogFactory;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.Parameterized;
31+
import org.junit.runners.Parameterized.Parameters;
32+
33+
import eu.stratosphere.nephele.configuration.Configuration;
34+
import eu.stratosphere.nephele.jobgraph.JobGraph;
35+
import eu.stratosphere.pact.common.contract.DataDistribution;
36+
import eu.stratosphere.pact.common.contract.FileDataSink;
37+
import eu.stratosphere.pact.common.contract.FileDataSource;
38+
import eu.stratosphere.pact.common.contract.Order;
39+
import eu.stratosphere.pact.common.contract.Ordering;
40+
import eu.stratosphere.pact.common.io.RecordInputFormat;
41+
import eu.stratosphere.pact.common.io.RecordOutputFormat;
42+
import eu.stratosphere.pact.common.plan.Plan;
43+
import eu.stratosphere.pact.common.plan.PlanAssembler;
44+
import eu.stratosphere.pact.common.type.PactRecord;
45+
import eu.stratosphere.pact.common.type.base.PactInteger;
46+
import eu.stratosphere.pact.common.type.base.parser.DecimalTextIntParser;
47+
import eu.stratosphere.pact.compiler.PactCompiler;
48+
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
49+
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
50+
import eu.stratosphere.pact.test.util.TestBase;
51+
52+
@RunWith(Parameterized.class)
53+
public class GlobalSortingMixedOrderITCase extends TestBase {
54+
55+
private static final Log LOG = LogFactory.getLog(GlobalSortingMixedOrderITCase.class);
56+
57+
private static final int RANGE_I1 = 100;
58+
private static final int RANGE_I2 = 20;
59+
private static final int RANGE_I3 = 20;
60+
61+
private String recordsPath = null;
62+
private String resultPath = null;
63+
64+
private ArrayList<TripleInt> records;
65+
66+
/**
 * Creates the test case for one parameterized run.
 *
 * @param config the configuration of this run; it is passed unchanged to the {@code TestBase} constructor
 */
public GlobalSortingMixedOrderITCase(Configuration config) {
	super(config);
}
69+
70+
@Override
71+
protected void preSubmit() throws Exception {
72+
73+
this.recordsPath = getFilesystemProvider().getTempDirPath() + "/records";
74+
this.resultPath = getFilesystemProvider().getTempDirPath() + "/result";
75+
76+
this.records = new ArrayList<TripleInt>();
77+
78+
//Generate records
79+
final Random rnd = new Random(1988);
80+
final int numRecordsPerSplit = 1000;
81+
82+
getFilesystemProvider().createDir(this.recordsPath);
83+
84+
final int numSplits = 4;
85+
for (int i = 0; i < numSplits; i++) {
86+
StringBuilder sb = new StringBuilder(numSplits*2);
87+
for (int j = 0; j < numRecordsPerSplit; j++) {
88+
final TripleInt val = new TripleInt(rnd.nextInt(RANGE_I1), rnd.nextInt(RANGE_I2), rnd.nextInt(RANGE_I3));
89+
this.records.add(val);
90+
sb.append(val);
91+
sb.append('\n');
92+
}
93+
getFilesystemProvider().createFile(recordsPath + "/part_" + i + ".txt", sb.toString());
94+
95+
if (LOG.isDebugEnabled())
96+
LOG.debug("Records Part " + (i + 1) + ":\n>" + sb.toString() + "<");
97+
}
98+
99+
}
100+
101+
@Override
102+
protected JobGraph getJobGraph() throws Exception {
103+
104+
GlobalSort globalSort = new GlobalSort();
105+
Plan plan = globalSort.getPlan(
106+
config.getString("GlobalSortingTest#NoSubtasks", "1"),
107+
getFilesystemProvider().getURIPrefix()+recordsPath,
108+
getFilesystemProvider().getURIPrefix()+resultPath);
109+
110+
PactCompiler pc = new PactCompiler();
111+
OptimizedPlan op = pc.compile(plan);
112+
113+
JobGraphGenerator jgg = new JobGraphGenerator();
114+
return jgg.compileJobGraph(op);
115+
}
116+
117+
@Override
118+
protected void postSubmit() throws Exception {
119+
//Construct expected result
120+
Collections.sort(this.records);
121+
122+
// Test results
123+
compareResultsByLinesInMemoryStrictOrder(this.records, this.resultPath);
124+
}
125+
126+
@Override
127+
public void stopCluster() throws Exception {
128+
getFilesystemProvider().delete(recordsPath, true);
129+
getFilesystemProvider().delete(resultPath, true);
130+
super.stopCluster();
131+
}
132+
133+
134+
@Parameters
135+
public static Collection<Object[]> getConfigurations() {
136+
137+
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
138+
139+
Configuration config = new Configuration();
140+
config.setInteger("GlobalSortingTest#NoSubtasks", 4);
141+
tConfigs.add(config);
142+
143+
return toParameterList(tConfigs);
144+
}
145+
146+
public static class TripleIntDistribution implements DataDistribution
147+
{
148+
private boolean ascendingI1, ascendingI2, ascendingI3;
149+
150+
public TripleIntDistribution(Order orderI1, Order orderI2, Order orderI3) {
151+
this.ascendingI1 = orderI1 != Order.DESCENDING;
152+
this.ascendingI2 = orderI2 != Order.DESCENDING;
153+
this.ascendingI3 = orderI3 != Order.DESCENDING;
154+
}
155+
156+
public TripleIntDistribution() {}
157+
158+
@Override
159+
public void write(DataOutput out) throws IOException {
160+
out.writeBoolean(this.ascendingI1);
161+
out.writeBoolean(this.ascendingI2);
162+
out.writeBoolean(this.ascendingI3);
163+
}
164+
165+
@Override
166+
public void read(DataInput in) throws IOException {
167+
this.ascendingI1 = in.readBoolean();
168+
this.ascendingI2 = in.readBoolean();
169+
this.ascendingI3 = in.readBoolean();
170+
}
171+
172+
@Override
173+
public PactRecord getBucketBoundary(int bucketNum, int totalNumBuckets)
174+
{
175+
final float bucketWidth = ((float) RANGE_I1) / totalNumBuckets;
176+
int boundVal = (int) ((bucketNum + 1) * bucketWidth);
177+
if (!this.ascendingI1) {
178+
boundVal = RANGE_I1 - boundVal;
179+
}
180+
181+
final PactRecord bound = new PactRecord(3);
182+
bound.setField(0, new PactInteger(boundVal));
183+
bound.setField(1, new PactInteger(RANGE_I2));
184+
bound.setField(2, new PactInteger(RANGE_I3));
185+
return bound;
186+
}
187+
188+
}
189+
190+
private static class GlobalSort implements PlanAssembler {
191+
192+
@Override
193+
public Plan getPlan(String... args) throws IllegalArgumentException {
194+
// parse program parameters
195+
final int noSubtasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
196+
final String recordsPath = (args.length > 1 ? args[1] : "");
197+
final String output = (args.length > 2 ? args[2] : "");
198+
199+
FileDataSource source = new FileDataSource(RecordInputFormat.class, recordsPath);
200+
source.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
201+
source.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, ",");
202+
source.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 3);
203+
source.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX + 0, DecimalTextIntParser.class);
204+
source.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX + 1, DecimalTextIntParser.class);
205+
source.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX + 2, DecimalTextIntParser.class);
206+
source.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX + 0, 0);
207+
source.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX + 1, 1);
208+
source.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX + 2, 2);
209+
210+
FileDataSink sink = new FileDataSink(RecordOutputFormat.class, output);
211+
sink.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
212+
sink.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, ",");
213+
sink.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
214+
sink.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 3);
215+
sink.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
216+
sink.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactInteger.class);
217+
sink.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, PactInteger.class);
218+
sink.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
219+
sink.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
220+
sink.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 2);
221+
222+
sink.setGlobalOrder(
223+
new Ordering(0, PactInteger.class, Order.DESCENDING)
224+
.appendOrdering(1, PactInteger.class, Order.ASCENDING)
225+
.appendOrdering(2, PactInteger.class, Order.DESCENDING),
226+
new TripleIntDistribution(Order.DESCENDING, Order.ASCENDING, Order.DESCENDING));
227+
sink.setInput(source);
228+
229+
Plan p = new Plan(sink);
230+
p.setDefaultParallelism(noSubtasks);
231+
return p;
232+
}
233+
}
234+
235+
/**
236+
* Three integers sorting descending, ascending, descending.
237+
*/
238+
static final class TripleInt implements Comparable<TripleInt>
239+
{
240+
private final int i1, i2, i3;
241+
242+
243+
TripleInt(int i1, int i2, int i3) {
244+
this.i1 = i1;
245+
this.i2 = i2;
246+
this.i3 = i3;
247+
}
248+
249+
public int getI1() {
250+
return i1;
251+
}
252+
253+
public int getI2() {
254+
return i2;
255+
}
256+
257+
public int getI3() {
258+
return i3;
259+
}
260+
261+
@Override
262+
public String toString() {
263+
StringBuilder bld = new StringBuilder(32);
264+
bld.append(this.i1);
265+
bld.append(',');
266+
bld.append(this.i2);
267+
bld.append(',');
268+
bld.append(this.i3);
269+
return bld.toString();
270+
}
271+
272+
@Override
273+
public int compareTo(TripleInt o) {
274+
return this.i1 < o.i1 ? 1 : this.i1 > o.i1 ? -1 :
275+
this.i2 < o.i2 ? -1 : this.i2 > o.i2 ? 1 :
276+
this.i3 < o.i3 ? 1 : this.i3 > o.i3 ? -1 : 0;
277+
278+
}
279+
}
280+
}

pact/pact-tests/src/test/java/eu/stratosphere/pact/test/util/TestBase.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.io.InputStream;
2424
import java.io.InputStreamReader;
2525
import java.util.ArrayList;
26+
import java.util.Arrays;
2627
import java.util.Collection;
2728
import java.util.Comparator;
29+
import java.util.Iterator;
2830
import java.util.LinkedList;
2931
import java.util.List;
3032
import java.util.PriorityQueue;
@@ -213,6 +215,54 @@ public int compare(String arg0, String arg1) {
213215
this.compareResultsByLinesInMemory(expectedResultStr, resultPath, defaultStrComp);
214216
}
215217

218+
protected <T> void compareResultsByLinesInMemoryStrictOrder(List<T> result, String resultPath) throws Exception
219+
{
220+
final ArrayList<String> resultFiles = new ArrayList<String>();
221+
222+
// Determine all result files
223+
if (getFilesystemProvider().isDir(resultPath)) {
224+
final String[] files = getFilesystemProvider().listFiles(resultPath);
225+
final Comparator<String> fileNameComp = new Comparator<String>() {
226+
@Override
227+
public int compare(String o1, String o2) {
228+
if (o1.length() < o2.length())
229+
return -1;
230+
else if (o1.length() > o2.length())
231+
return 1;
232+
else return o1.compareTo(o2);
233+
}
234+
};
235+
Arrays.sort(files, fileNameComp);
236+
237+
for (String file : files) {
238+
if (!getFilesystemProvider().isDir(file)) {
239+
resultFiles.add(resultPath+"/"+file);
240+
}
241+
}
242+
} else {
243+
resultFiles.add(resultPath);
244+
}
245+
246+
final Iterator<T> expectedLines = result.iterator();
247+
248+
for (String resultFile : resultFiles) {
249+
// read each result file
250+
final InputStream is = getFilesystemProvider().getInputStream(resultFile);
251+
final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
252+
253+
// collect lines
254+
String line = null;
255+
while ((line = reader.readLine()) != null) {
256+
Assert.assertTrue("More lines in result than expected lines.", expectedLines.hasNext());
257+
String nextExpected = expectedLines.next().toString();
258+
assertEquals("Expected result and obtained result do not match.", nextExpected, line);
259+
}
260+
reader.close();
261+
}
262+
263+
Assert.assertFalse("More expected lines than obtained lines.", expectedLines.hasNext());
264+
}
265+
216266
/**
217267
* Compares the expectedResultString and the file(s) in the HDFS linewise.
218268
* Both results (expected and computed) are held in memory. Hence, this

0 commit comments

Comments
 (0)