From e7682e69443f667aa957bf8a64e6881b1f3247ee Mon Sep 17 00:00:00 2001 From: Tim Armstrong Date: Thu, 28 Jul 2016 23:18:24 -0700 Subject: [PATCH] IMPALA-3936: test for BBM bug --- be/src/runtime/buffered-block-mgr-test.cc | 63 +++++++++++++++++++++++++++++++ be/src/runtime/buffered-block-mgr.cc | 28 +++++++++++++- be/src/runtime/buffered-block-mgr.h | 6 +++ 3 files changed, 95 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index 90a63d6..f1456c5 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -771,6 +771,69 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) { TearDownMgrs(); } +// This exercises a code path where: +// 1. A block A is unpinned. +// 2. A block B is unpinned. +// 3. A write for block A is initiated. +// 4. Block A is pinned. +// 5. Block B is pinned, with block A passed in to be deleted. +// Block A's buffer will be transferred to block B. +// 6. The write for block A completes. +// Previously there was a bug where the buffer transfer happened before the write +// completed. +TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) { + const int trials = 10; + const int max_num_buffers = 2; + BufferedBlockMgr::Client* client; + RuntimeState* query_state; + BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_, + 1, false, client_tracker_.get(), &client, &query_state); + + // Force writes to be delayed to enlarge window of opportunity for bug. + block_mgr->set_debug_write_delay_ms(100); + + for (int trial = 0; trial < trials; ++trial) { + vector blocks; + AllocateBlocks(block_mgr, client, 2, &blocks); + + // Force the second block to be written and have its buffer freed. + // We only have one buffer to share between the first and second blocks now. + LOG(INFO) << ""; + LOG(INFO) << "Iter"; + LOG(INFO) << "Unpin " << blocks[1]; + ASSERT_OK(blocks[1]->Unpin()); + + // Create another client. Reserving different numbers of buffers can send it + // down different code paths because the original client is entitled to different + // number of buffers. + // int reserved_buffers = trial % 3; + int reserved_buffers = 2; + BufferedBlockMgr::Client* tmp_client; + EXPECT_TRUE(block_mgr->RegisterClient("tmp_client", reserved_buffers, false, + client_tracker_.get(), query_state, &tmp_client).ok()); + BufferedBlockMgr::Block* tmp_block; + ASSERT_OK(block_mgr->GetNewBlock(tmp_client, NULL, &tmp_block)); + LOG(INFO) << "Flushed " << blocks[1]; + + // Initiate the write, repin the bloc, then immediately try to swap the buffer to + // the second block while the write is still in flight. + LOG(INFO) << "Unpin " << blocks[0]; + ASSERT_OK(blocks[0]->Unpin()); + bool pinned; + ASSERT_OK(blocks[0]->Pin(&pinned)); + ASSERT_TRUE(pinned); + + LOG(INFO) << "About to Pin " << blocks[1]; + ASSERT_OK(blocks[1]->Pin(&pinned, blocks[0], false)); + ASSERT_TRUE(pinned); + LOG(INFO) << "Pinned " << blocks[1]; + + blocks[1]->Delete(); + tmp_block->Delete(); + block_mgr->ClearReservations(tmp_client); + } +} + // Test that all APIs return cancelled after close. TEST_F(BufferedBlockMgrTest, Close) { int max_num_buffers = 5; diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index a9e95cd..20a160a 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -221,7 +221,8 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr is_cancelled_(false), writes_issued_(0), encryption_(FLAGS_disk_spill_encryption), - check_integrity_(FLAGS_disk_spill_encryption) { + check_integrity_(FLAGS_disk_spill_encryption), + debug_write_delay_ms_(0); { } Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent, @@ -487,14 +488,21 @@ Status BufferedBlockMgr::TransferBuffer(Block* dst, Block* src, bool unpin) { DCHECK(src != NULL); unique_lock lock(lock_); - // First write out the src block. DCHECK(src->is_pinned_); DCHECK(!dst->is_pinned_); DCHECK(dst->buffer_desc_ == NULL); DCHECK_EQ(src->buffer_desc_->len, max_block_size_); + + LOG(INFO) << "TransferBuffer() " << src << " -> " << dst << " src->in_write_: " << src->in_write_; + + // Ensure that there aren't any writes in flight for 'src'. + while (src->in_write_) { + src->write_complete_cv_.wait(lock); + } src->is_pinned_ = false; if (unpin) { + // First write out the src block so we can grab its buffer. src->client_local_ = true; status = WriteUnpinnedBlock(src); if (!status.ok()) { @@ -624,8 +632,10 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo // Block was not evicted or had no data, nothing left to do. if (in_mem || block->valid_data_len_ == 0) { + LOG(INFO) << "PinBlock found buffer"; return DeleteOrUnpinBlock(release_block, unpin); } + LOG(INFO) << "PinBlock didn't find buffer"; if (!block->is_pinned_) { if (release_block == NULL) return Status::OK(); @@ -753,6 +763,7 @@ Status BufferedBlockMgr::WriteUnpinnedBlocks() { } Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) { + LOG(INFO) << "WriteUnpinnedBlock " << block; // Assumes block manager lock is already taken. DCHECK(!block->is_pinned_) << block->DebugString(); DCHECK(!block->in_write_) << block->DebugString(); @@ -833,12 +844,17 @@ Status BufferedBlockMgr::AllocateScratchSpace(int64_t block_size, } void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { +#ifndef NDEBUG + if (debug_write_delay_ms_) sleep(debug_write_delay_ms_ / 1.0); +#endif Status status = Status::OK(); lock_guard lock(lock_); + LOG(INFO) << "WriteComplete " << block; outstanding_writes_counter_->Add(-1); DCHECK(Validate()) << endl << DebugInternal(); DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not in write." << endl << block->DebugString(); + DCHECK(block->buffer_desc_ != NULL); if (!block->client_local_) { DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString(); --non_local_outstanding_writes_; @@ -909,6 +925,8 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { is_cancelled_ = true; buffer_available_cv_.notify_all(); } + + LOG(INFO) << "WriteComplete exiting " << block; } void BufferedBlockMgr::DeleteBlock(Block* block) { @@ -980,6 +998,7 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { *in_mem = false; unique_lock l(lock_); + LOG(INFO) << "Acquired lock in FindBufferForBlock"; if (is_cancelled_) return Status::CANCELLED; // First check if there is enough reserved memory to satisfy this request. @@ -1001,6 +1020,7 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { // only happens if the buffer has not already been allocated by the block mgr. // This check should ensure that the memory cannot be consumed by another client // of the block mgr. + LOG(INFO) << "HERE"; return Status::OK(); } @@ -1026,7 +1046,9 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { *in_mem = true; } else { BufferDescriptor* buffer_desc = NULL; + LOG(INFO) << "Calling FindBuffer"; RETURN_IF_ERROR(FindBuffer(l, &buffer_desc)); + LOG(INFO) << "Returned from FindBuffer"; if (buffer_desc == NULL) { // There are no free buffers or blocks we can evict. We need to fail this request. @@ -1093,6 +1115,7 @@ Status BufferedBlockMgr::FindBuffer(unique_lock& lock, return Status::OK(); } + LOG(INFO) << "HERE"; // Second, try to pick a buffer from the free list. if (free_io_buffers_.empty()) { // There are no free buffers. If spills are disabled or there no unpinned blocks we @@ -1113,6 +1136,7 @@ Status BufferedBlockMgr::FindBuffer(unique_lock& lock, // Third, this block needs to use a buffer that was unpinned from another block. // Get a free buffer from the front of the queue and assign it to the block. do { + LOG(INFO) << "HERE"; if (unpinned_blocks_.empty() && non_local_outstanding_writes_ == 0) { return Status::OK(); } diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h index 141943f..9a1cd8b 100644 --- a/be/src/runtime/buffered-block-mgr.h +++ b/be/src/runtime/buffered-block-mgr.h @@ -505,6 +505,8 @@ class BufferedBlockMgr { bool Validate() const; std::string DebugInternal() const; + void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; } + /// Size of the largest/default block in bytes. const int64_t max_block_size_; @@ -664,6 +666,10 @@ class BufferedBlockMgr { /// and hence no real reason to keep this separate from encryption. When true, blocks /// will have an integrity check (SHA-256) performed after being read from disk. const bool check_integrity_; + + /// Debug option to delay write completion. + int debug_write_delay_ms_; + }; // class BufferedBlockMgr } // namespace impala. -- 2.5.0