When a SparkSQL query on a Parquet table filters by a partition column, the column name is case sensitive

0 votes
2 views
asked Aug 30, 2017 in Hadoop by admin (4,410 points)
Summary

In a Spark 1.6.x based distribution (CDH 5.7+), when you query a Hive table stored as a Parquet file and filter by the partition column, you have to specify the column name with the exact case used in the table definition.

Symptoms

1) Create the table in Hive and store it as Parquet. Note that the table partition column is declared with the lower-case name 'part':

CREATE TABLE `parquet_test`(
  `id` int COMMENT '',
  `str` string COMMENT '')
PARTITIONED BY (
  `part` string)
ROW FORMAT SERDE
  'parquet.hive.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT
  'parquet.hive.DeprecatedParquetOutputFormat';

insert into table parquet_test partition(part='part1') values(1,'test1');
insert into table parquet_test partition(part='part2') values(2,'test2');
insert into table parquet_test partition(part='part1') values(3,'test3');

2) In the Hive shell, we are able to query it with the upper-case PART='part1' filter, and the filter is applied:

hive> select * from parquet_test where PART='part1';
OK
1    test1    part1
3    test3    part1
Time taken: 0.113 seconds, Fetched: 2 row(s)


3) But in the default spark-shell, which is based on Spark 1.6.x, the same filter on the partition column does not take effect:

scala> sqlContext.sql("select * from parquet_test where PART='part1' ").show();
+---+-----+-----+
| id|  str| part|
+---+-----+-----+
|  1|test1|part1|
|  3|test3|part1|
|  2|test2|part2|
+---+-----+-----+

The result shows that the filter has no effect; the "PART='part1'" predicate is silently ignored and all three rows come back.
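Since the issue is one of case, the same query written with the lower-case column name, exactly as it is declared in the metastore, should filter correctly even in Spark 1.6:

scala> sqlContext.sql("select * from parquet_test where part='part1' ").show();

With the lower-case name, only the two 'part1' rows are expected back, matching the Hive output above.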

4) But in Cloudera Spark 2.1, the same query with "PART='part1'" filters correctly:

[root@host tmp]# spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/04/10 02:42:17 WARN spark.SparkContext: Support for Java 7 is deprecated as of Spark 2.0.0
Spark context Web UI available at http://mask.ip:4040
Spark context available as 'sc' (master = yarn, app id = application_1491814750660_0006).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.cloudera1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("select * from parquet_test where PART='part1' ").show();
+---+-----+-----+
| id|  str| part|
+---+-----+-----+
|  1|test1|part1|
|  3|test3|part1|
+---+-----+-----+


Note the Spark version in the banner above: 2.1.0.cloudera1.

Applies To
  • Any default Spark 1.6.x included in:
    • CDH 5.7.x
    • CDH 5.8.x
    • CDH 5.9.x
Cause

Spark has its own Parquet read support and implementation (see the discussion in [1]), which differs from Hive's Parquet support in several ways, including how column names are matched:

[1] https://github.com/apache/spark/pull/16797
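As a rough way to see the difference, you can compare the physical plans with and without the Parquet conversion (the exact plan node names vary between Spark releases):

scala> sqlContext.sql("select * from parquet_test where PART='part1'").explain()
// with the default spark.sql.hive.convertMetastoreParquet=true, the plan is
// built on Spark's native Parquet relation

scala> sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

scala> sqlContext.sql("select * from parquet_test where PART='part1'").explain()
// with the conversion disabled, the plan should instead show a HiveTableScan,
// i.e. the table is read through the Hive SerDe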

Instructions

Two available solutions:

1) Either install and use the Cloudera Spark 2.1 release, where the query behaves as shown above.

2) Or, to make Spark handle the partition column name the same way Hive does, configure the property below:

sqlContext.setConf("spark.sql.hive.convertMetastoreParquet","false")


and then issue your query:
    

sqlContext.sql("select * from parquet_test where PART='part1' ").show();


With this setting, Spark honors the Hive metastore table definition and matches the partition column name case-insensitively, as the following session shows:

[root@host tmp]# spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1491814750660_0007).
SQL context available as sqlContext.

scala> sqlContext.setConf("spark.sql.hive.convertMetastoreParquet","false")

scala> sqlContext.sql("select * from parquet_test where PART='part1' ").show();
+---+-----+-----+
| id|  str| part|
+---+-----+-----+
|  1|test1|part1|
|  3|test3|part1|
+---+-----+-----+


It returns the same result as in the Hive shell.
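Alternatively, the same property can be set when launching spark-shell (or in spark-defaults.conf) through Spark's standard --conf mechanism, for example:

[root@host tmp]# spark-shell --conf spark.sql.hive.convertMetastoreParquet=false

Spark 1.6 picks up spark.sql.* entries from the SparkConf when the SQLContext is created, so the filter should then work without calling setConf inside the shell.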
