IMPALA-4794

Impala's count(distinct ...) plans are not robust to data skew

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: Impala 2.7.0, Impala 2.8.0, Impala 2.9.0
    • Fix Version/s: Impala 2.10.0
    • Component/s: Frontend
    • Labels:

      Description

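      The two equivalent queries below both compute count(distinct ...). The first plan has fewer aggregations and exchanges, so it will be faster if the grouping column (int_col) is unskewed. However, the second plan performs much better than the first if the grouping column is skewed: the first plan hash-partitions only on int_col (03:EXCHANGE [HASH(int_col)]), so every row of a heavy group lands on a single node, whereas the second plan first partitions on (int_col, bigint_col) and spreads that work across nodes. E.g. we saw an example where, instead of int_col, the grouping expression was two-valued, along the lines of select distinct case when flag = 1 then 'yes' else 'no'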

      [localhost:21000] > explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col;
      Query: explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col
      +-----------------------------------------------------------+
      | Explain String                                            |
      +-----------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
      |                                                           |
      | PLAN-ROOT SINK                                            |
      | |                                                         |
      | 05:EXCHANGE [UNPARTITIONED]                               |
      | |                                                         |
      | 02:AGGREGATE [FINALIZE]                                   |
      | |  output: count(bigint_col)                              |
      | |  group by: int_col                                      |
      | |                                                         |
      | 04:AGGREGATE                                              |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 03:EXCHANGE [HASH(int_col)]                               |
      | |                                                         |
      | 01:AGGREGATE [STREAMING]                                  |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 00:SCAN HDFS [functional.alltypes]                        |
      |    partitions=24/24 files=24 size=478.45KB                |
      +-----------------------------------------------------------+
      
      [localhost:21000] > explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col;
      Query: explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col
      +-----------------------------------------------------------+
      | Explain String                                            |
      +-----------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
      |                                                           |
      | PLAN-ROOT SINK                                            |
      | |                                                         |
      | 07:EXCHANGE [UNPARTITIONED]                               |
      | |                                                         |
      | 06:AGGREGATE [FINALIZE]                                   |
      | |  output: count:merge(*)                                 |
      | |  group by: int_col                                      |
      | |                                                         |
      | 05:EXCHANGE [HASH(int_col)]                               |
      | |                                                         |
      | 02:AGGREGATE [STREAMING]                                  |
      | |  output: count(*)                                       |
      | |  group by: int_col                                      |
      | |                                                         |
      | 04:AGGREGATE [FINALIZE]                                   |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 03:EXCHANGE [HASH(int_col,bigint_col)]                    |
      | |                                                         |
      | 01:AGGREGATE [STREAMING]                                  |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 00:SCAN HDFS [functional.alltypes]                        |
      |    partitions=24/24 files=24 size=478.45KB                |
      +-----------------------------------------------------------+
      Fetched 26 row(s) in 0.01s
      

      We may be better off generating the second plan by default (or with some query option) since it is more robust.
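
To illustrate the skew sensitivity described above, here is a minimal Python sketch, not Impala code. It only models which node each distinct (int_col, bigint_col) pair is sent to under the two partitioning schemes. The 4-node cluster size, the 99%/1% split, the SHA-256 based hash, and the helper names (node_for, make_skewed_rows, rows_per_node) are all assumptions made for illustration.

```python
import hashlib
from collections import Counter

NUM_NODES = 4  # hypothetical cluster size, chosen only for illustration


def node_for(key):
    """Map a partitioning key to a node; a stand-in for a hash exchange."""
    digest = hashlib.sha256(repr(key).encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_NODES


def make_skewed_rows(n=10_000):
    """Build (int_col, bigint_col) rows where 99% share int_col == 0."""
    return [(1 if i % 100 == 0 else 0, i) for i in range(n)]


def rows_per_node(rows, key_fn):
    """Count how many distinct (int_col, bigint_col) pairs each node receives."""
    load = Counter(node_for(key_fn(row)) for row in set(rows))
    return [load[node] for node in range(NUM_NODES)]


rows = make_skewed_rows()
plan1 = rows_per_node(rows, lambda row: row[0])  # like HASH(int_col)
plan2 = rows_per_node(rows, lambda row: row)     # like HASH(int_col,bigint_col)

print("plan 1 rows per node:", plan1)
print("plan 2 rows per node:", plan2)
print("busiest node, plan 1 vs plan 2:", max(plan1), "vs", max(plan2))
```

With this input, the first scheme sends at least 9,900 of the 10,000 pairs to a single node, because only two int_col values exist. The second scheme should spread the pairs roughly evenly. In the second plan, the later HASH(int_col) exchange (05) follows a streaming count(*) grouped by int_col, so it typically has to move only a handful of rows per node.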

            People

            • Assignee: Tianyi Wang (tianyiwang)
            • Reporter: Tim Armstrong (tarmstrong)