diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index ed20a68..e74e0d5 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -193,6 +193,11 @@ io.dropwizard.metrics metrics-core + + org.mockito + mockito-all + test + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 0349321..13e9b85 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -19,6 +19,9 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.IOException; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; @@ -50,8 +53,6 @@ import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. * Each put will be sharded into different buffer queues based on its destination region server. @@ -97,7 +98,18 @@ public class HTableMultiplexer { */ public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) throws IOException { - this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf); + this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); + } + + /** + * @param conn The HBase connection. + * @param conf The HBase configuration + * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for + * each region server before dropping the request. + */ + public HTableMultiplexer(Connection conn, Configuration conf, + int perRegionServerBufferQueueSize) { + this.conn = (ClusterConnection) conn; this.pool = HTable.getDefaultExecutor(conf); // how many times we could try in total, one more than retry number this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, @@ -117,6 +129,18 @@ public class HTableMultiplexer { } /** + * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already + * been closed. + * @throws IOException If there is an error closing the connection. + */ + @SuppressWarnings("deprecation") + public synchronized void close() throws IOException { + if (!getConnection().isClosed()) { + getConnection().close(); + } + } + + /** * The put request will be buffered by its corresponding buffer queue. Return false if the queue * is already full. * @param tableName @@ -170,13 +194,28 @@ public class HTableMultiplexer { * @return true if the request can be accepted by its corresponding buffer queue. */ public boolean put(final TableName tableName, final Put put, int maxAttempts) { + return _put(tableName, put, maxAttempts, false); + } + + /** + * Internal "put" which exposes a boolean flag to control whether or not the region location + * cache should be reloaded when trying to queue the {@link Put}. + * @param tableName Destination table for the Put + * @param put The Put to send + * @param maxAttempts Number of attempts to retry the {@code put} + * @param reloadCache Should the region location cache be reloaded + * @return true if the request was accepted in the queue, otherwise false + */ + boolean _put(final TableName tableName, final Put put, int maxAttempts, boolean reloadCache) { if (maxAttempts <= 0) { return false; } try { HTable.validatePut(put, maxKeyValueSize); - HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); + // Allow mocking to get at the connection, but don't expose the connection to users. + ClusterConnection conn = (ClusterConnection) getConnection(); + HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache); if (loc != null) { // Add the put pair into its corresponding queue. LinkedBlockingQueue queue = getQueue(loc); @@ -215,7 +254,8 @@ public class HTableMultiplexer { return new HTableMultiplexerStatus(serverToFlushWorkerMap); } - private LinkedBlockingQueue getQueue(HRegionLocation addr) { + @VisibleForTesting + LinkedBlockingQueue getQueue(HRegionLocation addr) { FlushWorker worker = serverToFlushWorkerMap.get(addr); if (worker == null) { synchronized (this.serverToFlushWorkerMap) { @@ -232,6 +272,11 @@ public class HTableMultiplexer { return worker.getQueue(); } + @VisibleForTesting + ClusterConnection getConnection() { + return this.conn; + } + /** * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. * report the number of buffered requests and the number of the failed (dropped) requests @@ -340,10 +385,11 @@ public class HTableMultiplexer { } } - private static class PutStatus { - private final HRegionInfo regionInfo; - private final Put put; - private final int maxAttempCount; + @VisibleForTesting + static class PutStatus { + final HRegionInfo regionInfo; + final Put put; + final int maxAttempCount; public PutStatus(HRegionInfo regionInfo, Put put, int maxAttempCount) { this.regionInfo = regionInfo; @@ -392,7 +438,8 @@ public class HTableMultiplexer { } } - private static class FlushWorker implements Runnable { + @VisibleForTesting + static class FlushWorker implements Runnable { private final HRegionLocation addr; private final LinkedBlockingQueue queue; private final HTableMultiplexer multiplexer; @@ -440,7 +487,7 @@ public class HTableMultiplexer { return this.maxLatency.getAndSet(0); } - private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { + boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { // Decrease the retry count final int retryCount = ps.maxAttempCount - 1; @@ -449,10 +496,10 @@ public class HTableMultiplexer { return false; } - int cnt = retryInQueue.incrementAndGet(); - if (cnt > maxRetryInQueue) { + int cnt = getRetryInQueue().incrementAndGet(); + if (cnt > getMaxRetryInQueue()) { // Too many Puts in queue for resubmit, give up this - retryInQueue.decrementAndGet(); + getRetryInQueue().decrementAndGet(); return false; } @@ -460,22 +507,21 @@ public class HTableMultiplexer { // The currentPut is failed. So get the table name for the currentPut. final TableName tableName = ps.regionInfo.getTable(); - long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod, - multiplexer.maxAttempts - retryCount - 1); + long delayMs = getNextDelay(retryCount); if (LOG.isDebugEnabled()) { LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); } - executor.schedule(new Runnable() { + getExecutor().schedule(new Runnable() { @Override public void run() { boolean succ = false; try { - succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount); + succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true); } finally { - FlushWorker.this.retryInQueue.decrementAndGet(); + FlushWorker.this.getRetryInQueue().decrementAndGet(); if (!succ) { - FlushWorker.this.totalFailedPutCount.incrementAndGet(); + FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); } } } @@ -483,6 +529,37 @@ public class HTableMultiplexer { return true; } + @VisibleForTesting + long getNextDelay(int retryCount) { + return ConnectionUtils.getPauseTime(multiplexer.flushPeriod, + multiplexer.maxAttempts - retryCount - 1); + } + + @VisibleForTesting + AtomicInteger getRetryInQueue() { + return this.retryInQueue; + } + + @VisibleForTesting + int getMaxRetryInQueue() { + return this.maxRetryInQueue; + } + + @VisibleForTesting + AtomicLong getTotalFailedPutCount() { + return this.totalFailedPutCount; + } + + @VisibleForTesting + HTableMultiplexer getMultiplexer() { + return this.multiplexer; + } + + @VisibleForTesting + ScheduledExecutorService getExecutor() { + return this.executor; + } + @Override public void run() { int failedCount = 0; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java new file mode 100644 index 0000000..8e0b9a7 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker; +import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Category(SmallTests.class) +public class TestHTableMultiplexerViaMocks { + + private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; + private HTableMultiplexer mockMultiplexer; + private ClusterConnection mockConnection; + private HRegionLocation mockRegionLocation; + private HRegionInfo mockRegionInfo; + + private TableName tableName; + private Put put; + + @Before + public void setupTest() { + mockMultiplexer = mock(HTableMultiplexer.class); + mockConnection = mock(ClusterConnection.class); + mockRegionLocation = mock(HRegionLocation.class); + mockRegionInfo = mock(HRegionInfo.class); + + tableName = TableName.valueOf("my_table"); + put = new Put(getBytes("row1")); + put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11")); + put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12")); + put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21")); + + // Call the real put(TableName, Put, int) method + when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod(); + + // Return the mocked ClusterConnection + when(mockMultiplexer.getConnection()).thenReturn(mockConnection); + + // Return the regionInfo from the region location + when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo); + + // Make sure this RegionInfo points to our table + when(mockRegionInfo.getTable()).thenReturn(tableName); + } + + @Test public void useCacheOnInitialPut() throws Exception { + mockMultiplexer.put(tableName, put, NUM_RETRIES); + + verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false); + } + + @Test public void nonNullLocationQueuesPut() throws Exception { + final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + + // Call the real method for _put(TableName, Put, int, boolean) + when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod(); + + // Return a region location + when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation); + when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue); + + assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES)); + + assertEquals(1, queue.size()); + final PutStatus ps = queue.take(); + assertEquals(put, ps.put); + assertEquals(mockRegionInfo, ps.regionInfo); + } + + @Test public void ignoreCacheOnRetriedPut() throws Exception { + FlushWorker mockFlushWorker = mock(FlushWorker.class); + ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class); + final AtomicInteger retryInQueue = new AtomicInteger(0); + final AtomicLong totalFailedPuts = new AtomicLong(0L); + final int maxRetryInQueue = 20; + final long delay = 100L; + + final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES); + + // Call the real resubmitFailedPut(PutStatus, HRegionLocation) method + when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod(); + // Succeed on the re-submit without caching + when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true); + + // Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation) + when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor); + when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay); + when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer); + when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue); + when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue); + when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts); + + // When a Runnable is scheduled, run that Runnable + when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer( + new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + // Before we run this, should have one retry in progress. + assertEquals(1L, retryInQueue.get()); + + Object[] args = invocation.getArguments(); + assertEquals(3, args.length); + assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable); + Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }); + + // The put should be rescheduled + assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation)); + + verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true); + assertEquals(0L, totalFailedPuts.get()); + // Net result should be zero (added one before rerunning, subtracted one after running). + assertEquals(0L, retryInQueue.get()); + } + + @SuppressWarnings("deprecation") + @Test public void testConnectionClosing() throws IOException { + doCallRealMethod().when(mockMultiplexer).close(); + // If the connection is not closed + when(mockConnection.isClosed()).thenReturn(false); + + mockMultiplexer.close(); + + // We should close it + verify(mockConnection).close(); + } + + @SuppressWarnings("deprecation") + @Test public void testClosingAlreadyClosedConnection() throws IOException { + doCallRealMethod().when(mockMultiplexer).close(); + // If the connection is already closed + when(mockConnection.isClosed()).thenReturn(true); + + mockMultiplexer.close(); + + // We should not close it again + verify(mockConnection, times(0)).close(); + } + + /** + * @return UTF-8 byte representation for {@code str} + */ + private static byte[] getBytes(String str) { + return str.getBytes(UTF_8); + } +} +