Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Done
-
0.12.0
-
None
-
None
-
None
-
Ubuntu LXC, hadoop-2
Description
When a bulk-ETL operation is in progress, the query plan only sorts based on the SORTED BY key.
This means that the FileSinkOperator in the reducer has to keep all the dynamic partition RecordWriters open till the end of the reducer lifetime.
A more MR-friendly approach would be to sort by <partition_col,sorted_col> so that the data entering the reducer will not require to keep exactly one partition and bucket open at any given time.
As a test-case a partitioned insert for the TPC-h benchmark's lineitem table will suffice
create table lineitem (L_ORDERKEY INT, ... partitioned by (L_SHIPDATE STRING) clustered by (l_orderkey) sorted by (l_orderkey) into 4 buckets stored as ORC; explain from (select L_ORDERKEY , ...) tbl insert overwrite table lineitem partition (L_SHIPDATE) select * ;
The generated plan very clearly has
Reduce Output Operator key expressions: expr: _col0 type: int sort order: + Map-reduce partition columns: expr: _col0 type: int tag: -1
And col0 being L_ORDERKEY.
In the FileSinkOperator over at the reducer side, this results in a larger than usual number of open files.
This causes memory pressure due to the compression buffers used by ORC/RCFile and really slows down the reducers.
A side-effect of this is that I had to pump 350Gb of TPC-h data through 4 reducers, which on occasion took > 1 hour to get from opening a file in the FS to writing the first ORC stripe.
This caused HDFS lease expiry and the task dying from that error.
All of these can be avoided by adding the partition column to the sort keys as well as the partition keys & keeping only one writer open in the FileSinkOperator.