From fe1f7627b8992b3efad16d320b0a15c2d4d493b1 Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Mon, 26 Oct 2015 14:47:36 -0700
Subject: [PATCH] Testing unlock

Summary: Use concurrent collections and atomic longs to keep track of edits in
BufferedMutator. This keeps BufferedMutator thread safe, while threads sharing
a buffered mutator no longer contend on a single lock.

Test Plan: Unit Tests.

Differential Revision: https://reviews.facebook.net/D49467
---
 .../hadoop/hbase/client/BufferedMutatorImpl.java   | 140 ++++++++++++++++-----
 .../hadoop/hbase/client/TestAsyncProcess.java      |  30 +----
 2 files changed, 110 insertions(+), 60 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 5341d47..afce1c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -30,8 +30,12 @@ import java.io.InterruptedIOException;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  *
@@ -55,13 +59,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
   protected ClusterConnection connection; // non-final so can be overridden in test
   private final TableName tableName;
   private volatile Configuration conf;
+
   @VisibleForTesting
-  List<Row> writeAsyncBuffer = new LinkedList<>();
+  ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
+  protected AtomicLong currentWriteBufferSize = new AtomicLong(0);
+  ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();
+
   private long writeBufferSize;
   private final int maxKeyValueSize;
-  protected long currentWriteBufferSize = 0;
   private boolean closed = false;
   private final ExecutorService pool;
+
+  @VisibleForTesting
   protected AsyncProcess ap; // non-final so can be overridden in test
 
   BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
@@ -97,39 +106,72 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   @Override
-  public synchronized void mutate(Mutation m) throws InterruptedIOException,
+  public void mutate(Mutation m) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
     mutate(Arrays.asList(m));
   }
 
   @Override
-  public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
+  public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
-    if (closed) {
-      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
-    }
-    for (Mutation m : ms) {
-      if (m instanceof Put) {
-        validatePut((Put) m);
+    // For the most part this will only get the read lock.
+    // That should allow lots of different threads to add mutations at the same time.
+    Lock readLock = bufferLock.readLock();
+    readLock.lock();
+
+    Lock writeLock = null;
+    try {
+      if (closed) {
+        throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
       }
-      currentWriteBufferSize += m.heapSize();
-    }
+
+
+      long toAddSize = 0;
+      for (Mutation m : ms) {
+        if (m instanceof Put) {
+          validatePut((Put) m);
+        }
+        toAddSize += m.heapSize();
+      }
+
     // This behavior is highly non-intuitive... it does not protect us against
     // 94-incompatible behavior, which is a timing issue because hasError, the below code
     // and setter of hasError are not synchronized. Perhaps it should be removed.
     if (ap.hasError()) {
+      currentWriteBufferSize.addAndGet(toAddSize);
       writeAsyncBuffer.addAll(ms);
       backgroundFlushCommits(true);
     } else {
+      currentWriteBufferSize.addAndGet(toAddSize);
       writeAsyncBuffer.addAll(ms);
     }
+    boolean alreadyFlushed = false;
+    // Now try and queue what needs to be queued.
+    while (currentWriteBufferSize.get() > writeBufferSize) {
+      // If other threads keep adding so much that this thread is stuck grab write lock.
+      // This will block others but it should force them to stop adding.
+      if (alreadyFlushed && writeLock == null) {
+        readLock.unlock();
+        readLock = null;
 
-    // Now try and queue what needs to be queued.
-    while (currentWriteBufferSize > writeBufferSize) {
-      backgroundFlushCommits(false);
+
+        writeLock = bufferLock.writeLock();
+        writeLock.lock();
+      }
+
+      alreadyFlushed = true;
+      backgroundFlushCommits(false);
+    }
+
+    } finally {
+      if (readLock != null) {
+        readLock.unlock();
+      }
+      if (writeLock != null) {
+        writeLock.unlock();
+      }
     }
   }
@@ -139,11 +181,14 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   @Override
-  public synchronized void close() throws IOException {
-    if (this.closed) {
-      return;
-    }
-    try {
+  public void close() throws IOException {
+    Lock lock = bufferLock.writeLock();
+    lock.lock();
+
+    try {
+      if (this.closed) {
+        return;
+      }
       // As we can have an operation in progress even if the buffer is empty, we call
       // backgroundFlushCommits at least one time.
       backgroundFlushCommits(true);
@@ -159,19 +204,29 @@
           break;
         }
       } while (!terminated);
+
     } catch (InterruptedException e) {
       LOG.warn("waitForTermination interrupted");
+
     } finally {
       this.closed = true;
+      lock.unlock();
     }
   }
 
   @Override
-  public synchronized void flush() throws InterruptedIOException,
+  public void flush() throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
-    // As we can have an operation in progress even if the buffer is empty, we call
-    // backgroundFlushCommits at least one time.
-    backgroundFlushCommits(true);
+    Lock lock = bufferLock.writeLock();
+    lock.lock();
+
+    try {
+      // As we can have an operation in progress even if the buffer is empty, we call
+      // backgroundFlushCommits at least one time.
+      backgroundFlushCommits(true);
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -184,17 +239,38 @@
    */
   private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
+
+    LinkedList<Mutation> buffer = new LinkedList<>();
+    // Keep track of the size so that this thread doesn't spin forever
+    long dequeuedSize = 0;
+
     try {
+      // Grab all of the available mutations.
+      Mutation m = null;
+
+      while (
+          (writeBufferSize <= 0 || dequeuedSize < writeBufferSize)
+          && (m = writeAsyncBuffer.poll()) != null) {
+        buffer.add(m);
+        long size = m.heapSize();
+        dequeuedSize += size;
+        currentWriteBufferSize.addAndGet(-size);
+      }
+
+      if (!synchronous && dequeuedSize == 0) {
+        return;
+      }
+
       if (!synchronous) {
-        ap.submit(tableName, writeAsyncBuffer, true, null, false);
+        ap.submit(tableName, buffer, true, null, false);
         if (ap.hasError()) {
           LOG.debug(tableName + ": One or more of the operations have failed -"
               + " waiting for all operation in progress to finish (successfully or not)");
         }
       }
       if (synchronous || ap.hasError()) {
-        while (!writeAsyncBuffer.isEmpty()) {
-          ap.submit(tableName, writeAsyncBuffer, true, null, false);
+        while (!buffer.isEmpty()) {
+          ap.submit(tableName, buffer, true, null, false);
         }
         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
         if (error != null) {
@@ -206,11 +282,11 @@
           }
         }
     } finally {
-      currentWriteBufferSize = 0;
-      for (Row mut : writeAsyncBuffer) {
-        if (mut instanceof Mutation) {
-          currentWriteBufferSize += ((Mutation) mut).heapSize();
-        }
+      for (Mutation mut : buffer) {
+        long size = mut.heapSize();
+        currentWriteBufferSize.addAndGet(size);
+        dequeuedSize -= size;
+        writeAsyncBuffer.add(mut);
       }
     }
   }
@@ -224,7 +300,7 @@
   public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
       InterruptedIOException {
     this.writeBufferSize = writeBufferSize;
-    if (currentWriteBufferSize > writeBufferSize) {
+    if (currentWriteBufferSize.get() > writeBufferSize) {
       flush();
     }
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index a20ca4f..f616e35 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -698,7 +698,7 @@
 
     Put put = createPut(1, false);
 
-    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
+    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
     try {
       ht.put(put);
       if (bufferOn) {
@@ -707,7 +707,7 @@
       Assert.fail();
     } catch (RetriesExhaustedException expected) {
     }
-    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
+    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
     // The table should have sent one request, maybe after multiple attempts
     AsyncRequestFuture ars = null;
     for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -760,32 +760,6 @@
     Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size());
   }
 
-
-/*
-  @Test
-  public void testWithNoClearOnFail() throws IOException {
-    HTable ht = new HTable();
-    ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
-    ht.setAutoFlushTo(false);
-
-    Put p = createPut(1, false);
-    ht.put(p);
-    Assert.assertEquals(0, ht.writeAsyncBuffer.size());
-
-    try {
-      ht.flushCommits();
-    } catch (RetriesExhaustedWithDetailsException expected) {
-    }
-    Assert.assertEquals(1, ht.writeAsyncBuffer.size());
-
-    try {
-      ht.close();
-    } catch (RetriesExhaustedWithDetailsException expected) {
-    }
-    Assert.assertEquals(1, ht.writeAsyncBuffer.size());
-  }
-*/
-
   @Test
   public void testBatch() throws IOException,
       InterruptedException {
     ClusterConnection conn = new MyConnectionImpl(conf);
-- 
2.6.1
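
For readers skimming the patch, here is a standalone sketch (not part of the change; the class and method names below are invented for illustration) of the locking pattern it adopts: callers that only append share the read lock and update a ConcurrentLinkedQueue plus an AtomicLong size counter, while an explicit flush takes the write lock so no appends are in flight.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example class; it mirrors the pattern, not the HBase API.
public class SharedWriteBuffer {
  private final ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();
  private final AtomicLong currentSize = new AtomicLong(0);
  private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();
  private final long maxSize;

  public SharedWriteBuffer(long maxSize) {
    this.maxSize = maxSize;
  }

  // Many threads can append at once; they only share the read lock.
  public void add(String item) {
    Lock readLock = bufferLock.readLock();
    readLock.lock();
    try {
      buffer.add(item);
      currentSize.addAndGet(item.length());
      if (currentSize.get() > maxSize) {
        drain(); // opportunistic flush; needs no exclusive lock
      }
    } finally {
      readLock.unlock();
    }
  }

  // An explicit flush takes the write lock, so no add() is in flight while it runs.
  public void flush() {
    Lock writeLock = bufferLock.writeLock();
    writeLock.lock();
    try {
      drain();
    } finally {
      writeLock.unlock();
    }
  }

  // Dequeue whatever is buffered right now and "send" it.
  private void drain() {
    List<String> batch = new ArrayList<>();
    String item;
    while ((item = buffer.poll()) != null) {
      currentSize.addAndGet(-item.length());
      batch.add(item);
    }
    System.out.println("flushing " + batch.size() + " items");
  }
}

As in BufferedMutatorImpl above, the queue and the counter are each safe for concurrent use on their own; the read/write lock only arbitrates between many concurrent mutate() callers and the exclusive flush()/close() paths.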