From 73ac73ba1f1d0cc23d61055b78b9f7810ffc7789 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 22 Oct 2015 20:24:48 -0700 Subject: [PATCH] HBASE-14683 Fix Batching in buffered mutator is awful when adding lists of mutations. Summary: Send the list of mutations to AsyncProcess only after done adding the list otherwise there's a lot of contention Test Plan: UnitTests. Differential Revision: https://reviews.facebook.net/D49251 --- .../hadoop/hbase/client/BufferedMutatorImpl.java | 48 ++++++++-------------- 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 0b222b1..07f8066 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -98,48 +99,31 @@ public class BufferedMutatorImpl implements BufferedMutator { @Override public synchronized void mutate(Mutation m) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - doMutate(m); + mutate(Arrays.asList(m)); } @Override public synchronized void mutate(List ms) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - for (Mutation m : ms) { - doMutate(m); - } - } - - /** - * Add the put to the buffer. If the buffer is already too large, sends the buffer to the - * cluster. - * - * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. - * @throws InterruptedIOException if we were interrupted. - */ - private void doMutate(Mutation m) throws InterruptedIOException, - RetriesExhaustedWithDetailsException { if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } - if (!(m instanceof Put) && !(m instanceof Delete)) { - throw new IllegalArgumentException("Pass a Delete or a Put"); - } - - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - writeAsyncBuffer.add(m); - backgroundFlushCommits(true); - } - if (m instanceof Put) { - validatePut((Put) m); + for (Mutation m : ms) { + // This behavior is highly non-intuitive... it does not protect us against + // 94-incompatible behavior, which is a timing issue because hasError, the below code + // and setter of hasError are not synchronized. Perhaps it should be removed. + if (ap.hasError()) { + writeAsyncBuffer.add(m); + backgroundFlushCommits(true); + } else { + if (m instanceof Put) { + validatePut((Put) m); + } + writeAsyncBuffer.add(m); + currentWriteBufferSize += m.heapSize(); + } } - - currentWriteBufferSize += m.heapSize(); - writeAsyncBuffer.add(m); - while (currentWriteBufferSize > writeBufferSize) { backgroundFlushCommits(false); } -- 2.6.1