diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc index 1ddf368..3d3b0e7 100644 --- a/be/src/exec/partitioned-aggregation-node-ir.cc +++ b/be/src/exec/partitioned-aggregation-node-ir.cc @@ -26,8 +26,10 @@ using namespace impala; Status PartitionedAggregationNode::ProcessBatchNoGrouping( RowBatch* batch, HashTableCtx* ht_ctx) { - for (int i = 0; i < batch->num_rows(); ++i) { - UpdateTuple(&agg_fn_ctxs_[0], singleton_output_tuple_, batch->GetRow(i)); + int num_rows = batch->num_rows(); + int num_tuples_per_row = batch->num_tuples_per_row(); + for (int i = 0; i < num_rows; ++i) { + UpdateTuple(&agg_fn_ctxs_[0], singleton_output_tuple_, batch->GetRow(i, num_tuples_per_row)); } return Status::OK(); } @@ -43,10 +45,11 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch, HashTableCtx* h // TODO: Once we have a histogram with the number of rows per partition, we will have // accurate resize calls. int num_rows = batch->num_rows(); + int num_tuples_per_row = batch->num_tuples_per_row(); RETURN_IF_ERROR(CheckAndResizeHashPartitions(num_rows, ht_ctx)); for (int i = 0; i < num_rows; ++i) { - RETURN_IF_ERROR(ProcessRow(batch->GetRow(i), ht_ctx)); + RETURN_IF_ERROR(ProcessRow(batch->GetRow(i, num_tuples_per_row), ht_ctx)); } return Status::OK(); @@ -147,8 +150,11 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, DCHECK_EQ(out_batch->num_rows(), 0); DCHECK_LE(in_batch->num_rows(), out_batch->capacity()); - for (int i = 0; i < in_batch->num_rows(); ++i) { - TupleRow* in_row = in_batch->GetRow(i); + int num_rows = in_batch->num_rows(); + int num_tuples_per_row = in_batch->num_tuples_per_row(); + + for (int i = 0; i < num_rows; ++i) { + TupleRow* in_row = in_batch->GetRow(i, num_tuples_per_row); uint32_t hash; if (!ht_ctx_->EvalAndHashProbe(in_row, &hash)) continue; const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); diff --git 
a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc index 5a13a87..7b4fd69 100644 --- a/be/src/exec/partitioned-hash-join-node-ir.cc +++ b/be/src/exec/partitioned-hash-join-node-ir.cc @@ -44,8 +44,12 @@ int PartitionedHashJoinNode::ProcessProbeBatch( ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; const int num_conjuncts = conjunct_ctxs_.size(); + int probe_num_rows = probe_batch_->num_rows(); + int probe_num_tuples_per_row = probe_batch_->num_tuples_per_row(); + int out_num_tuples_per_row = out_batch->num_tuples_per_row(); + DCHECK(!out_batch->AtCapacity()); - TupleRow* out_row = out_batch->GetRow(out_batch->AddRow()); + TupleRow* out_row = out_batch->GetRow(out_batch->AddRow(), out_num_tuples_per_row); const int max_rows = out_batch->capacity() - out_batch->num_rows(); int num_rows_added = 0; @@ -168,14 +172,14 @@ next_row: // moving to the row. DCHECK(hash_tbl_iterator_.AtEnd()); - if (UNLIKELY(probe_batch_pos_ == probe_batch_->num_rows())) { + if (UNLIKELY(probe_batch_pos_ == probe_num_rows)) { // Finished this batch. current_probe_row_ = NULL; goto end; } // Establish current_probe_row_ and find its corresponding partition. 
- current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++); + current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++, probe_num_tuples_per_row); matched_probe_ = false; uint32_t hash; if (!ht_ctx->EvalAndHashProbe(current_probe_row_, &hash)) { @@ -252,9 +256,11 @@ int PartitionedHashJoinNode::ProcessProbeBatch( } Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch) { - for (int i = 0; i < build_batch->num_rows(); ++i) { + int num_rows = build_batch->num_rows(); + int num_tuples_per_row = build_batch->num_tuples_per_row(); + for (int i = 0; i < num_rows; ++i) { DCHECK(buildStatus_.ok()); - TupleRow* build_row = build_batch->GetRow(i); + TupleRow* build_row = build_batch->GetRow(i, num_tuples_per_row); uint32_t hash; if (!ht_ctx_->EvalAndHashBuild(build_row, &hash)) { if (null_aware_partition_ != NULL) { diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index b6becb9..1dfe685 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -140,6 +140,14 @@ class RowBatch { return reinterpret_cast<TupleRow*>(tuple_ptrs_ + row_idx * num_tuples_per_row_); } + TupleRow* IR_ALWAYS_INLINE GetRow(int row_idx, int num_tuples_per_row) { + DCHECK(tuple_ptrs_ != NULL); + DCHECK_GE(row_idx, 0); + DCHECK_LT(row_idx, capacity_); + DCHECK_EQ(num_tuples_per_row, num_tuples_per_row_); + return reinterpret_cast<TupleRow*>(tuple_ptrs_ + row_idx * num_tuples_per_row); + } + int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); } MemPool* tuple_data_pool() { return &tuple_data_pool_; } int num_io_buffers() const { return io_buffers_.size(); } @@ -221,6 +229,7 @@ class RowBatch { int IR_ALWAYS_INLINE capacity() const { return capacity_; } const RowDescriptor& row_desc() const { return row_desc_; } + int num_tuples_per_row() const { return num_tuples_per_row_; } /// Max memory that this row batch can accumulate before it is considered at capacity. /// This is a soft capacity: row batches may exceed the capacity, preferably only by a