From 748fbdddb2a00c8de4a9d3678572e9483bc4cf8e Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 23 Jan 2019 21:26:56 +0800 Subject: [PATCH] HBASE-21761 Align the methods in RegionLocator and AsyncTableRegionLocator --- .../hbase/client/AsyncConnectionImpl.java | 2 +- .../hbase/client/AsyncTableRegionLocator.java | 74 +++++++- .../client/AsyncTableRegionLocatorImpl.java | 89 +++------- .../hadoop/hbase/client/RegionLocator.java | 3 + .../apache/hadoop/hbase/util/FutureUtils.java | 33 ++-- .../client/AbstractTestRegionLocator.java | 165 ++++++++++++++++++ .../client/TestAsyncTableRegionLocator.java | 116 ++++++------ .../hbase/client/TestRegionLocator.java | 133 +++----------- 8 files changed, 361 insertions(+), 254 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 3cbd950370..4f6f083c82 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -149,7 +149,7 @@ class AsyncConnectionImpl implements AsyncConnection { @Override public AsyncTableRegionLocator getRegionLocator(TableName tableName) { - return new AsyncTableRegionLocatorImpl(tableName, locator); + return new AsyncTableRegionLocatorImpl(tableName, this); } // we will override this method for testing retry caller, so do not remove this method. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java index f67204a547..321f44e87b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; /** @@ -56,7 +59,7 @@ public interface AsyncTableRegionLocator { * @param reload true to reload information or false to use cached information */ default CompletableFuture getRegionLocation(byte[] row, boolean reload) { - return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload); + return getRegionLocation(row, RegionInfo.DEFAULT_REPLICA_ID, reload); } /** @@ -82,9 +85,78 @@ public interface AsyncTableRegionLocator { */ CompletableFuture getRegionLocation(byte[] row, int replicaId, boolean reload); + /** + * Find all the replicas for the region on which the given row is being served. + * @param row Row to find. + * @return Locations for all the replicas of the row. + * @throws IOException if a remote or network exception occurs + */ + default CompletableFuture> getRegionLocations(byte[] row) { + return getRegionLocations(row, false); + } + + /** + * Find all the replicas for the region on which the given row is being served. + * @param row Row to find. + * @param reload true to reload information or false to use cached information + * @return Locations for all the replicas of the row. + * @throws IOException if a remote or network exception occurs + */ + CompletableFuture> getRegionLocations(byte[] row, boolean reload); + /** * Retrieves all of the regions associated with this table. + *

+ * Usually we will go to meta table directly in this method so there is no {@code reload} + * parameter. + *

+ * Notice that the location for region replicas other than the default replica are also returned. * @return a {@link List} of all regions associated with this table. */ CompletableFuture> getAllRegionLocations(); + + /** + * Gets the starting row key for every region in the currently open table. + *

+ * This is mainly useful for the MapReduce integration. + * @return Array of region starting row keys + * @throws IOException if a remote or network exception occurs + */ + default CompletableFuture> getStartKeys() throws IOException { + return getStartEndKeys().thenApply( + startEndKeys -> startEndKeys.stream().map(Pair::getFirst).collect(Collectors.toList())); + } + + /** + * Gets the ending row key for every region in the currently open table. + *

+ * This is mainly useful for the MapReduce integration. + * @return Array of region ending row keys + * @throws IOException if a remote or network exception occurs + */ + default CompletableFuture> getEndKeys() throws IOException { + return getStartEndKeys().thenApply( + startEndKeys -> startEndKeys.stream().map(Pair::getSecond).collect(Collectors.toList())); + } + + /** + * Gets the starting and ending row keys for every region in the currently open table. + *

+ * This is mainly useful for the MapReduce integration. + * @return Pair of arrays of region starting and ending row keys + * @throws IOException if a remote or network exception occurs + */ + default CompletableFuture>> getStartEndKeys() throws IOException { + return getAllRegionLocations().thenApply( + locs -> locs.stream().filter(loc -> RegionReplicaUtil.isDefaultReplica(loc.getRegion())) + .map(HRegionLocation::getRegion).map(r -> Pair.newPair(r.getStartKey(), r.getEndKey())) + .collect(Collectors.toList())); + } + + /** + * Clear all the entries in the region location cache. + *

+ * This may cause performance issue so use it with caution. + */ + void clearRegionLocationCache(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index 606ee7acef..145e975d83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -17,20 +17,15 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - /** * The implementation of AsyncRegionLocator. */ @@ -39,11 +34,11 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { private final TableName tableName; - private final AsyncRegionLocator locator; + private final AsyncConnectionImpl conn; - public AsyncTableRegionLocatorImpl(TableName tableName, AsyncRegionLocator locator) { + public AsyncTableRegionLocatorImpl(TableName tableName, AsyncConnectionImpl conn) { this.tableName = tableName; - this.locator = locator; + this.conn = conn; } @Override @@ -54,67 +49,29 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { @Override public CompletableFuture getRegionLocation(byte[] row, int replicaId, boolean reload) { - return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload, - -1L); + return conn.getLocator().getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, + reload, -1L); } - // this is used to prevent stack overflow if there are thousands of regions for the table. If the - // location is in cache, the CompletableFuture will be completed immediately inside the same - // thread, and then in the action we will call locate again, also in the same thread. If all the - // locations are in cache, and we do not use whenCompleteAsync to break the tie, the stack will be - // very very deep and cause stack overflow. - @VisibleForTesting - static final ThreadLocal STACK_DEPTH = new ThreadLocal() { - - @Override - protected MutableInt initialValue() { - return new MutableInt(0); + @Override + public CompletableFuture> getAllRegionLocations() { + if (TableName.isMetaTableName(tableName)) { + return conn.registry.getMetaRegionLocation() + .thenApply(locs -> Arrays.asList(locs.getRegionLocations())); } - }; - - @VisibleForTesting - static final int MAX_STACK_DEPTH = 16; + return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), + Optional.of(tableName)); + } - private void locate(CompletableFuture> future, - ConcurrentLinkedQueue result, byte[] row) { - BiConsumer listener = (loc, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - result.add(loc); - if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) { - future.complete(result.stream() - .sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion())) - .collect(Collectors.toList())); - } else { - locate(future, result, loc.getRegion().getStartKey()); - } - }; - MutableInt depth = STACK_DEPTH.get(); - boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH; - try { - CompletableFuture f = - locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L); - if (async) { - FutureUtils.addListenerAsync(f, listener); - } else { - FutureUtils.addListener(f, listener); - } - } finally { - if (depth.decrementAndGet() == 0) { - STACK_DEPTH.remove(); - } - } + @Override + public CompletableFuture> getRegionLocations(byte[] row, boolean reload) { + return conn.getLocator() + .getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L) + .thenApply(locs -> Arrays.asList(locs.getRegionLocations())); } @Override - public CompletableFuture> getAllRegionLocations() { - ConcurrentLinkedQueue result = new ConcurrentLinkedQueue<>(); - CompletableFuture> future = new CompletableFuture<>(); - // start from end to start, as when locating we will do reverse scan, so we will prefetch the - // location of the regions before the current one. - locate(future, result, HConstants.EMPTY_END_ROW); - return future; + public void clearRegionLocationCache() { + conn.getLocator().clearCache(tableName); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java index fbea0f5711..c7440c6704 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java @@ -108,6 +108,9 @@ public interface RegionLocator extends Closeable { /** * Retrieves all of the regions associated with this table. *

+ * Usually we will go to meta table directly in this method so there is no {@code reload} + * parameter. + *

* Notice that the location for region replicas other than the default replica are also returned. * @return a {@link List} of all regions associated with this table. * @throws IOException if a remote or network exception occurs diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 6c3e026753..02ce655ed2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -17,12 +17,18 @@ */ package org.apache.hadoop.hbase.util; +import java.io.IOException; +import java.io.InterruptedIOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.function.BiConsumer; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + /** * Helper class for processing futures. */ @@ -59,21 +65,18 @@ public final class FutureUtils { } /** - * Almost the same with the {@link #addListener(CompletableFuture, BiConsumer)} method above, the - * difference is that in this method we will call - * {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of - * {@link CompletableFuture#whenComplete(BiConsumer)}. - * @see #addListener(CompletableFuture, BiConsumer) + * A helper class for getting the result of a Future, and convert the error to an + * {@link IOException}. */ - @SuppressWarnings("FutureReturnValueIgnored") - public static void addListenerAsync(CompletableFuture future, - BiConsumer action) { - future.whenCompleteAsync((resp, error) -> { - try { - action.accept(resp, error); - } catch (Throwable t) { - LOG.error("Unexpected error caught when processing CompletableFuture", t); - } - }); + public static T get(Future future) throws IOException { + try { + return future.get(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, IOException.class); + throw new IOException(cause); + } } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java new file mode 100644 index 0000000000..b21c33f99b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.Test; + +public abstract class AbstractTestRegionLocator { + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + protected static TableName TABLE_NAME = TableName.valueOf("Locator"); + + protected static byte[] FAMILY = Bytes.toBytes("family"); + + protected static int REGION_REPLICATION = 3; + + protected static byte[][] SPLIT_KEYS; + + protected static void startClusterAndCreateTable() throws Exception { + UTIL.startMiniCluster(3); + TableDescriptor td = + TableDescriptorBuilder.newBuilder(TABLE_NAME).setRegionReplication(REGION_REPLICATION) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); + SPLIT_KEYS = new byte[9][]; + for (int i = 0; i < 9; i++) { + SPLIT_KEYS[i] = Bytes.toBytes(Integer.toString(i + 1)); + } + UTIL.getAdmin().createTable(td, SPLIT_KEYS); + UTIL.waitTableAvailable(TABLE_NAME); + UTIL.getAdmin().balancerSwitch(false, true); + } + + @After + public void tearDownAfterTest() throws IOException { + clearCache(); + } + + private byte[] getStartKey(int index) { + return index == 0 ? HConstants.EMPTY_START_ROW : SPLIT_KEYS[index - 1]; + } + + private byte[] getEndKey(int index) { + return index == SPLIT_KEYS.length ? HConstants.EMPTY_END_ROW : SPLIT_KEYS[index]; + } + + private void assertStartKeys(byte[][] startKeys) { + assertEquals(SPLIT_KEYS.length + 1, startKeys.length); + for (int i = 0; i < startKeys.length; i++) { + assertArrayEquals(getStartKey(i), startKeys[i]); + } + } + + private void assertEndKeys(byte[][] endKeys) { + assertEquals(SPLIT_KEYS.length + 1, endKeys.length); + for (int i = 0; i < endKeys.length; i++) { + assertArrayEquals(getEndKey(i), endKeys[i]); + } + } + + @Test + public void testStartEndKeys() throws IOException { + assertStartKeys(getStartKeys()); + assertEndKeys(getEndKeys()); + Pair startEndKeys = getStartEndKeys(); + assertStartKeys(startEndKeys.getFirst()); + assertEndKeys(startEndKeys.getSecond()); + } + + private void assertRegionLocation(HRegionLocation loc, int index, int replicaId) { + RegionInfo region = loc.getRegion(); + byte[] startKey = getStartKey(index); + assertArrayEquals(startKey, region.getStartKey()); + assertArrayEquals(getEndKey(index), region.getEndKey()); + assertEquals(replicaId, region.getReplicaId()); + ServerName expected = + UTIL.getMiniHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .filter(rs -> rs.getRegions(TABLE_NAME).stream().map(Region::getRegionInfo) + .anyMatch(r -> r.containsRow(startKey) && r.getReplicaId() == replicaId)) + .findFirst().get().getServerName(); + assertEquals(expected, loc.getServerName()); + } + + @Test + public void testGetRegionLocation() throws IOException { + for (int i = 0; i <= SPLIT_KEYS.length; i++) { + for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) { + assertRegionLocation(getRegionLocation(getStartKey(i), replicaId), i, replicaId); + } + } + } + + @Test + public void testGetRegionLocations() throws IOException { + for (int i = 0; i <= SPLIT_KEYS.length; i++) { + List locs = getRegionLocations(getStartKey(i)); + assertEquals(REGION_REPLICATION, locs.size()); + for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) { + assertRegionLocation(locs.get(replicaId), i, replicaId); + } + } + } + + @Test + public void testGetAllRegionLocations() throws IOException { + List locs = getAllRegionLocations(); + assertEquals(REGION_REPLICATION * (SPLIT_KEYS.length + 1), locs.size()); + Collections.sort(locs, (l1, l2) -> { + int c = Bytes.compareTo(l1.getRegion().getStartKey(), l2.getRegion().getStartKey()); + if (c != 0) { + return c; + } + return Integer.compare(l1.getRegion().getReplicaId(), l2.getRegion().getReplicaId()); + }); + for (int i = 0; i <= SPLIT_KEYS.length; i++) { + for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) { + assertRegionLocation(locs.get(i * REGION_REPLICATION + replicaId), i, replicaId); + } + } + } + + protected abstract byte[][] getStartKeys() throws IOException; + + protected abstract byte[][] getEndKeys() throws IOException; + + protected abstract Pair getStartEndKeys() throws IOException; + + protected abstract HRegionLocation getRegionLocation(byte[] row, int replicaId) + throws IOException; + + protected abstract List getRegionLocations(byte[] row) throws IOException; + + protected abstract List getAllRegionLocations() throws IOException; + + protected abstract void clearCache() throws IOException; +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java index 86520047b1..f32693aa0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java @@ -17,102 +17,86 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.commons.math3.util.Pair; +import static org.apache.hadoop.hbase.util.FutureUtils.get; + +import java.io.IOException; +import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncTableRegionLocator { +public class TestAsyncTableRegionLocator extends AbstractTestRegionLocator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncTableRegionLocator.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static TableName TABLE_NAME = TableName.valueOf("async"); - private static AsyncConnection CONN; + private static AsyncTableRegionLocator LOCATOR; + @BeforeClass public static void setUp() throws Exception { - TEST_UTIL.startMiniCluster(3); - TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("cf")); - TEST_UTIL.waitTableAvailable(TABLE_NAME); - TEST_UTIL.getAdmin().balancerSwitch(false, true); - CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + startClusterAndCreateTable(); + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + LOCATOR = CONN.getRegionLocator(TABLE_NAME); } @AfterClass public static void tearDown() throws Exception { Closeables.close(CONN, true); - TEST_UTIL.shutdownMiniCluster(); + UTIL.shutdownMiniCluster(); + } + + @Override + protected byte[][] getStartKeys() throws IOException { + return get(LOCATOR.getStartKeys()).toArray(new byte[0][]); + } + + @Override + protected byte[][] getEndKeys() throws IOException { + return get(LOCATOR.getEndKeys()).toArray(new byte[0][]); } - private void assertLocEquals(Map region2Loc) - throws InterruptedException, ExecutionException { - for (HRegionLocation loc : CONN.getRegionLocator(TABLE_NAME).getAllRegionLocations().get()) { - ServerName expected = region2Loc.remove(loc.getRegion()); - assertNotNull(expected); - assertEquals(expected, loc.getServerName()); + @Override + protected Pair getStartEndKeys() throws IOException { + List> startEndKeys = get(LOCATOR.getStartEndKeys()); + byte[][] startKeys = new byte[startEndKeys.size()][]; + byte[][] endKeys = new byte[startEndKeys.size()][]; + for (int i = 0, n = startEndKeys.size(); i < n; i++) { + Pair pair = startEndKeys.get(i); + startKeys[i] = pair.getFirst(); + endKeys[i] = pair.getSecond(); } + return Pair.newPair(startKeys, endKeys); + } + + @Override + protected HRegionLocation getRegionLocation(byte[] row, int replicaId) throws IOException { + return get(LOCATOR.getRegionLocation(row, replicaId)); + } + + @Override + protected List getRegionLocations(byte[] row) throws IOException { + return get(LOCATOR.getRegionLocations(row)); + } + + @Override + protected List getAllRegionLocations() throws IOException { + return get(LOCATOR.getAllRegionLocations()); } - @Test - public void testGetAll() throws InterruptedException, ExecutionException { - Map region2Loc = TEST_UTIL.getMiniHBaseCluster() - .getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .flatMap(rs -> rs.getRegions(TABLE_NAME).stream() - .map(r -> Pair.create(r.getRegionInfo(), rs.getServerName()))) - .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); - MutableInt maxDepth = new MutableInt(0); - MutableInt depth = new MutableInt(0) { - - private static final long serialVersionUID = 5887112211305087650L; - - @Override - public int incrementAndGet() { - int val = super.incrementAndGet(); - if (val > maxDepth.intValue()) { - maxDepth.setValue(val); - } - return val; - } - }; - // first time, read from meta - AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth); - assertLocEquals(new HashMap<>(region2Loc)); - assertTrue(maxDepth.intValue() > 0); - assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH); - - // second time, read from cache - maxDepth.setValue(0); - depth.setValue(0); - AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth); - assertLocEquals(new HashMap<>(region2Loc)); - assertTrue(maxDepth.intValue() > 0); - assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH); + @Override + protected void clearCache() throws IOException { + LOCATOR.clearRegionLocationCache(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java index e0634ae6d1..49ce8b15cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java @@ -17,150 +17,73 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import java.io.IOException; -import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + @Category({ MediumTests.class, ClientTests.class }) -public class TestRegionLocator { +public class TestRegionLocator extends AbstractTestRegionLocator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestRegionLocator.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static TableName TABLE_NAME = TableName.valueOf("Locator"); - - private static byte[] FAMILY = Bytes.toBytes("family"); - - private static int REGION_REPLICATION = 3; - - private static byte[][] SPLIT_KEYS; + private static RegionLocator LOCATOR; @BeforeClass public static void setUp() throws Exception { - UTIL.startMiniCluster(3); - TableDescriptor td = - TableDescriptorBuilder.newBuilder(TABLE_NAME).setRegionReplication(REGION_REPLICATION) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); - SPLIT_KEYS = new byte[9][]; - for (int i = 0; i < 9; i++) { - SPLIT_KEYS[i] = Bytes.toBytes(Integer.toString(i + 1)); - } - UTIL.getAdmin().createTable(td, SPLIT_KEYS); - UTIL.waitTableAvailable(TABLE_NAME); - UTIL.getAdmin().balancerSwitch(false, true); + startClusterAndCreateTable(); + LOCATOR = UTIL.getConnection().getRegionLocator(TABLE_NAME); } @AfterClass public static void tearDown() throws Exception { + Closeables.close(LOCATOR, true); UTIL.shutdownMiniCluster(); } - @After - public void tearDownAfterTest() throws IOException { - try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { - locator.clearRegionLocationCache(); - } - } - - private byte[] getStartKey(int index) { - return index == 0 ? HConstants.EMPTY_START_ROW : SPLIT_KEYS[index - 1]; - } - - private byte[] getEndKey(int index) { - return index == SPLIT_KEYS.length ? HConstants.EMPTY_END_ROW : SPLIT_KEYS[index]; + @Override + protected byte[][] getStartKeys() throws IOException { + return LOCATOR.getStartKeys(); } - private void assertStartKeys(byte[][] startKeys) { - assertEquals(SPLIT_KEYS.length + 1, startKeys.length); - for (int i = 0; i < startKeys.length; i++) { - assertArrayEquals(getStartKey(i), startKeys[i]); - } + @Override + protected byte[][] getEndKeys() throws IOException { + return LOCATOR.getEndKeys(); } - private void assertEndKeys(byte[][] endKeys) { - assertEquals(SPLIT_KEYS.length + 1, endKeys.length); - for (int i = 0; i < endKeys.length; i++) { - assertArrayEquals(getEndKey(i), endKeys[i]); - } + @Override + protected Pair getStartEndKeys() throws IOException { + return LOCATOR.getStartEndKeys(); } - @Test - public void testStartEndKeys() throws IOException { - try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { - assertStartKeys(locator.getStartKeys()); - assertEndKeys(locator.getEndKeys()); - Pair startEndKeys = locator.getStartEndKeys(); - assertStartKeys(startEndKeys.getFirst()); - assertEndKeys(startEndKeys.getSecond()); - } + @Override + protected HRegionLocation getRegionLocation(byte[] row, int replicaId) throws IOException { + return LOCATOR.getRegionLocation(row, replicaId); } - private void assertRegionLocation(HRegionLocation loc, int index, int replicaId) { - RegionInfo region = loc.getRegion(); - byte[] startKey = getStartKey(index); - assertArrayEquals(startKey, region.getStartKey()); - assertArrayEquals(getEndKey(index), region.getEndKey()); - assertEquals(replicaId, region.getReplicaId()); - ServerName expected = - UTIL.getMiniHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .filter(rs -> rs.getRegions(TABLE_NAME).stream().map(Region::getRegionInfo) - .anyMatch(r -> r.containsRow(startKey) && r.getReplicaId() == replicaId)) - .findFirst().get().getServerName(); - assertEquals(expected, loc.getServerName()); + @Override + protected List getRegionLocations(byte[] row) throws IOException { + return LOCATOR.getRegionLocations(row); } - @Test - public void testGetRegionLocation() throws IOException { - try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { - for (int i = 0; i <= SPLIT_KEYS.length; i++) { - for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) { - assertRegionLocation(locator.getRegionLocation(getStartKey(i), replicaId), i, replicaId); - } - } - } + @Override + protected List getAllRegionLocations() throws IOException { + return LOCATOR.getAllRegionLocations(); } - @Test - public void testGetAllRegionLocations() throws IOException { - try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) { - List locs = locator.getAllRegionLocations(); - assertEquals(REGION_REPLICATION * (SPLIT_KEYS.length + 1), locs.size()); - Collections.sort(locs, (l1, l2) -> { - int c = Bytes.compareTo(l1.getRegion().getStartKey(), l2.getRegion().getStartKey()); - if (c != 0) { - return c; - } - return Integer.compare(l1.getRegion().getReplicaId(), l2.getRegion().getReplicaId()); - }); - for (int i = 0; i <= SPLIT_KEYS.length; i++) { - for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) { - assertRegionLocation(locs.get(i * REGION_REPLICATION + replicaId), i, replicaId); - } - } - } + @Override + protected void clearCache() throws IOException { + LOCATOR.clearRegionLocationCache(); } } -- 2.17.1