diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 50103ba..7f5b15c 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -98,6 +98,8 @@ class DataStreamRecvr::SenderQueue {
   // is retrieved.
   scoped_ptr<RowBatch> current_batch_;
 
+  int queue_num_buffered_bytes {0};
+
   // Set to true when the first batch has been received
   bool received_first_batch_;
 };
@@ -138,7 +140,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
 
   DCHECK(!batch_queue_.empty());
   RowBatch* result = batch_queue_.front().second;
-  recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
+  queue_num_buffered_bytes -= batch_queue_.front().first;
   VLOG_ROW << "fetched #rows=" << result->num_rows();
   batch_queue_.pop_front();
   data_removal__cv_.notify_one();
@@ -162,10 +164,10 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
   // received from a specific queue based on data order, and the pipeline will stall
   // if the merger is waiting for data from an empty queue that cannot be filled because
   // the limit has been reached.
-  while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && !is_cancelled_) {
+  while (!batch_queue_.empty() && recvr_->ExceedsLimit(queue_num_buffered_bytes, batch_size) && !is_cancelled_) {
     CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_total_timer_, &is_cancelled_);
     VLOG_ROW << " wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
-             << " #buffered=" << recvr_->num_buffered_bytes_.Load()
+             << " #buffered=" << queue_num_buffered_bytes
              << " batch_size=" << batch_size << "\n";
 
     // We only want one thread running the timer at any one time. Only
@@ -213,7 +215,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
     VLOG_ROW << "added #rows=" << batch->num_rows()
              << " batch_size=" << batch_size << "\n";
     batch_queue_.push_back(make_pair(batch_size, batch));
-    recvr_->num_buffered_bytes_.Add(batch_size);
+    queue_num_buffered_bytes += batch_size;
     data_arrival_cv_.notify_one();
   }
 }
@@ -293,13 +295,13 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
     total_buffer_limit_(total_buffer_limit),
     row_desc_(row_desc),
     is_merging_(is_merging),
-    num_buffered_bytes_(0),
     profile_(profile) {
   mem_tracker_.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker));
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
   int num_sender_per_queue = is_merging ? 1 : num_senders;
+  queue_buffer_limit = total_buffer_limit_ / num_sender_per_queue;
   for (int i = 0; i < num_queues; ++i) {
     SenderQueue* queue = sender_queue_pool_.Add(
         new SenderQueue(this, num_sender_per_queue, profile));
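Note: below is a minimal standalone sketch of the per-queue accounting the .cc changes above introduce, assuming (as in the patched AddBatch/GetBatch) that each queue's byte counter is only touched while that queue's lock is held, so a plain int can replace the shared AtomicInt32. All names are illustrative stand-ins, not the actual Impala classes.

#include <deque>
#include <mutex>
#include <utility>

// Sketch: each queue tracks its own buffered bytes and checks them against a
// per-queue limit, instead of all queues sharing one atomic counter compared
// against the receiver-wide limit.
class SenderQueueSketch {
 public:
  explicit SenderQueueSketch(int queue_buffer_limit)
    : queue_buffer_limit_(queue_buffer_limit) {}

  // Rejects the batch if adding 'batch_size' bytes would exceed this queue's
  // share of the buffer budget (the patched AddBatch waits for space on a
  // condition variable instead of rejecting).
  bool TryAdd(int batch_size, int payload) {
    std::lock_guard<std::mutex> l(lock_);
    if (num_buffered_bytes_ + batch_size > queue_buffer_limit_) return false;
    batch_queue_.emplace_back(batch_size, payload);
    num_buffered_bytes_ += batch_size;  // plain int: guarded by lock_
    return true;
  }

  bool TryGet(int* payload) {
    std::lock_guard<std::mutex> l(lock_);
    if (batch_queue_.empty()) return false;
    num_buffered_bytes_ -= batch_queue_.front().first;
    *payload = batch_queue_.front().second;
    batch_queue_.pop_front();
    return true;
  }

 private:
  std::mutex lock_;
  std::deque<std::pair<int, int>> batch_queue_;  // (batch size in bytes, payload)
  int num_buffered_bytes_ = 0;
  const int queue_buffer_limit_;
};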
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index 07e9224..9cf5391 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -115,8 +115,8 @@ class DataStreamRecvr {
 
   /// Return true if the addition of a new batch of size 'batch_size' would exceed the
   /// total buffer limit.
-  bool ExceedsLimit(int batch_size) {
-    return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
+  bool ExceedsLimit(int& queue_num_buffered_bytes, int batch_size) {
+    return queue_num_buffered_bytes + batch_size > queue_buffer_limit;
   }
 
   /// DataStreamMgr instance used to create this recvr. (Not owned)
@@ -131,6 +131,7 @@ class DataStreamRecvr {
   /// exceeds this value
   int total_buffer_limit_;
+  int queue_buffer_limit;
 
   /// Row schema, copied from the caller of CreateRecvr().
   RowDescriptor row_desc_;
 
@@ -138,9 +139,6 @@ class DataStreamRecvr {
   /// row batch queues are maintained in this case.
   bool is_merging_;
 
-  /// total number of bytes held across all sender queues.
-  AtomicInt32 num_buffered_bytes_;
-
   /// Memtracker for batches in the sender queue(s).
   boost::scoped_ptr<MemTracker> mem_tracker_;
 
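For the constructor change and the new queue_buffer_limit member, the budget split is a single division; a hedged sketch of that arithmetic with stand-in names (only the formula mirrors the patch):

// Per-queue budget as computed in the DataStreamRecvr constructor hunk above:
// merging receivers create one single-sender queue per sender, non-merging
// receivers create one queue fed by every sender.
int PerQueueBufferLimit(int total_buffer_limit, int num_senders, bool is_merging) {
  int num_sender_per_queue = is_merging ? 1 : num_senders;
  return total_buffer_limit / num_sender_per_queue;
}

Under this formula, with a 10 MB total_buffer_limit and 4 senders, a merging receiver allows each of its 4 queues the full 10 MB, while a non-merging receiver caps its single shared queue at 2.5 MB.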