From 26dbd2d43e1a7d5fcdbb65c6fda0d4c157871e02 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 22 Oct 2015 20:24:48 -0700 Subject: [PATCH] HBASE-14683 Fix Batching in buffered mutator is awful when adding lists of mutations. Summary: Send the list of mutations to AsyncProcess only after done adding the list otherwise there's a lot of contention Test Plan: UnitTests. Differential Revision: https://reviews.facebook.net/D49251 --- .../hadoop/hbase/client/BufferedMutatorImpl.java | 47 ++++++++-------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 0b222b1..42c123d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -98,48 +99,34 @@ public class BufferedMutatorImpl implements BufferedMutator { @Override public synchronized void mutate(Mutation m) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - doMutate(m); + mutate(Arrays.asList(m)); } @Override public synchronized void mutate(List ms) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - for (Mutation m : ms) { - doMutate(m); - } - } - - /** - * Add the put to the buffer. If the buffer is already too large, sends the buffer to the - * cluster. - * - * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. - * @throws InterruptedIOException if we were interrupted. - */ - private void doMutate(Mutation m) throws InterruptedIOException, - RetriesExhaustedWithDetailsException { if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } - if (!(m instanceof Put) && !(m instanceof Delete)) { - throw new IllegalArgumentException("Pass a Delete or a Put"); - } - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - writeAsyncBuffer.add(m); - backgroundFlushCommits(true); - } + for (Mutation m : ms) { + if (m instanceof Put) { + validatePut((Put) m); + } - if (m instanceof Put) { - validatePut((Put) m); + // This behavior is highly non-intuitive... it does not protect us against + // 94-incompatible behavior, which is a timing issue because hasError, the below code + // and setter of hasError are not synchronized. Perhaps it should be removed. + if (ap.hasError()) { + writeAsyncBuffer.add(m); + backgroundFlushCommits(true); + } else { + writeAsyncBuffer.add(m); + currentWriteBufferSize += m.heapSize(); + } } - currentWriteBufferSize += m.heapSize(); - writeAsyncBuffer.add(m); - + // Now try and queue what needs to be queued. while (currentWriteBufferSize > writeBufferSize) { backgroundFlushCommits(false); } -- 2.6.1