IMPALA-3002

Inconsistent accounting of exchange rows between impala-shell and profile summary

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: Impala 2.5.0
    • Fix Version/s: Impala 2.8.0
    • Component/s: Clients
    • Environment: Bolt-80

    Description

      Check 04:EXCHANGE

      Summary from impala-shell

      | Operator        | #Hosts | Avg Time | Max Time | #Rows   | Est. #Rows | Peak Mem  | Est. Peak Mem | Detail                              |
      | 06:AGGREGATE    | 1      | 201.91ms | 201.91ms | 1       | 1          | 300.00 KB | -1 B          | FINALIZE                            |
      | 05:EXCHANGE     | 1      | 1.28ms   | 1.28ms   | 72      | 1          | 0 B       | -1 B          | UNPARTITIONED                       |
      | 03:AGGREGATE    | 72     | 5.01s    | 6.47s    | 72      | 1          | 3.22 MB   | 10.00 MB      |                                     |
      | 02:HASH JOIN    | 72     | 203.42s  | 262.39s  | 3.26B   | 6.36B      | 10.51 GB  | 11.63 GB      | INNER JOIN, BROADCAST               |
      | |--04:EXCHANGE  | 72     | 13.07s   | 14.28s   | 7.83B   | 200.00M    | 0 B       | 0 B           | BROADCAST                           |
      | |  00:SCAN HDFS | 72     | 550.48ms | 634.76ms | 108.70M | 200.00M    | 476.01 MB | 176.00 MB     | tpch_10000_decimal_parquet.part     |
      | 01:SCAN HDFS    | 72     | 4.64s    | 7.13s    | 60.00B  | 60.00B     | 1.18 GB   | 352.00 MB     | tpch_10000_decimal_parquet.lineitem |

      And from profiles

      | Operator        | #Hosts | Avg Time  | Max Time  | #Rows   | Est. #Rows | Peak Mem  | Est. Peak Mem | Detail                         |
      | 06:AGGREGATE    | 1      | 201.907ms | 201.907ms | 1       | 1          | 300.00 KB | -1.00 B       | FINALIZE                       |
      | 05:EXCHANGE     | 1      | 1.284ms   | 1.284ms   | 72      | 1          | 0         | -1.00 B       | UNPARTITIONED                  |
      | 03:AGGREGATE    | 72     | 5s011ms   | 6s467ms   | 72      | 1          | 3.22 MB   | 10.00 MB      |                                |
      | 02:HASH JOIN    | 72     | 3m23s     | 4m22s     | 3.26B   | 6.36B      | 10.51 GB  | 11.63 GB      | INNER JOIN, BROADCAST          |
      | |--04:EXCHANGE  | 72     | 13s070ms  | 14s275ms  | 108.70M | 200.00M    | 0         | 0             | BROADCAST                      |
      | |  00:SCAN HDFS | 72     | 550.477ms | 634.764ms | 108.70M | 200.00M    | 476.01 MB | 176.00 MB     | tpch_10000_decimal_parquet.... |
      | 01:SCAN HDFS    | 72     | 4s641ms   | 7s125ms   | 60.00B  | 60.00B     | 1.18 GB   | 352.00 MB     | tpch_10000_decimal_parquet.... |
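
      The 7.83B that impala-shell reports for 04:EXCHANGE appears to be the broadcast exchange's output summed across all 72 receiving instances, whereas the profile summary reports the per-instance maximum of 108.70M. A quick sanity check of that factor (illustrative arithmetic only, not output from either tool):

        # A broadcast exchange delivers every row to every receiving instance, so summing
        # #Rows across instances multiplies the true cardinality by the number of hosts.
        rows_per_instance = 108.70e6   # per-instance #Rows from the profile summary (max)
        num_hosts = 72                 # #Hosts for 04:EXCHANGE
        print("%.2fB" % (rows_per_instance * num_hosts / 1e9))   # ~7.83B, the shell's figure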

    Activity

        dhecht Dan Hecht added a comment -

        This looks like a bug in the shell to me. In particular:

        impala_client.py:build_summary_table
            if node.is_broadcast and is_fragment_root:
              cardinality = max_stats.cardinality
            else:
              cardinality = agg_stats.cardinality
        

        Why the is_fragment_root check? Note that the version in the backend doesn't have that condition:

        PrintExecSummary
          row.push_back(PrettyPrinter::Print(
              node.is_broadcast ? max_stats.cardinality : agg_stats.cardinality,
              TUnit::UNIT));
        

        Note also that the shell code has been duplicated into impala_beeswax.py.

        Henry Robinson, you added the shell version in 01320f4f458dcc321c92c3793880d8413ef84cf6, but even at that time the C++ version didn't have this condition. Do you remember why you added it? Feel free to reassign, of course, but I wanted to check in case you remember.
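
        For reference, this is roughly what the shell branch looks like with that condition dropped, mirroring the backend's PrintExecSummary above (an illustrative sketch only; the actual change landed in the commit attached below):

            # Sketch: for a broadcast node every receiver gets a full copy of the rows, so
            # report the per-instance maximum; otherwise report the sum across instances.
            if node.is_broadcast:
              cardinality = max_stats.cardinality
            else:
              cardinality = agg_stats.cardinality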

        mmokhtar Mostafa Mokhtar added a comment -

        This is also an issue for broadcast joins.

        select count(*) from date_dim, store_returns where d_date_sk = sr_returned_date_sk;
        

        Check the Actual vs. Est. #Rows for 04:EXCHANGE: the shell reports 1.10M, which is roughly 15 receiving hosts × 73.05K (the broadcast rows summed across receivers) rather than the 73.05K per-instance count.

        | Operator        | #Hosts | Avg Time | Max Time | #Rows   | Est. #Rows | Peak Mem | Est. Peak Mem | Detail                           |
        | 06:AGGREGATE    | 1      | 62.70ms  | 62.70ms  | 1       | 1          | 72.00 KB | -1 B          | FINALIZE                         |
        | 05:EXCHANGE     | 1      | 131.78us | 131.78us | 15      | 1          | 0 B      | -1 B          | UNPARTITIONED                    |
        | 03:AGGREGATE    | 15     | 62.86ms  | 68.54ms  | 15      | 1          | 20.00 KB | 10.00 MB      |                                  |
        | 02:HASH JOIN    | 15     | 522.31ms | 643.02ms | 277.92M | 288.00M    | 4.03 MB  | 313.88 KB     | INNER JOIN, BROADCAST            |
        | |--04:EXCHANGE  | 15     | 1.91ms   | 2.46ms   | 1.10M   | 73.05K     | 0 B      | 0 B           | BROADCAST                        |
        | |  00:SCAN HDFS | 1      | 55.36ms  | 55.36ms  | 73.05K  | 73.05K     | 1.97 MB  | 16.00 MB      | date_dim      |
        | 01:SCAN HDFS    | 15     | 118.79ms | 218.92ms | 277.92M | 288.00M    | 3.94 MB  | 0 B           | store_returns |
        

        Plan

        +----------------------------------------------------------+
        | Explain String                                           |
        +----------------------------------------------------------+
        | Estimated Per-Host Requirements: Memory=16.31MB VCores=2 |
        |                                                          |
        | 06:AGGREGATE [FINALIZE]                                  |
        | |  output: count:merge(*)                                |
        | |  hosts=15 per-host-mem=unavailable                     |
        | |  tuple-ids=2 row-size=8B cardinality=1                 |
        | |                                                        |
        | 05:EXCHANGE [UNPARTITIONED]                              |
        | |  hosts=15 per-host-mem=unavailable                     |
        | |  tuple-ids=2 row-size=8B cardinality=1                 |
        | |                                                        |
        | 03:AGGREGATE                                             |
        | |  output: count(*)                                      |
        | |  hosts=15 per-host-mem=10.00MB                         |
        | |  tuple-ids=2 row-size=8B cardinality=1                 |
        | |                                                        |
        | 02:HASH JOIN [INNER JOIN, BROADCAST]                     |
        | |  hash predicates: sr_returned_date_sk = d_date_sk      |
        | |  runtime filters: RF000 <- d_date_sk                   |
        | |  hosts=15 per-host-mem=313.88KB                        |
        | |  tuple-ids=1,0 row-size=8B cardinality=287999764       |
        | |                                                        |
        | |--04:EXCHANGE [BROADCAST]                               |
        | |  |  hosts=1 per-host-mem=0B                            |
        | |  |  tuple-ids=0 row-size=4B cardinality=73049          |
        | |  |                                                     |
        | |  00:SCAN HDFS [tpcds_1000_parquet.date_dim, RANDOM]    |
        | |     partitions=1/1 files=1 size=2.17MB                 |
        | |     table stats: 73049 rows total                      |
        | |     column stats: all                                  |
        | |     hosts=1 per-host-mem=16.00MB                       |
        | |     tuple-ids=0 row-size=4B cardinality=73049          |
        | |                                                        |
        | 01:SCAN HDFS [tpcds_1000_parquet.store_returns, RANDOM]  |
        |    partitions=2004/2004 files=2004 size=18.74GB          |
        |    runtime filters: RF000 -> sr_returned_date_sk         |
        |    table stats: 287999764 rows total                     |
        |    column stats: all                                     |
        |    hosts=15 per-host-mem=0B                              |
        |    tuple-ids=1 row-size=4B cardinality=287999764         |
        +----------------------------------------------------------+
        
        twmarshall Thomas Tauber-Marshall added a comment -

        commit 7fad3e5dc38c1097db6be24da0cda6941f554150
        Author: Thomas Tauber-Marshall <tmarshall@cloudera.com>
        Date: Mon Oct 10 10:32:55 2016 -0700

        IMPALA-3002/IMPALA-1473: Cardinality observability cleanup

        IMPALA-3002:
        The shell prints an incorrect value for '#Rows' in the exec
        summary for broadcast nodes due to incorrect logic around
        whether to use max or agg stats. This patch makes the behavior
        consistent with the way the be treats exec summaries in
        summary-util.cc. This incorrect logic was also duplicated in
        the impala_beeswax test framework.

        IMPALA-1473:
        When there is a merging exchange with a limit, we may copy rows
        into the output batch beyond the limit. In this case, we currently
        update the output batch's size to reflect the limit, but we also
        need to update ExecNode::num_rows_returned_ or the exec summary
        may show that the exchange node returned more rows than it really
        did.

        Additionally, PlanFragmentExecutor::GetNext does not update
        rows_produced_counter_ in some cases, leading the runtime profile
        to display an incorrect value for 'RowsProduced'.

        Change-Id: I386719370386c9cff09b8b35d15dc712dc6480aa
        Reviewed-on: http://gerrit.cloudera.org:8080/4679
        Reviewed-by: Matthew Jacobs <mj@cloudera.com>
        Tested-by: Internal Jenkins
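
        For the IMPALA-1473 part, the bookkeeping described above can be sketched as follows (illustrative Python, not the actual C++ in the exchange node; cap_at_limit is a hypothetical helper):

          def cap_at_limit(batch_rows, num_rows_returned, limit):
              """Apply a row limit to a freshly copied batch and keep the node's row counter in sync."""
              if limit >= 0 and num_rows_returned + batch_rows > limit:
                  # Truncate the batch at the limit...
                  batch_rows = limit - num_rows_returned
              # ...and advance the returned-row count by what was actually returned, so the
              # exec summary does not over-count the exchange's output.
              num_rows_returned += batch_rows
              return batch_rows, num_rows_returned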


    People

    • Assignee: Thomas Tauber-Marshall
    • Reporter: Mostafa Mokhtar