Hive Query Failing to Insert into Dynamically Partitioned Parquet Table

asked Aug 19, 2017 in Hadoop by admin (4,410 points)
Summary: Hive query fails with an OutOfMemoryError when issuing an INSERT statement into a dynamically partitioned Parquet table
Last Published Date: 8/4/2017 2:09 PM

When running an INSERT ... SELECT statement where the destination table is stored as Parquet and dynamic partitioning is enabled, the following errors are observed:
Hive Client

Task with the most failures(4):
Diagnostic Messages for this Task:
Error: GC overhead limit exceeded

FAILED: Execution Error, return code 2 from
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
YARN MapReduce Error Logs

2016-10-27 17:08:04,317 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded

Parquet is a columnar file format that buffers data in memory and writes it to a file in batches. The Parquet writer uses a pre-allocated buffer and flushes it to HDFS only when the buffer is full.

Inserting data into a partitioned table can therefore be a memory-intensive operation, because each data file, in each partition, requires a memory buffer to hold the data before it is written. Such inserts can also exceed HDFS limits on simultaneous open files, because each Mapper/Reducer could potentially write to a separate data file for each partition, all at the same time.

By default, Hive allocates a 128 MB buffer for each open Parquet file. The buffer size is controlled by the configuration setting 'parquet.block.size'. For best performance, the Parquet buffer size should be aligned with the HDFS block size, so that each file fits within a single HDFS block and each I/O request can read an entire data file without reaching across the network for subsequent blocks.
-- set the Parquet buffer size to 256 MB (268435456 bytes)
set parquet.block.size=268435456;
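Because the recommendation is to align the Parquet buffer with the HDFS block size, it can help to check both values in the current session before changing either. In the Hive CLI or Beeline, SET with a property name and no value prints the current value. This is a minimal sketch that assumes the Hadoop 2 property name 'dfs.blocksize'; if a property has not been set anywhere, Hive typically reports it as undefined, in which case the writer's default applies.

```sql
-- Print the Parquet buffer size that new Parquet files in this session will use.
SET parquet.block.size;

-- Print the HDFS block size for comparison (Hadoop 2 property name, assumed here).
SET dfs.blocksize;

-- If the HDFS block size is, for example, 256 MB, align the Parquet buffer with it.
SET parquet.block.size=268435456;
```

Keep in mind that a larger Parquet buffer also raises the memory needed for every Parquet file a task holds open at once.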

Often, the INSERT ... SELECT statement is translated into a Mapper-only job if there are no joins or aggregations. The Mappers simply read the records and write them to the destination Parquet files. In this scenario, each Mapper must create a new Parquet file (and buffer) for each partition it discovers in the data it reads. If a Mapper discovers many partitions, it must keep many Parquet files open at the same time, which often causes the Mapper to crash with an OutOfMemoryError.
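A minimal sketch of the scenario described above. The table names sales_staging and sales_parquet and the partition column sale_date are hypothetical. The two hive.exec.dynamic.partition settings are the usual switches for a fully dynamic-partition insert. With the MapReduce engine, EXPLAIN prints the plan; a plan that has a Map Operator Tree but no Reduce Operator Tree indicates a Mapper-only job.

```sql
-- Hypothetical source table in text format.
CREATE TABLE IF NOT EXISTS sales_staging (
  order_id  BIGINT,
  amount    DOUBLE,
  sale_date STRING
)
STORED AS TEXTFILE;

-- Hypothetical Parquet destination, partitioned by sale_date.
CREATE TABLE IF NOT EXISTS sales_parquet (
  order_id BIGINT,
  amount   DOUBLE
)
PARTITIONED BY (sale_date STRING)
STORED AS PARQUET;

-- Let Hive resolve every partition value from the data.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- No joins or aggregations, so this typically compiles to a Mapper-only job.
-- Each Mapper opens one Parquet file per distinct sale_date it reads, e.g.
-- 40 dates x 128 MB default buffer = up to 5,120 MB of buffers in one Mapper.
-- The dynamic partition column must be the last column in the SELECT list.
EXPLAIN
INSERT OVERWRITE TABLE sales_parquet PARTITION (sale_date)
SELECT order_id, amount, sale_date
FROM sales_staging;
```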
There are several ways to mitigate this issue:

1) Enable the Hive session variable 'hive.optimize.sort.dynamic.partition' by setting it to 'true'. This forces the Mapper-only job to add a Reducer phase. Each Reducer is responsible for only a subset of the partitions and therefore does not open a file for every partition. The number of partitions each Reducer handles determines how much memory the Reducers require.
SET hive.optimize.sort.dynamic.partition=true; 
2) Increase the memory reserved for each Mapper so that all Parquet buffers fit into memory.

3) Break up the query into several smaller queries, each of which creates fewer partitions. Each Mapper then needs fewer Parquet file buffers at the same time.
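The three mitigations above can be sketched against the same kind of insert. This is a hedged sketch, not a tuned configuration: the table names sales_staging and sales_parquet, the partition column sale_date, the date ranges, and the memory values are all hypothetical. mapreduce.map.memory.mb sets the YARN container size for each Mapper and mapreduce.map.java.opts sets the JVM heap inside it; keeping the heap at roughly 80% of the container is a common convention, not something the original article prescribes. The options are independent alternatives and are shown together only for brevity.

```sql
-- Assumes hypothetical tables: sales_staging (order_id, amount, sale_date) and
-- sales_parquet (order_id, amount) PARTITIONED BY (sale_date STRING) STORED AS PARQUET.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Option 1: add a Reducer phase so partitions are spread across Reducers.
SET hive.optimize.sort.dynamic.partition=true;

-- Option 2: give each Mapper a larger container and JVM heap (example values).
SET mapreduce.map.memory.mb=8192;
SET mapreduce.map.java.opts=-Xmx6553m;

-- Option 3: load a limited range of partitions per query instead of all at once.
-- String comparison on sale_date assumes values formatted as 'yyyy-MM-dd'.
INSERT OVERWRITE TABLE sales_parquet PARTITION (sale_date)
SELECT order_id, amount, sale_date
FROM sales_staging
WHERE sale_date >= '2017-01-01' AND sale_date < '2017-02-01';

INSERT OVERWRITE TABLE sales_parquet PARTITION (sale_date)
SELECT order_id, amount, sale_date
FROM sales_staging
WHERE sale_date >= '2017-02-01' AND sale_date < '2017-03-01';
```

With dynamic partitioning, INSERT OVERWRITE replaces only the partitions that actually receive rows, so running the range queries one after another does not wipe out partitions loaded by earlier runs.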
