IMPALA-5168

Codegen hash computation in DataStreamSender::Send for partition exchange.

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0, Impala 2.11.0
    • Fix Version/s: Impala 2.13.0, Impala 3.1.0
    • Component/s: Backend

    Description

      Hash partition computation for exchange operators can benefit from codegen. Profile data shows that roughly 20% of the CPU time in the fragment thread is consumed by RawValue::GetHashValueFnv and ExprContext::GetValue. The relevant loop in DataStreamSender::Send is:

          // hash-partition batch's rows across channels
          int num_channels = channels_.size();
          for (int i = 0; i < batch->num_rows(); ++i) {
            TupleRow* row = batch->GetRow(i);
            uint32_t hash_val = HashUtil::FNV_SEED;
            for (int i = 0; i < partition_expr_ctxs_.size(); ++i) {
              ExprContext* ctx = partition_expr_ctxs_[i];
              void* partition_val = ctx->GetValue(row);
              // We can't use the crc hash function here because it does not result
              // in uncorrelated hashes with different seeds.  Instead we must use
              // fnv hash.
              // TODO: fix crc hash/GetHashValue()
              hash_val =
                  RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), hash_val);
            }
            ExprContext::FreeLocalAllocations(partition_expr_ctxs_);
            RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
          }
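
To make the chaining in the loop above concrete, here is a minimal standalone sketch of seeded FNV partitioning over two key columns. It is an illustration under stated assumptions, not Impala's code: the constants are the standard 32-bit FNV-1a values (Impala's HashUtil may use a different variant and seed), and `Row`, `FnvHash`, `ChannelFor`, and `PartitionCounts` are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Standard 32-bit FNV-1a constants. Assumption: Impala's HashUtil may use a
// different FNV variant and seed; these values are for illustration only.
constexpr uint32_t kFnvOffsetBasis = 2166136261u;
constexpr uint32_t kFnvPrime = 16777619u;

// Hashes `len` bytes starting at `data`, continuing from `seed`.
inline uint32_t FnvHash(const void* data, size_t len, uint32_t seed) {
  const uint8_t* bytes = static_cast<const uint8_t*>(data);
  uint32_t hash = seed;
  for (size_t i = 0; i < len; ++i) {
    hash ^= bytes[i];
    hash *= kFnvPrime;
  }
  return hash;
}

// Hypothetical row holding the two join keys from the repro query.
struct Row {
  int64_t item_sk;
  int64_t ticket_number;
};

// Feeds the hash of the first column in as the seed for the second column,
// then maps the combined hash onto a channel index.
inline int ChannelFor(const Row& row, int num_channels) {
  uint32_t hash_val = kFnvOffsetBasis;
  hash_val = FnvHash(&row.item_sk, sizeof(row.item_sk), hash_val);
  hash_val = FnvHash(&row.ticket_number, sizeof(row.ticket_number), hash_val);
  return static_cast<int>(hash_val % static_cast<uint32_t>(num_channels));
}

// Counts how many rows are routed to each channel.
inline std::vector<int> PartitionCounts(const std::vector<Row>& rows,
                                        int num_channels) {
  std::vector<int> counts(num_channels, 0);
  for (const Row& row : rows) ++counts[ChannelFor(row, num_channels)];
  return counts;
}
```

Every row is assigned to exactly one channel, and rows with equal key values always map to the same channel, which is the property a hash-partitioned exchange relies on.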
      
      Function Stack                                                                 Effective Time %
      Total                                                                          100%
      clone                                                                          99%
      start_thread                                                                   99%
      thread_proxy                                                                   99%
      boost::detail::thread_data<boost::_bi::bind_t<>::run                           99%
      boost::_bi::bind_t<void, void (), ::operator()                                 99%
      operator()<void (const std::basic_string<                                      99%
      impala::Thread::SuperviseThread                                                99%
      boost::function0<void>::operator()                                             99%
      impala::QueryExecMgr::ExecFInstance                                            99%
      impala::FragmentInstanceState::Exec                                            99%
      impala::PlanFragmentExecutor::Exec                                             99%
      impala::PlanFragmentExecutor::ExecInternal                                     96%
      impala::DataStreamSender::Send                                                 91%
      impala::DataStreamSender::Channel::AddRow                                      56%
      impala::RawValue::GetHashValueFnv                                              11%
      impala::ExprContext::GetValue                                                  11%
      impala::ExprContext::FreeLocalAllocations                                      6%
      impala::RowBatch::GetRow                                                       1%
      std::vector<impala::ExprContext*, std::allocator<impala::ExprContext*>>::size  1%
      impala::Expr::type                                                             0%
      impala::ExprContext::GetValue                                                  0%
      impala::RuntimeState::CheckQueryState                                          0%
      impala::HdfsScanNode::GetNext                                                  3%
      impala::RowBatch::Reset                                                        1%
      Status                                                                         0%
      ~ScopedTimer                                                                   0%
      [Unknown stack frame(s)]                                                       4%
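
The cost attributed to RawValue::GetHashValueFnv and ExprContext::GetValue in the profile comes largely from work repeated per row: looking up each expression's type and dispatching on it. As a rough sketch of what codegen could remove, the following contrasts an interpreted hash that switches on a type tag for every value with a specialized one whose column offsets and widths are compile-time constants. C++ templates stand in for LLVM-generated code here; this is an assumption-laden illustration and not the implementation used in the fix. `ColType`, `ColDesc`, `FnvBytes`, `HashRowInterpreted`, and `HashRowSpecialized` are hypothetical names, and the constants are the standard 32-bit FNV-1a values.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr uint32_t kSeed = 2166136261u;  // standard FNV-1a offset basis
constexpr uint32_t kPrime = 16777619u;   // standard FNV-1a prime

// Hashes `len` bytes starting at `data`, continuing from `hash`.
inline uint32_t FnvBytes(const void* data, size_t len, uint32_t hash) {
  const uint8_t* bytes = static_cast<const uint8_t*>(data);
  for (size_t i = 0; i < len; ++i) {
    hash ^= bytes[i];
    hash *= kPrime;
  }
  return hash;
}

// Hypothetical column descriptor: a type tag plus a byte offset in the row.
enum class ColType { kInt32, kInt64 };
struct ColDesc {
  ColType type;
  size_t offset;
};

// Interpreted path: the type of every value is inspected for every row.
inline uint32_t HashRowInterpreted(const uint8_t* row,
                                   const std::vector<ColDesc>& cols) {
  uint32_t hash = kSeed;
  for (const ColDesc& col : cols) {
    switch (col.type) {
      case ColType::kInt32:
        hash = FnvBytes(row + col.offset, 4, hash);
        break;
      case ColType::kInt64:
        hash = FnvBytes(row + col.offset, 8, hash);
        break;
    }
  }
  return hash;
}

// Specialized path for a two-column key: offsets and widths are template
// parameters, so the compiler can unroll the byte loops and drop the switch.
template <size_t Off0, size_t W0, size_t Off1, size_t W1>
inline uint32_t HashRowSpecialized(const uint8_t* row) {
  uint32_t hash = kSeed;
  hash = FnvBytes(row + Off0, W0, hash);
  hash = FnvBytes(row + Off1, W1, hash);
  return hash;
}
```

Both paths produce the same hash for the same row layout, so a specialized version can replace the interpreted one without changing which channel a row is sent to.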

      Query used in repro

      select /* +straight_join */ count(*)
      from store_sales a
        join /* +shuffle */ store_returns b
          on a.ss_item_sk = b.sr_item_sk
      where a.ss_ticket_number = b.sr_ticket_number
        and ss_sold_date_sk between 2450816 and 2451500
        and sr_returned_date_sk between 2450816 and 2451500
      group by a.ss_ticket_number
      having count(*) > 9999999999
      

      Explain plan

      +------------------------------------------------------------------------------------------+
      | Explain String                                                                           |
      +------------------------------------------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=3.43GB VCores=3                                  |
      |                                                                                          |
      | PLAN-ROOT SINK                                                                           |
      | |                                                                                        |
      | 08:EXCHANGE [UNPARTITIONED]                                                              |
      | |                                                                                        |
      | 07:AGGREGATE [FINALIZE]                                                                  |
      | |  output: count:merge(*)                                                                |
      | |  group by: a.ss_ticket_number                                                          |
      | |  having: count(*) > 9999999999                                                         |
      | |                                                                                        |
      | 06:EXCHANGE [HASH(a.ss_ticket_number)]                                                   |
      | |                                                                                        |
      | 03:AGGREGATE [STREAMING]                                                                 |
      | |  output: count(*)                                                                      |
      | |  group by: a.ss_ticket_number                                                          |
      | |                                                                                        |
      | 02:HASH JOIN [INNER JOIN, PARTITIONED]                                                   |
      | |  hash predicates: a.ss_item_sk = b.sr_item_sk, a.ss_ticket_number = b.sr_ticket_number |
      | |  runtime filters: RF000 <- b.sr_item_sk, RF001 <- b.sr_ticket_number                   |
      | |                                                                                        |
      | |--05:EXCHANGE [HASH(b.sr_item_sk,b.sr_ticket_number)]                                   |
      | |  |                                                                                     |
      | |  01:SCAN HDFS [tpcds_3000_parquet.store_returns b]                                     |
      | |     partitions=681/2004 files=681 size=13.73GB                                         |
      | |                                                                                        |
      | 04:EXCHANGE [HASH(a.ss_item_sk,a.ss_ticket_number)]                                      |
      | |                                                                                        |
      | 00:SCAN HDFS [tpcds_3000_parquet.store_sales a]                                          |
      |    partitions=683/1824 files=944 size=140.19GB                                           |
      |    runtime filters: RF000 -> a.ss_item_sk, RF001 -> a.ss_ticket_number                   |
      +------------------------------------------------------------------------------------------+
      

            People

              Assignee: Michael Ho (kwho)
              Reporter: Mostafa Mokhtar (mmokhtar)