Skip to content

Commit 5cd9bce

Browse files
committed
add extra file-index file check
1 parent 2a70b56 commit 5cd9bce

File tree

5 files changed

+113
-20
lines changed

5 files changed

+113
-20
lines changed

paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ public AppendOnlyFileStoreScan newScan() {
155155
schemaManager,
156156
schema,
157157
manifestFileFactory(),
158+
fileIO,
159+
pathFactory(),
158160
options.scanManifestParallelism(),
159161
options.fileIndexReadEnabled());
160162
}

paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.paimon.fileindex.FileIndexResult;
2323
import org.apache.paimon.fs.FileIO;
2424
import org.apache.paimon.predicate.Predicate;
25-
import org.apache.paimon.predicate.PredicateBuilder;
2625
import org.apache.paimon.schema.TableSchema;
2726

2827
import java.io.IOException;
@@ -35,17 +34,16 @@ public class FileIndexEvaluator {
3534
public static FileIndexResult evaluate(
3635
FileIO fileIO,
3736
TableSchema dataSchema,
38-
List<Predicate> dataFilter,
37+
Predicate dataPredicate,
3938
DataFilePathFactory dataFilePathFactory,
4039
DataFileMeta file)
4140
throws IOException {
42-
if (dataFilter != null && !dataFilter.isEmpty()) {
41+
if (dataPredicate != null) {
4342
byte[] embeddedIndex = file.embeddedIndex();
4443
if (embeddedIndex != null) {
4544
try (FileIndexPredicate predicate =
4645
new FileIndexPredicate(embeddedIndex, dataSchema.logicalRowType())) {
47-
return predicate.evaluate(
48-
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])));
46+
return predicate.evaluate(dataPredicate);
4947
}
5048
}
5149

@@ -65,8 +63,7 @@ public static FileIndexResult evaluate(
6563
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
6664
fileIO,
6765
dataSchema.logicalRowType())) {
68-
return predicate.evaluate(
69-
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])));
66+
return predicate.evaluate(dataPredicate);
7067
}
7168
}
7269
}

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@
1919
package org.apache.paimon.operation;
2020

2121
import org.apache.paimon.AppendOnlyFileStore;
22-
import org.apache.paimon.fileindex.FileIndexPredicate;
22+
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.io.DataFilePathFactory;
24+
import org.apache.paimon.io.FileIndexEvaluator;
2325
import org.apache.paimon.manifest.ManifestEntry;
2426
import org.apache.paimon.manifest.ManifestFile;
2527
import org.apache.paimon.predicate.Predicate;
2628
import org.apache.paimon.schema.SchemaManager;
2729
import org.apache.paimon.schema.TableSchema;
2830
import org.apache.paimon.stats.SimpleStatsEvolution;
2931
import org.apache.paimon.stats.SimpleStatsEvolutions;
30-
import org.apache.paimon.types.RowType;
32+
import org.apache.paimon.utils.FileStorePathFactory;
3133
import org.apache.paimon.utils.SnapshotManager;
3234

3335
import javax.annotation.Nullable;
@@ -46,6 +48,9 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
4648

4749
private Predicate filter;
4850

51+
private FileIO fileIO;
52+
private FileStorePathFactory pathFactory;
53+
4954
// just cache.
5055
private final Map<Long, Predicate> dataFilterMapping = new ConcurrentHashMap<>();
5156

@@ -56,6 +61,8 @@ public AppendOnlyFileStoreScan(
5661
SchemaManager schemaManager,
5762
TableSchema schema,
5863
ManifestFile.Factory manifestFileFactory,
64+
FileIO fileIO,
65+
FileStorePathFactory pathFactory,
5966
Integer scanManifestParallelism,
6067
boolean fileIndexReadEnabled) {
6168
super(
@@ -69,6 +76,8 @@ public AppendOnlyFileStoreScan(
6976
this.simpleStatsEvolutions =
7077
new SimpleStatsEvolutions(sid -> scanTableSchema(sid).fields(), schema.id());
7178
this.fileIndexReadEnabled = fileIndexReadEnabled;
79+
this.fileIO = fileIO;
80+
this.pathFactory = pathFactory;
7281
}
7382

7483
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
@@ -102,24 +111,22 @@ protected boolean filterByStats(ManifestEntry entry) {
102111
}
103112

104113
private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
105-
if (embeddedIndexBytes == null) {
106-
return true;
107-
}
108-
109-
RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType();
110-
114+
TableSchema tableSchema = scanTableSchema(entry.file().schemaId());
111115
Predicate dataPredicate =
112116
dataFilterMapping.computeIfAbsent(
113117
entry.file().schemaId(),
114118
id ->
115119
simpleStatsEvolutions.toEvolutionSafeStatsFilter(
116120
entry.file().schemaId(), filter));
117121

118-
try (FileIndexPredicate predicate =
119-
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
120-
return predicate.evaluate(dataPredicate).remain();
122+
DataFilePathFactory dataFilePathFactory =
123+
pathFactory.createDataFilePathFactory(entry.partition(), entry.bucket());
124+
try {
125+
return FileIndexEvaluator.evaluate(
126+
fileIO, tableSchema, dataPredicate, dataFilePathFactory, entry.file())
127+
.remain();
121128
} catch (IOException e) {
122-
throw new RuntimeException("Exception happens while checking predicate.", e);
129+
throw new RuntimeException("Exception happens while checking file index.", e);
123130
}
124131
}
125132
}

paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
3939
import org.apache.paimon.partition.PartitionUtils;
4040
import org.apache.paimon.predicate.Predicate;
41+
import org.apache.paimon.predicate.PredicateBuilder;
4142
import org.apache.paimon.reader.EmptyFileRecordReader;
4243
import org.apache.paimon.reader.FileRecordReader;
4344
import org.apache.paimon.reader.ReaderSupplier;
@@ -198,11 +199,14 @@ private FileRecordReader<InternalRow> createFileReader(
198199
throws IOException {
199200
FileIndexResult fileIndexResult = null;
200201
if (fileIndexReadEnabled) {
202+
Predicate dataPredicate =
203+
PredicateBuilder.and(
204+
formatReaderMapping.getDataFilters().toArray(new Predicate[0]));
201205
fileIndexResult =
202206
FileIndexEvaluator.evaluate(
203207
fileIO,
204208
formatReaderMapping.getDataSchema(),
205-
formatReaderMapping.getDataFilters(),
209+
dataPredicate,
206210
dataFilePathFactory,
207211
file);
208212
if (!fileIndexResult.remain()) {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.fileindex
20+
21+
import org.apache.paimon.operation.AppendOnlyFileStoreScan
22+
import org.apache.paimon.predicate.PredicateBuilder
23+
import org.apache.paimon.spark.PaimonSparkTestBase
24+
import org.apache.paimon.table.FileStoreTable
25+
26+
import scala.jdk.CollectionConverters._
27+
28+
class FileIndexTest extends PaimonSparkTestBase {

  test("test file index in plan phase") {
    // Cover three index types, each under both layouts: a 1B threshold forces the
    // index into a standalone file, a 1MB threshold keeps it embedded in the manifest.
    Seq("bloom-filter", "bitmap", "bsi").foreach { filter =>
      Seq("1B", "1MB").foreach { size =>
        // All supported index types share the same property pattern, so build the
        // property directly instead of an if/else-if chain (which, lacking a final
        // else, was typed Any and would silently interpolate () for an unknown filter).
        val fileIndexProp = s" 'file-index.$filter.columns' = 'id'"

        withTable("T") {
          spark.sql(s"""
                       |create table T (
                       |id int,
                       |name string)
                       |USING paimon
                       |TBLPROPERTIES(
                       | $fileIndexProp,
                       | 'file-index.in-manifest-threshold'= '$size')
                       |""".stripMargin)

          spark.sql("insert into T values(1,'a')")
          spark.sql("insert into T values(2,'b')")
          spark.sql("insert into T values(3,'c')")
          // Each single-row insert produces its own data file.
          assert(sql("select * from `T$files`").collect().length == 3)

          // Locate the one data file that actually contains id == 2.
          val fileNames = spark.sql("select input_file_name() from T where id == 2 ").collect()
          assert(fileNames.length == 1)
          val location = fileNames(0).getString(0)
          val fileName = location.substring(location.lastIndexOf('/') + 1)

          val table: FileStoreTable = loadTable("T")
          val predicateBuilder = new PredicateBuilder(table.rowType)
          // predicate is 'id=2'
          val predicate = predicateBuilder.equal(0, 2)
          // Planning with the predicate should prune down to exactly the file holding
          // id == 2, proving the file index is consulted during the plan phase.
          val files = table
            .store()
            .newScan()
            .asInstanceOf[AppendOnlyFileStoreScan]
            .withFilter(predicate)
            .plan()
            .files()
            .asScala
            .map(x => x.fileName())
          assert(files.length == 1)
          assert(fileName.equals(files.head))
        }
      }
    }
  }
}

0 commit comments

Comments (0)