 import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import static org.apache.iceberg.TableProperties.MERGE_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE;

-public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
+public class HiveIcebergStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler {
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandler.class);

   private static final String ICEBERG_URI_PREFIX = "iceberg://";
@@ -245,8 +246,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H

   private static final List<FieldSchema> EMPTY_ORDERING = ImmutableList.of();

-  private Configuration conf;
-
   @Override
   public Class<? extends InputFormat> getInputFormatClass() {
     return HiveIcebergInputFormat.class;
@@ -316,17 +315,6 @@ public void commitJob(JobContext originalContext) {
     }
   }

-  @Override
-  public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {
-
-  }
-
-  // Override annotation commented out, since this interface method has been introduced only in Hive 3
-  // @Override
-  public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> secrets) {
-
-  }
-
   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
     setCommonJobConf(jobConf);
@@ -378,21 +366,6 @@ public boolean supportsPartitioning() {
     return true;
   }

-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public String toString() {
-    return this.getClass().getName();
-  }
-
   /**
    * @param jobConf Job configuration for InputFormat to access
    * @param deserializer Deserializer
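Note on the hunk above: once HiveIcebergStorageHandler extends DefaultStorageHandler (see the class-declaration change earlier in the diff), the Configuration field and the getConf/setConf/toString overrides removed here are simply inherited. A minimal, self-contained sketch of that pattern, using hypothetical stand-in classes rather than the real Hive types:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

// Hypothetical, simplified stand-ins to illustrate why the overrides above could be deleted:
// a configurable base class holds the Configuration once, and every subclass inherits it.
abstract class ConfigurableBase implements Configurable {
  protected Configuration conf;

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}

class DemoHandler extends ConfigurableBase {
  // No conf field and no getConf()/setConf() overrides are needed here any more.
  String describe() {
    return getClass().getName() + (conf != null ? " (configured)" : " (not configured)");
  }
}

The remaining HiveIcebergStorageHandler code can keep referring to conf directly as long as the base class exposes it to subclasses, as the unchanged context lines below assume.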
@@ -1357,11 +1330,12 @@ static String encodeString(String rawString) {
     return HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.encode(rawString);
   }

-  String getPathForAuth(String locationProperty) {
+  private String getPathForAuth(String locationProperty) {
     return getPathForAuth(locationProperty,
         SessionStateUtil.getProperty(conf, SessionStateUtil.DEFAULT_TABLE_LOCATION).orElse(null));
   }

+  @VisibleForTesting
   String getPathForAuth(String locationProperty, String defaultTableLocation) {
     boolean maskDefaultLocation = conf.getBoolean(ConfVars.HIVE_ICEBERG_MASK_DEFAULT_LOCATION.varname,
         ConfVars.HIVE_ICEBERG_MASK_DEFAULT_LOCATION.defaultBoolVal);
@@ -1393,7 +1367,6 @@ private boolean arePathsInSameFs(String locationProperty, String defaultTableLoc

   @Override
   public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
-    HiveStorageHandler.super.validateSinkDesc(sinkDesc);
     if (sinkDesc.getInsertOverwrite()) {
       Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
       if (table.currentSnapshot() != null &&
@@ -1486,7 +1459,7 @@ public List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table hm
         s.transform().toString(), s.direction().name(), s.nullOrder().name()))).collect(Collectors.toList());
   }

-  private void setCommonJobConf(JobConf jobConf) {
+  private static void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }

@@ -1642,12 +1615,13 @@ static void overlayTableProperties(Configuration configuration, TableDesc tableD

     // serialize table object into config
     Table serializableTable = SerializableTable.copyOf(table);
+
     // set table format-version and write-mode information from tableDesc
-    List<String> writeConfigList = ImmutableList.of(
+    final List<String> writeConfigList = ImmutableList.of(
         FORMAT_VERSION, DELETE_MODE, UPDATE_MODE, MERGE_MODE);
-    if (IcebergTableUtil.isV2TableOrAbove(props::getProperty)) {
-      writeConfigList.forEach(cfg -> serializableTable.properties().computeIfAbsent(cfg, props::getProperty));
-    }
+    writeConfigList.forEach(cfg ->
+        serializableTable.properties().computeIfAbsent(cfg, props::getProperty));
+
     checkAndSkipIoConfigSerialization(configuration, serializableTable);
     map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(),
         SerializationUtil.serializeToBase64(serializableTable));
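Note on the hunk above: the write-configuration overlay now runs for every table rather than only for v2-or-above tables; computeIfAbsent keeps whatever the table already defines and adds nothing when the TableDesc lookup returns null. A small, self-contained illustration of that Map#computeIfAbsent behaviour (plain Java; the keys and values are illustrative stand-ins for the constants named in the diff, not Hive or Iceberg code):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class OverlayDemo {
  public static void main(String[] args) {
    Map<String, String> tableProps = new HashMap<>();
    tableProps.put("write.update.mode", "merge-on-read");        // already set on the table

    Properties descProps = new Properties();
    descProps.setProperty("write.update.mode", "copy-on-write"); // ignored: key already present
    descProps.setProperty("format-version", "2");                // copied: key absent on the table

    // Same pattern as writeConfigList.forEach(cfg -> properties().computeIfAbsent(cfg, props::getProperty))
    for (String key : new String[] {"format-version", "write.delete.mode", "write.update.mode", "write.merge.mode"}) {
      tableProps.computeIfAbsent(key, descProps::getProperty);
    }

    System.out.println(tableProps.get("format-version"));           // 2
    System.out.println(tableProps.get("write.update.mode"));        // merge-on-read (not overwritten)
    System.out.println(tableProps.containsKey("write.merge.mode")); // false: getProperty returned null, so nothing is added
  }
}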
@@ -2038,12 +2012,6 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
     List<PartitionField> partitionFields = IcebergTableUtil.getPartitionFields(table,
         policy != RewritePolicy.PARTITION);
-    validatePartSpecImpl(hmsTable, partitionSpec, partitionFields);
-  }
-
-  private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
-      Map<String, String> partitionSpec, List<PartitionField> partitionFields) throws SemanticException {
-    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
     if (hmsTable.getSnapshotRef() != null && IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
       // for this case we rewrite the query as delete query, so validations would be done as part of delete.
       return;
@@ -2139,11 +2107,6 @@ public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable)
   public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec, RewritePolicy policy) throws SemanticException {
     validatePartSpec(table, partitionSpec, policy);
-    return getPartitionImpl(table, partitionSpec);
-  }
-
-  private Partition getPartitionImpl(org.apache.hadoop.hive.ql.metadata.Table table,
-      Map<String, String> partitionSpec) throws SemanticException {
     try {
       String partName = Warehouse.makePartName(partitionSpec, false);
       return new DummyPartition(table, partName, partitionSpec);