From 10565e116fb8d5cdfcb47eed6c8b1fc2e46da58c Mon Sep 17 00:00:00 2001
From: Thiruvel Thirumoolan
Date: Fri, 8 Nov 2019 14:48:21 -0800
Subject: [PATCH] HBASE-22356 API to get hdfs block distribution from
 regionservers

---
 .../hbase/shaded/protobuf/ProtobufUtil.java        |  16 ++
 .../shaded/protobuf/RequestConverter.java          |   9 +
 .../src/main/protobuf/Admin.proto                  |  21 +++
 hbase-protocol/src/main/protobuf/Admin.proto       |  21 +++
 .../hadoop/hbase/HDFSBlocksDistribution.java       |  11 ++
 .../hbase/regionserver/RSRpcServices.java          |  37 ++++
 .../hbase/TestHDFSBlocksDistributionRPC.java       | 176 ++++++++++++++++++
 .../hadoop/hbase/master/MockRegionServer.java      |   6 +
 8 files changed, 297 insertions(+)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestHDFSBlocksDistributionRPC.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 81082174bc..39b93ab7c3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -128,6 +128,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetHDFSBlockDistRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetHDFSBlockDistResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
@@ -136,6 +138,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ServerInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
@@ -1824,6 +1827,19 @@ public final class ProtobufUtil {
     return getRegionInfos(response);
   }
 
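+  /**
+   * Get the HDFS block distribution of all regions hosted on a region server,
+   * as reported by its admin protocol.
+   *
+   * @param controller the RPC controller, may be null
+   * @param admin the admin service stub of the target region server
+   * @return one protobuf HDFSBlocksDistribution per online region
+   * @throws IOException if the RPC fails
+   */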
+  public static List<HDFSBlocksDistribution> getHDFSBlockDistribution(
+      final RpcController controller, final AdminService.BlockingInterface admin)
+      throws IOException {
+    GetHDFSBlockDistRequest request = RequestConverter.buildGetHDFSBlockDistRequest();
+    GetHDFSBlockDistResponse response;
+    try {
+      response = admin.getBlockDistribution(controller, request);
+    } catch (ServiceException se) {
+      throw getRemoteException(se);
+    }
+    return response.getRegionHDFSBlockDistList();
+  }
+
   /**
    * Get the list of region info from a GetOnlineRegionResponse
    *
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index d45423c95b..59f78ae20d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetHDFSBlockDistRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
@@ -1934,4 +1935,12 @@ public final class RequestConverter {
     return IsSnapshotCleanupEnabledRequest.newBuilder().build();
   }
 
+  /**
+   * Create a protocol buffer GetHDFSBlockDistRequest covering all regions on a server.
+   *
+   * @return a protocol buffer GetHDFSBlockDistRequest
+   */
+  public static GetHDFSBlockDistRequest buildGetHDFSBlockDistRequest() {
+    return GetHDFSBlockDistRequest.newBuilder().build();
+  }
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index 85b9113a27..4a1484b5cf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -280,6 +280,24 @@ message ExecuteProceduresRequest {
 message ExecuteProceduresResponse {
 }
 
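+// An empty GetHDFSBlockDistRequest asks a region server for the HDFS block
+// distribution of every region it is currently serving.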
+message GetHDFSBlockDistRequest {
+}
+
+message HostAndWeight {
+  required string host = 1;
+  required uint64 weight = 2;
+}
+
+message HDFSBlocksDistribution {
+  required RegionSpecifier region = 1;
+  required uint64 uniqueBlocksTotalWeight = 2;
+  repeated HostAndWeight hostandweight = 3;
+}
+
+message GetHDFSBlockDistResponse {
+  repeated HDFSBlocksDistribution regionHDFSBlockDist = 1;
+}
+
 service AdminService {
   rpc GetRegionInfo(GetRegionInfoRequest)
     returns(GetRegionInfoResponse);
@@ -344,4 +362,7 @@ service AdminService {
 
   rpc ExecuteProcedures(ExecuteProceduresRequest)
     returns(ExecuteProceduresResponse);
+
+  rpc GetBlockDistribution(GetHDFSBlockDistRequest)
+    returns(GetHDFSBlockDistResponse);
 }
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index 2c1d1749cf..b6b79fe781 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -255,6 +255,24 @@ message UpdateConfigurationRequest {
 message UpdateConfigurationResponse {
 }
 
+message GetHDFSBlockDistRequest {
+}
+
+message HostAndWeight {
+  required string host = 1;
+  required uint64 weight = 2;
+}
+
+message HDFSBlocksDistribution {
+  required RegionSpecifier region = 1;
+  required uint64 uniqueBlocksTotalWeight = 2;
+  repeated HostAndWeight hostandweight = 3;
+}
+
+message GetHDFSBlockDistResponse {
+  repeated HDFSBlocksDistribution regionHDFSBlockDist = 1;
+}
+
 service AdminService {
   rpc GetRegionInfo(GetRegionInfoRequest)
     returns(GetRegionInfoResponse);
@@ -306,4 +324,7 @@ service AdminService {
 
   rpc UpdateConfiguration(UpdateConfigurationRequest)
     returns(UpdateConfigurationResponse);
+
+  rpc GetBlockDistribution(GetHDFSBlockDistRequest)
+    returns(GetHDFSBlockDistResponse);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
index aa25fef452..aa4f485821 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 
 /**
  * Data structure to describe the distribution of HDFS blocks among hosts.
@@ -244,4 +245,14 @@ public class HDFSBlocksDistribution {
     return orderedHosts.descendingSet().toArray(new HostAndWeight[orderedHosts.size()]);
   }
 
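+  /**
+   * Rebuild an HDFSBlocksDistribution from its protobuf form, restoring the
+   * per-host weights and the unique blocks total weight.
+   *
+   * @param pblockDist the protobuf HDFSBlocksDistribution received from a region server
+   * @return the equivalent in-memory HDFSBlocksDistribution
+   */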
+  public static HDFSBlocksDistribution convertBlockDistribution(
+      AdminProtos.HDFSBlocksDistribution pblockDist) {
+    HDFSBlocksDistribution blockDist = new HDFSBlocksDistribution();
+    for (AdminProtos.HostAndWeight pHostAndWeight : pblockDist.getHostandweightList()) {
+      blockDist.addHostsAndBlockWeight(new String[] {pHostAndWeight.getHost()},
+          pHostAndWeight.getWeight());
+    }
+    blockDist.uniqueBlocksTotalWeight = pblockDist.getUniqueBlocksTotalWeight();
+    return blockDist;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 379e254010..b1a330491b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -168,6 +169,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
@@ -183,6 +185,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetHDFSBlockDistRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetHDFSBlockDistResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
@@ -193,6 +197,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.HostAndWeight;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
@@ -3815,6 +3820,38 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  @Override
+  @QosPriority(priority=HConstants.ADMIN_QOS)
+  public GetHDFSBlockDistResponse getBlockDistribution(RpcController controller,
+      GetHDFSBlockDistRequest request) throws ServiceException {
+
+    try {
+      checkOpen();
+      requestCount.increment();
+
+      GetHDFSBlockDistResponse.Builder builder = GetHDFSBlockDistResponse.newBuilder();
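+      // For every region open on this server, serialize its block
+      // distribution: the region name, the total weight of its unique HDFS
+      // blocks, and one (host, weight) pair per host storing those blocks.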
+      for (HRegion region : regionServer.getRegions()) {
+        HDFSBlocksDistribution hdfsBlockDistribution = region.getHDFSBlocksDistribution();
+        AdminProtos.HDFSBlocksDistribution.Builder hdfsBlkDist =
+            AdminProtos.HDFSBlocksDistribution.newBuilder();
+        hdfsBlkDist.setRegion(RequestConverter.
+            buildRegionSpecifier(RegionSpecifier.RegionSpecifierType.REGION_NAME,
+                region.getRegionInfo().getRegionName()));
+        hdfsBlkDist.setUniqueBlocksTotalWeight(hdfsBlockDistribution.getUniqueBlocksTotalWeight());
+        for (HDFSBlocksDistribution.HostAndWeight hostAndWeight :
+            hdfsBlockDistribution.getTopHostsWithWeights()) {
+          HostAndWeight.Builder hostnWeight = HostAndWeight.newBuilder();
+          hostnWeight.setHost(hostAndWeight.getHost()).setWeight(hostAndWeight.getWeight());
+          hdfsBlkDist.addHostandweight(hostnWeight);
+        }
+        builder.addRegionHDFSBlockDist(hdfsBlkDist);
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   @VisibleForTesting
   public RpcScheduler getRpcScheduler() {
     return rpcServer.getScheduler();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHDFSBlocksDistributionRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHDFSBlocksDistributionRPC.java
new file mode 100644
index 0000000000..2cd2cda365
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHDFSBlocksDistributionRPC.java
@@ -0,0 +1,176 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
+@Category({MediumTests.class})
+public class TestHDFSBlocksDistributionRPC {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHDFSBlocksDistributionRPC.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final Configuration conf = UTIL.getConfiguration();
+  private static final int SLAVES = 4;
+
+  private static final TableName TABLE_1 = TableName.valueOf("table_1");
+  private static final TableName TABLE_2 = TableName.valueOf("table_2");
+  private static final TableName TABLE_3 = TableName.valueOf("table_3");
+  private static final TableName[] tables = new TableName[]{TABLE_1, TABLE_2, TABLE_3};
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final int REGIONS = 16;
+
+  private static Admin admin;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    conf.set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + SLAVES);
+    UTIL.startMiniCluster(SLAVES);
+    UTIL.getDFSCluster().waitClusterUp();
+    UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    admin = UTIL.getAdmin();
+    admin.balancerSwitch(false, true);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    for (TableName table : tables) {
+      UTIL.deleteTable(table);
+    }
+    UTIL.shutdownMiniCluster();
+  }
+
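+  /**
+   * Create the test tables, wait until all regions are assigned, then load
+   * and flush them so every region has store files (and hence HDFS blocks)
+   * to report in its block distribution.
+   */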
+  private static void createTables() throws IOException {
+    for (TableName tableName : tables) {
+      UTIL.createMultiRegionTable(tableName, FAMILY, REGIONS);
+      UTIL.waitUntilAllRegionsAssigned(tableName);
+      assertEquals("Region mismatch", REGIONS,
+          admin.getConnection().getRegionLocator(tableName).getAllRegionLocations().size());
+      UTIL.loadTable(admin.getConnection().getTable(tableName), FAMILY);
+      UTIL.flush(tableName);
+    }
+  }
+
+  @Test
+  public void testHDFSBlockDistribution() throws Exception {
+
+    createTables();
+
+    // Wait until region server report is done.
+    int rsHeartBeatInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
+    Thread.sleep(2 * rsHeartBeatInterval);
+
+    // Check the regions against the hdfs block dist reported by each server.
+    ClusterMetrics clusterMetrics = admin.getClusterMetrics();
+    for (ServerName sn : clusterMetrics.getServersName()) {
+      List<RegionInfo> regions = admin.getRegions(sn);
+      Map<byte[], HDFSBlocksDistribution> blockDistributionMap = getHDFSBlockDistribution(sn);
+      checkRegionsAndBlockDist(sn, regions, blockDistributionMap,
+          clusterMetrics.getLiveServerMetrics().get(sn));
+    }
+  }
+
+  private void checkRegionsAndBlockDist(ServerName sn, Collection<RegionInfo> regions,
+      Map<byte[], HDFSBlocksDistribution> blockDistributionMap, ServerMetrics load)
+      throws IOException {
+
+    assertEquals("Number of regions and size of block dist map don't match",
+        regions.size(), blockDistributionMap.size());
+
+    for (RegionInfo info : regions) {
+      assertTrue("Region not in block dist map: " + info.getRegionNameAsString()
+          + " regionMap: " + blockDistributionMap,
+          blockDistributionMap.containsKey(info.getRegionName()));
+
+      HDFSBlocksDistribution proto = blockDistributionMap.get(info.getRegionName());
+
+      // Check the protobuf object's locality against that from cluster metrics.
+      float locality = load.getRegionMetrics().get(info.getRegionName()).getDataLocality();
+      assertEquals("Locality from cluster metrics and getBlockDist doesn't match: "
+          + info.getRegionNameAsString(),
+          locality, proto.getBlockLocalityIndex(sn.getHostname()), 0);
+
+      if (locality > 0) {
+        assertTrue("No host and weight present for region: " + info.getRegionNameAsString(),
+            proto.getHostAndWeights().size() > 0);
+      }
+
+      // Compute the block dist locally and check that all hosts and weights match.
+      HDFSBlocksDistribution computed = computeBlockDistribution(info);
+      assertEquals("Host and weight count doesn't match for " + info.getRegionNameAsString(),
+          computed.getHostAndWeights().size(), proto.getHostAndWeights().size());
+      assertEquals("Total weight doesn't match for " + info.getRegionNameAsString(),
+          computed.getUniqueBlocksTotalWeight(), proto.getUniqueBlocksTotalWeight());
+
+      for (HDFSBlocksDistribution.HostAndWeight hostAndWeight :
+          computed.getTopHostsWithWeights()) {
+        HDFSBlocksDistribution.HostAndWeight protoHAndW =
+            proto.getHostAndWeights().get(hostAndWeight.getHost());
+        assertEquals("Weight doesn't match", hostAndWeight.getWeight(), protoHAndW.getWeight());
+      }
+    }
+  }
+
+  private HDFSBlocksDistribution computeBlockDistribution(RegionInfo info) throws IOException {
+    return HRegion.computeHDFSBlocksDistribution(UTIL.getConfiguration(),
+        admin.getDescriptor(info.getTable()), info);
+  }
+
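+  /**
+   * Fetch the block distribution of all regions on the given server through
+   * the admin protocol entry point and key it by region name, the way a
+   * remote client would consume the new endpoint.
+   */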
+  private Map<byte[], HDFSBlocksDistribution> getHDFSBlockDistribution(
+      final ServerName sn) throws IOException {
+
+    HRegionServer regionServer = UTIL.getHBaseCluster().getRegionServer(sn);
+    List<AdminProtos.HDFSBlocksDistribution> blockDistribution =
+        ProtobufUtil.getHDFSBlockDistribution(null, regionServer.getRSRpcServices());
+    Map<byte[], HDFSBlocksDistribution> blkDistMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (AdminProtos.HDFSBlocksDistribution hdfsBlockDistribution : blockDistribution) {
+      byte[] regionName = hdfsBlockDistribution.getRegion().getValue().toByteArray();
+      blkDistMap.put(regionName,
+          HDFSBlocksDistribution.convertBlockDistribution(hdfsBlockDistribution));
+    }
+    return blkDistMap;
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 3ab52968c9..a83bd73d0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -673,6 +673,12 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
     return null;
   }
 
+  @Override
+  public AdminProtos.GetHDFSBlockDistResponse getBlockDistribution(RpcController controller,
+      AdminProtos.GetHDFSBlockDistRequest request) throws ServiceException {
+    return null;
+  }
+
   @Override
   public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
       RpcController controller, GetSpaceQuotaSnapshotsRequest request)
-- 
2.21.0 (Apple Git-122.2)
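
A minimal client-side sketch of the new API (not part of the patch). It assumes
"admin" is an AdminService.BlockingInterface stub already connected to the
target region server; obtaining the stub from a Connection, the surrounding
class, and its imports are omitted.

  // Sketch only: print each online region's unique-blocks total weight as
  // reported by the region server's GetBlockDistribution RPC.
  static void printBlockDistribution(AdminService.BlockingInterface admin)
      throws IOException {
    List<AdminProtos.HDFSBlocksDistribution> dists =
        ProtobufUtil.getHDFSBlockDistribution(null, admin);
    for (AdminProtos.HDFSBlocksDistribution dist : dists) {
      byte[] regionName = dist.getRegion().getValue().toByteArray();
      HDFSBlocksDistribution hbd =
          HDFSBlocksDistribution.convertBlockDistribution(dist);
      System.out.println(Bytes.toStringBinary(regionName)
          + " totalWeight=" + hbd.getUniqueBlocksTotalWeight());
    }
  }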