IMPALA-4794

Impala's count(distinct ...) plans are not robust to data skew

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: Impala 2.7.0, Impala 2.8.0, Impala 2.9.0
    • Fix Version/s: Impala 2.10.0
    • Component/s: Frontend

      Description

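      The two queries below are equivalent; both compute count(distinct ...). The first plan has fewer aggregations and exchanges, so it will be faster if the grouping column (int_col) is not skewed. However, the second plan performs much better than the first if the grouping column is skewed: the first plan repartitions only on HASH(int_col), so every row for a heavily populated int_col value lands on a single node, whereas the second repartitions on HASH(int_col,bigint_col), which spreads the distinct pairs across nodes before the final count. For example, we saw a case where, instead of int_col, the grouping expression was two-valued, along the lines of select distinct case when flag = 1 then 'yes' else 'no'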

      [localhost:21000] > explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col;
      Query: explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col
      +-----------------------------------------------------------+
      | Explain String                                            |
      +-----------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
      |                                                           |
      | PLAN-ROOT SINK                                            |
      | |                                                         |
      | 05:EXCHANGE [UNPARTITIONED]                               |
      | |                                                         |
      | 02:AGGREGATE [FINALIZE]                                   |
      | |  output: count(bigint_col)                              |
      | |  group by: int_col                                      |
      | |                                                         |
      | 04:AGGREGATE                                              |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 03:EXCHANGE [HASH(int_col)]                               |
      | |                                                         |
      | 01:AGGREGATE [STREAMING]                                  |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 00:SCAN HDFS [functional.alltypes]                        |
      |    partitions=24/24 files=24 size=478.45KB                |
      +-----------------------------------------------------------+
      
      [localhost:21000] > explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col;
      Query: explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col
      +-----------------------------------------------------------+
      | Explain String                                            |
      +-----------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
      |                                                           |
      | PLAN-ROOT SINK                                            |
      | |                                                         |
      | 07:EXCHANGE [UNPARTITIONED]                               |
      | |                                                         |
      | 06:AGGREGATE [FINALIZE]                                   |
      | |  output: count:merge(*)                                 |
      | |  group by: int_col                                      |
      | |                                                         |
      | 05:EXCHANGE [HASH(int_col)]                               |
      | |                                                         |
      | 02:AGGREGATE [STREAMING]                                  |
      | |  output: count(*)                                       |
      | |  group by: int_col                                      |
      | |                                                         |
      | 04:AGGREGATE [FINALIZE]                                   |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 03:EXCHANGE [HASH(int_col,bigint_col)]                    |
      | |                                                         |
      | 01:AGGREGATE [STREAMING]                                  |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 00:SCAN HDFS [functional.alltypes]                        |
      |    partitions=24/24 files=24 size=478.45KB                |
      +-----------------------------------------------------------+
      Fetched 26 row(s) in 0.01s
      

      We may be better off generating the second plan by default (or with some query option) since it is more robust.
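
To make the skew argument concrete, here is a rough Python sketch that mimics the two exchange strategies on a small skewed input. It is only an illustration under stated assumptions: `toy_hash`, `NUM_NODES`, and the synthetic rows are all hypothetical, and `toy_hash` is not Impala's partitioning hash. "Load" here means the number of distinct (int_col, bigint_col) pairs a node has to hold after the first exchange.

```python
from collections import Counter

NUM_NODES = 4  # hypothetical cluster size, chosen only for illustration


def toy_hash(*values):
    """Toy stand-in for a partitioning hash; NOT Impala's hash function."""
    h = 0
    for v in values:
        h = h * 31 + v
    return h


def plan_one(rows, num_nodes=NUM_NODES):
    """First plan: exchange on HASH(int_col), then dedupe and count per node."""
    per_node = [set() for _ in range(num_nodes)]
    for int_col, bigint_col in rows:
        per_node[toy_hash(int_col) % num_nodes].add((int_col, bigint_col))
    counts = Counter()
    for pairs in per_node:
        for int_col, _ in pairs:
            counts[int_col] += 1
    return dict(counts), [len(pairs) for pairs in per_node]


def plan_two(rows, num_nodes=NUM_NODES):
    """Second plan: exchange on HASH(int_col, bigint_col), dedupe, merge counts."""
    per_node = [set() for _ in range(num_nodes)]
    for int_col, bigint_col in rows:
        node = toy_hash(int_col, bigint_col) % num_nodes
        per_node[node].add((int_col, bigint_col))
    counts = Counter()
    for pairs in per_node:
        # Per-node count(*) grouped by int_col.
        partial = Counter(int_col for int_col, _ in pairs)
        # Adding the partial counts stands in for the second exchange
        # and the count:merge(*) step.
        counts.update(partial)
    return dict(counts), [len(pairs) for pairs in per_node]


# Skewed input: group 0 has 10,000 distinct values, group 1 has only 10.
# Every row appears twice so the dedupe step has something to do.
rows = [(0, b) for b in range(10_000)] + [(1, b) for b in range(10)]
rows = rows * 2

result_one, load_one = plan_one(rows)
result_two, load_two = plan_two(rows)

print(result_one == result_two)  # True: both plans give the same answer
print(load_one)                  # [10000, 10, 0, 0]
print(load_two)                  # [2503, 2502, 2502, 2503]
```

In this toy setup both strategies return the same counts, but the first concentrates nearly all the work on one node while the second spreads it almost evenly, at the cost of an extra aggregation and exchange.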

              People

              • Assignee: Tianyi Wang (tianyiwang)
              • Reporter: Tim Armstrong (tarmstrong)