IMPALA-4862

Planner's peak resource estimates do not accurately reflect the behaviour of joins and unions in the backend

    Description

      In the following example, the peak resource estimate is computed incorrectly from the per-node estimates. The planner reports 388.41MB, but the estimate should be 476.41MB (300.41MB for the hash join plus 88.00MB for each of the two scans), because the scan node is Open()ed in the backend while the concurrent join build is executing.

      set explain_level=1;
      explain select * from tpch.lineitem inner join tpch.orders on l_orderkey = o_orderkey
      
      Estimated Per-Host Requirements: Memory=388.41MB
      
      PLAN-ROOT SINK
      |
      04:EXCHANGE [UNPARTITIONED]
      |  hosts=3 per-host-mem=0B
      |  tuple-ids=0,1 row-size=454B cardinality=5757710
      |
      02:HASH JOIN [INNER JOIN, BROADCAST]
      |  hash predicates: l_orderkey = o_orderkey
      |  runtime filters: RF000 <- o_orderkey
      |  hosts=3 per-host-mem=300.41MB
      |  tuple-ids=0,1 row-size=454B cardinality=5757710
      |
      |--03:EXCHANGE [BROADCAST]
      |  |  hosts=2 per-host-mem=0B
      |  |  tuple-ids=1 row-size=191B cardinality=1500000
      |  |
      |  01:SCAN HDFS [tpch.orders, RANDOM]
      |     partitions=1/1 files=1 size=162.56MB
      |     table stats: 1500000 rows total
      |     column stats: all
      |     hosts=2 per-host-mem=88.00MB
      |     tuple-ids=1 row-size=191B cardinality=1500000
      |
      00:SCAN HDFS [tpch.lineitem, RANDOM]
         partitions=1/1 files=1 size=718.94MB
         runtime filters: RF000 -> l_orderkey
         table stats: 6001215 rows total
         column stats: all
         hosts=3 per-host-mem=88.00MB
         tuple-ids=0 row-size=263B cardinality=6001215
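
The gap between the two figures can be checked by hand from the per-host-mem values in the plan above. The following is only an arithmetic sketch: the variable names are hypothetical, and the "reported" line shows one combination that happens to match the EXPLAIN output, not a statement of how the planner actually derives it.

```python
# Per-host memory estimates in MB, copied from the plan above.
HASH_JOIN_02 = 300.41
SCAN_ORDERS_01 = 88.00
SCAN_LINEITEM_00 = 88.00

# Assumption: the reported figure is consistent with counting the join
# plus only one of the two scans.
reported_peak_mb = HASH_JOIN_02 + max(SCAN_ORDERS_01, SCAN_LINEITEM_00)

# The figure this issue expects: both scans hold memory while the join
# build is executing, so all three estimates are added.
expected_peak_mb = HASH_JOIN_02 + SCAN_ORDERS_01 + SCAN_LINEITEM_00

print(f"reported: {reported_peak_mb:.2f}MB")
print(f"expected: {expected_peak_mb:.2f}MB")
print(f"underestimate: {expected_peak_mb - reported_peak_mb:.2f}MB")
```

The difference is exactly one scan's estimate, 88.00MB.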
      

      Another example is the following, where in the backend the aggregations can execute concurrently with the join build:

      [localhost:21000] > explain select straight_join * from (select id, count(*) from functional.alltypes group by id) t1 inner join functional.alltypes t2 on t1.id = t2.id;
      Query: explain select straight_join * from (select id, count(*) from functional.alltypes group by id) t1 inner join functional.alltypes t2 on t1.id = t2.id
      +-----------------------------------------------------+
      | Explain String                                      |
      +-----------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=180.00MB    |
      |                                                     |
      | PLAN-ROOT SINK                                      |
      | |                                                   |
      | 07:EXCHANGE [UNPARTITIONED]                         |
      | |  hosts=3 per-host-mem=0B                          |
      | |  tuple-ids=1,3 row-size=109B cardinality=7300     |
      | |                                                   |
      | 03:HASH JOIN [INNER JOIN, PARTITIONED]              |
      | |  hash predicates: id = t2.id                      |
      | |  runtime filters: RF000 <- t2.id                  |
      | |  hosts=3 per-host-mem=253.55KB                    |
      | |  tuple-ids=1,3 row-size=109B cardinality=7300     |
      | |                                                   |
      | |--06:EXCHANGE [HASH(t2.id)]                        |
      | |  |  hosts=3 per-host-mem=0B                       |
      | |  |  tuple-ids=3 row-size=97B cardinality=7300     |
      | |  |                                                |
      | |  02:SCAN HDFS [functional.alltypes t2, RANDOM]    |
      | |     partitions=24/24 files=24 size=478.45KB       |
      | |     table stats: 7300 rows total                  |
      | |     column stats: all                             |
      | |     hosts=3 per-host-mem=160.00MB                 |
      | |     tuple-ids=3 row-size=97B cardinality=7300     |
      | |                                                   |
      | 05:AGGREGATE [FINALIZE]                             |
      | |  output: count:merge(*)                           |
      | |  group by: id                                     |
      | |  hosts=3 per-host-mem=10.00MB                     |
      | |  tuple-ids=1 row-size=12B cardinality=7300        |
      | |                                                   |
      | 04:EXCHANGE [HASH(id)]                              |
      | |  hosts=3 per-host-mem=0B                          |
      | |  tuple-ids=1 row-size=12B cardinality=7300        |
      | |                                                   |
      | 01:AGGREGATE [STREAMING]                            |
      | |  output: count(*)                                 |
      | |  group by: id                                     |
      | |  hosts=3 per-host-mem=10.00MB                     |
      | |  tuple-ids=1 row-size=12B cardinality=7300        |
      | |                                                   |
      | 00:SCAN HDFS [functional.alltypes, RANDOM]          |
      |    partitions=24/24 files=24 size=478.45KB          |
      |    runtime filters: RF000 -> functional.alltypes.id |
      |    table stats: 7300 rows total                     |
      |    column stats: all                                |
      |    hosts=3 per-host-mem=160.00MB                    |
      |    tuple-ids=0 row-size=4B cardinality=7300         |
      +-----------------------------------------------------+
      

      The estimates for unions are also inaccurate: branches of a union within the same fragment are executed serially, but anything below an exchange is executed concurrently.
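
One way to picture the behaviour described in this issue is a toy recursive estimator. This is a minimal sketch under stated assumptions, not Impala's planner code: `PlanNode`, `split_peak_mb` and `peak_mb` are hypothetical names, non-union operators are assumed to hold memory at the same time as all of their in-fragment children, and everything below an exchange is assumed to run concurrently with the rest of the plan.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PlanNode:
    """Hypothetical plan node: an operator kind plus its per-host estimate."""
    kind: str          # "scan", "join", "union", "exchange", ...
    mem_mb: float
    children: List["PlanNode"] = field(default_factory=list)


def split_peak_mb(node: PlanNode) -> Tuple[float, float]:
    """Return (local, remote) memory in MB for the subtree rooted at node.

    local  = peak of operators in the same fragment as node
    remote = total of everything below an exchange, assumed concurrent
    """
    parts = [split_peak_mb(c) for c in node.children]
    child_local = [local for local, _ in parts]
    remote = sum(r for _, r in parts)
    if node.kind == "exchange":
        # Everything beneath an exchange runs in another fragment.
        return node.mem_mb, remote + sum(child_local)
    if node.kind == "union":
        # Union branches in the same fragment run one after another.
        return node.mem_mb + max(child_local, default=0.0), remote
    # Assumption: other operators overlap with all in-fragment children.
    return node.mem_mb + sum(child_local), remote


def peak_mb(root: PlanNode) -> float:
    local, remote = split_peak_mb(root)
    return local + remote


# The first plan in this issue (lineitem join orders).
join_plan = PlanNode("exchange", 0.0, [
    PlanNode("join", 300.41, [
        PlanNode("scan", 88.00),                          # 00: lineitem
        PlanNode("exchange", 0.0, [PlanNode("scan", 88.00)]),  # 03 over 01
    ]),
])

# A made-up union: two in-fragment scans and one branch below an exchange.
union_plan = PlanNode("union", 0.0, [
    PlanNode("scan", 88.00),
    PlanNode("scan", 88.00),
    PlanNode("exchange", 0.0, [PlanNode("scan", 88.00)]),
])

print(f"join plan peak:  {peak_mb(join_plan):.2f}MB")
print(f"union plan peak: {peak_mb(union_plan):.2f}MB")
```

Under these assumptions the join plan comes out at the 476.41MB this issue expects, and the union example counts only one of the two serial in-fragment scans plus the scan below the exchange.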

            People

              tarmstrong Tim Armstrong