Uploaded image for project: 'IMPALA'
  1. IMPALA
  2. IMPALA-3286

Add software prefetching to hash table build and probe

    XML · Word · Printable · JSON

Details

    • Improvement
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • Impala 2.5.0
    • Impala 2.6.0
    • Backend

    Description

      Add software prefetching to speed up hash table build and probe for joins and aggregations.

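      Sample code with prefetching, shown in PartitionedAggregationNode::Open(). The excerpt is truncated and ends partway through the row-batch loop: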

      Status PartitionedAggregationNode::Open(RuntimeState* state) {
        SCOPED_TIMER(runtime_profile_->total_time_counter());
        RETURN_IF_ERROR(ExecNode::Open(state));
      
        RETURN_IF_ERROR(Expr::Open(grouping_expr_ctxs_, state));
        RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
      
        DCHECK_EQ(aggregate_evaluators_.size(), agg_fn_ctxs_.size());
        for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
          RETURN_IF_ERROR(aggregate_evaluators_[i]->Open(state, agg_fn_ctxs_[i]));
        }
      
        RETURN_IF_ERROR(children_[0]->Open(state));
      
        // Streaming preaggregations do all processing in GetNext().
        if (is_streaming_preagg_) return Status::OK();
      
        RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
        // Read all the rows from the child and process them.
        bool eos = false;
        do {
          RETURN_IF_CANCELLED(state);
          RETURN_IF_ERROR(QueryMaintenance(state));
          RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
      
          if (UNLIKELY(VLOG_ROW_IS_ON)) {
            for (int i = 0; i < batch.num_rows(); ++i) {
              TupleRow* row = batch.GetRow(i);
              VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc());
            }
          }
      
          HashTableCtx* ht_ctx = ht_ctx_.get();
          if (enable_prefetch && (ht_ctx != NULL)) {
            int num_rows = batch.num_rows();
            for (int i = 0; i < num_rows; ++i) {
              uint32_t hash_value;
              int32_t probe_value = ht_ctx->GetIntCol(batch.GetRow(i));
              ht_ctx->HashQuickInt(probe_value, &hash_value);
              Partition* dst_partition = hash_partitions_[hash_value >> (32 - NUM_PARTITIONING_BITS)];
              if (dst_partition->is_spilled())
                continue;
              HashTable* ht = dst_partition->hash_tbl.get();
              ht->Prefetch(hash_value);
            }
          }
      
          SCOPED_TIMER(build_timer_);
          if (process_row_batch_fn_ != NULL) {
            RETURN_IF_ERROR(process_row_batch_fn_(this, &batch, ht_ctx_.get()));
          } else if (grouping_expr_ctxs_.empty()) {
            RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
          } else {
            // There is grouping, so we will do partitioned aggregation.
            RETURN_IF_ERROR(ProcessBatch<false>(&batch, ht_ctx_.get()));
          }
      

      Attachments

        Issue Links

          Activity

            People

              kwho Michael Ho
              mmokhtar Mostafa Mokhtar
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: