Details
-
Improvement
-
Status: Resolved
-
Critical
-
Resolution: Fixed
-
Impala 2.5.0
Description
Add software prefetching to speedup Hash table build and probe for joins and aggregations.
Sample code with prefetching
Status PartitionedAggregationNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); RETURN_IF_ERROR(Expr::Open(grouping_expr_ctxs_, state)); RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state)); DCHECK_EQ(aggregate_evaluators_.size(), agg_fn_ctxs_.size()); for (int i = 0; i < aggregate_evaluators_.size(); ++i) { RETURN_IF_ERROR(aggregate_evaluators_[i]->Open(state, agg_fn_ctxs_[i])); } RETURN_IF_ERROR(children_[0]->Open(state)); // Streaming preaggregations do all processing in GetNext(). if (is_streaming_preagg_) return Status::OK(); RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); // Read all the rows from the child and process them. bool eos = false; do { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos)); if (UNLIKELY(VLOG_ROW_IS_ON)) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.GetRow(i); VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc()); } } HashTableCtx* ht_ctx = ht_ctx_.get(); if (enable_prefetch && (ht_ctx != NULL)) { int num_rows = batch.num_rows(); for (int i = 0; i < num_rows; ++i) { uint32_t hash_value; int32_t probe_value = ht_ctx->GetIntCol(batch.GetRow(i)); ht_ctx->HashQuickInt(probe_value, &hash_value); Partition* dst_partition = hash_partitions_[hash_value >> (32 - NUM_PARTITIONING_BITS)]; if (dst_partition->is_spilled()) continue; HashTable* ht = dst_partition->hash_tbl.get(); ht->Prefetch(hash_value); } } SCOPED_TIMER(build_timer_); if (process_row_batch_fn_ != NULL) { RETURN_IF_ERROR(process_row_batch_fn_(this, &batch, ht_ctx_.get())); } else if (grouping_expr_ctxs_.empty()) { RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch)); } else { // There is grouping, so we will do partitioned aggregation. RETURN_IF_ERROR(ProcessBatch<false>(&batch, ht_ctx_.get())); }
Attachments
Issue Links
- breaks
-
IMPALA-9725 LEFT ANTI JOIN produces wrong result when PHJ build spills
- Resolved
- relates to
-
IMPALA-2737 Investigate partition-oriented agg and join processing
- Resolved