From 518ca0c2822146545f506cfc21a53e57d31382da Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 17 Feb 2017 13:28:07 +0800 Subject: [PATCH] HBASE-17646: Implement Async getRegion method --- .../hadoop/hbase/AsyncMetaTableAccessor.java | 34 ++++++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 95 ++++++++++++++++++++-- .../apache/hadoop/hbase/client/TestAsyncAdmin.java | 49 +++++++++++ 3 files changed, 169 insertions(+), 9 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java index f136d56..d09d29e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java @@ -27,12 +27,15 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RawAsyncTable; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; /** * The asynchronous meta table accessor. Used to read/write region and assignment information store @@ -74,6 +77,37 @@ public class AsyncMetaTableAccessor { return future; } + public static CompletableFuture> getRegion(RawAsyncTable metaTable, + byte[] regionName) { + CompletableFuture> future = new CompletableFuture<>(); + byte[] row = regionName; + HRegionInfo parsedInfo = null; + try { + parsedInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); + row = MetaTableAccessor.getMetaKeyForRegion(parsedInfo); + } catch (Exception parseEx) { + // Ignore if regionName is a encoded region name. + } + + final HRegionInfo finalHRI = parsedInfo; + metaTable.get(new Get(row).addFamily(HConstants.CATALOG_FAMILY)).whenComplete((r, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + RegionLocations locations = MetaTableAccessor.getRegionLocations(r); + HRegionLocation hrl = locations == null ? null + : locations.getRegionLocation(finalHRI == null ? 0 : finalHRI.getReplicaId()); + if (hrl == null) { + future.complete(null); + } else { + future.complete(new Pair<>(hrl.getRegionInfo(), hrl.getServerName())); + } + }); + + return future; + } + private static Optional getTableState(Result r) throws IOException { Cell cell = r.getColumnLatestCell(getTableFamily(), getStateColumn()); if (cell == null) return Optional.empty(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 9d5c509..3876570 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -32,16 +32,22 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -189,12 +195,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin { if (controller.failed()) { future.completeExceptionally(new IOException(controller.errorText())); } else { - if (respConverter != null) { - try { - future.complete(respConverter.convert(resp)); - } catch (IOException e) { - future.completeExceptionally(e); - } + try { + future.complete(respConverter.convert(resp)); + } catch (IOException e) { + future.completeExceptionally(e); } } } @@ -507,8 +511,81 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture closeRegion(byte[] regionname, String serverName) { - throw new UnsupportedOperationException("closeRegion method depends on getRegion API, will support soon."); + public CompletableFuture closeRegion(byte[] regionName, String serverName) { + CompletableFuture future = new CompletableFuture<>(); + getRegion(regionName).whenComplete((p, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (p == null || p.getFirst() == null) { + future.completeExceptionally(new UnknownRegionException(Bytes.toStringBinary(regionName))); + return; + } + if (serverName != null) { + closeRegion(ServerName.valueOf(serverName), p.getFirst()).whenComplete((p2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + }else{ + future.complete(null); + } + }); + } else { + if (p.getSecond() == null) { + future.completeExceptionally(new NotServingRegionException(regionName)); + } else { + closeRegion(p.getSecond(), p.getFirst()).whenComplete((p2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + }else{ + future.complete(null); + } + }); + } + } + }); + return future; + } + + CompletableFuture> getRegion(byte[] regionName) { + if (regionName == null) { + return failedFuture(new IllegalArgumentException("Pass region name")); + } + CompletableFuture> future = new CompletableFuture<>(); + AsyncMetaTableAccessor.getRegion(metaTable, regionName).whenComplete( + (p, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (p != null) { + future.complete(p); + } else { + metaTable.scanAll( + new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)) + .whenComplete((results, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + String encodedName = Bytes.toString(regionName); + if (results != null && !results.isEmpty()) { + for (Result r : results) { + if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue; + RegionLocations rl = MetaTableAccessor.getRegionLocations(r); + if (rl != null) { + for (HRegionLocation h : rl.getRegionLocations()) { + if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) { + future.complete(new Pair<>(h.getRegionInfo(), h.getServerName())); + return; + } + } + } + } + } + future.complete(null); + }); + } + }); + return future; } @Override @@ -530,7 +607,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { .action( (controller, stub) -> this. adminCall( controller, stub, ProtobufUtil.buildCloseRegionRequest(sn, hri.getRegionName()), - (s, c, req, done) -> s.closeRegion(controller, req, done), null)) + (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> null)) .serverName(sn).call(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java index 467f6c9..950d77d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -964,6 +965,35 @@ public class TestAsyncAdmin { } @Test + public void testCloseRegionThatFetchesTheHRIFromMeta() throws Exception { + TableName TABLENAME = TableName.valueOf("TestHBACloseRegion2"); + createTableWithDefaultConf(TABLENAME); + + HRegionInfo info = null; + HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME); + List onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()); + for (HRegionInfo regionInfo : onlineRegions) { + if (!regionInfo.isMetaTable()) { + + if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion2")) { + info = regionInfo; + admin.closeRegion(regionInfo.getRegionNameAsString(), rs.getServerName().getServerName()) + .get(); + } + } + } + + boolean isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info); + long timeout = System.currentTimeMillis() + 10000; + while ((System.currentTimeMillis() < timeout) && (isInList)) { + Thread.sleep(100); + isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info); + } + + assertFalse("The region should not be present in online regions list.", isInList); + } + + @Test public void testCloseRegionWhenServerNameIsNull() throws Exception { byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion3"); createTableWithDefaultConf(TableName.valueOf(TABLENAME)); @@ -1035,4 +1065,23 @@ public class TestAsyncAdmin { assertTrue("The region should be present in online regions list.", onlineRegions.contains(info)); } + + @Test + public void testGetRegion() throws Exception { + AsyncHBaseAdmin rawAdmin = (AsyncHBaseAdmin) admin; + + final TableName tableName = TableName.valueOf("testGetRegion"); + LOG.info("Started " + tableName); + TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY); + + try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm")); + HRegionInfo region = regionLocation.getRegionInfo(); + byte[] regionName = region.getRegionName(); + Pair pair = rawAdmin.getRegion(regionName).get(); + assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName())); + pair = rawAdmin.getRegion(region.getEncodedNameAsBytes()).get(); + assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName())); + } + } } -- 2.3.2 (Apple Git-55)