diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index f9b8807..eaa5a3a 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -52,14 +52,17 @@ enum CreateTableState { CREATE_TABLE_WRITE_FS_LAYOUT = 2; CREATE_TABLE_ADD_TO_META = 3; CREATE_TABLE_ASSIGN_REGIONS = 4; - CREATE_TABLE_UPDATE_DESC_CACHE = 5; - CREATE_TABLE_POST_OPERATION = 6; + // moves to this state only when there are replicas + CREATE_TABLE_ASSIGN_REPLICA_REGIONS = 5; + CREATE_TABLE_UPDATE_DESC_CACHE = 6; + CREATE_TABLE_POST_OPERATION = 7; } message CreateTableStateData { required UserInformation user_info = 1; required TableSchema table_schema = 2; repeated RegionInfo region_info = 3; + required int32 currentReplicaUnderProcess = 4; } enum ModifyTableState { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 8dd34c2..eb94f8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -1260,12 +1260,18 @@ public abstract class BaseLoadBalancer implements LoadBalancer { boolean assigned = false; for (int j = 0; j < numServers; j++) { // try all servers one by one ServerName serverName = servers.get((j + serverIdx) % numServers); - if (!cluster.wouldLowerAvailability(region, serverName)) { - List serverRegions = - assignments.computeIfAbsent(serverName, k -> new ArrayList<>()); - serverRegions.add(region); - cluster.doAssignRegion(region, serverName); - serverIdx = (j + serverIdx + 1) % numServers; //remain from next server + List serverRegions = + assignments.computeIfAbsent(serverName, k -> new ArrayList<>()); + if 
(!RegionReplicaUtil.isDefaultReplica(region)) { + if (!replicaAvailable(region, serverName)) { + assignRegionToServer(cluster, serverName, serverRegions, region); + serverIdx = (j + serverIdx + 1) % numServers; + assigned = true; + break; + } + } else if (!cluster.wouldLowerAvailability(region, serverName)) { + assignRegionToServer(cluster, serverName, serverRegions, region); + serverIdx = (j + serverIdx + 1) % numServers; // remain from next server assigned = true; break; } @@ -1280,8 +1286,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int i = RANDOM.nextInt(numServers); ServerName server = servers.get(i); List serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>()); - serverRegions.add(region); - cluster.doAssignRegion(region, server); + assignRegionToServer(cluster, server, serverRegions, region); } return assignments; } @@ -1535,11 +1540,16 @@ public abstract class BaseLoadBalancer implements LoadBalancer { List serverRegions = new ArrayList<>(max); for (int i = regionIdx; i < numRegions; i += numServers) { RegionInfo region = regions.get(i % numRegions); - if (cluster.wouldLowerAvailability(region, server)) { + if (!RegionReplicaUtil.isDefaultReplica(region)) { + if (replicaAvailable(region, server)) { + unassignedRegions.add(region); + } else { + assignRegionToServer(cluster, server, serverRegions, region); + } + } else if (cluster.wouldLowerAvailability(region, server)) { unassignedRegions.add(region); } else { - serverRegions.add(region); - cluster.doAssignRegion(region, server); + assignRegionToServer(cluster, server, serverRegions, region); } } assignments.put(server, serverRegions); @@ -1547,6 +1557,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + private void assignRegionToServer(Cluster cluster, ServerName server, + List serverRegions, RegionInfo region) { + serverRegions.add(region); + cluster.doAssignRegion(region, server); + } + protected Map> getRegionAssignmentsByServer( 
Collection regions) { if (this.services != null && this.services.getAssignmentManager() != null) { @@ -1556,6 +1572,25 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + boolean replicaAvailable(RegionInfo info, ServerName serverName) { + int replicaId = info.getReplicaId(); + for (int i = 0; i < replicaId; i++) { + RegionInfo regionInfo = RegionReplicaUtil.getRegionInfoForReplica(info, i); + List regionInfos = new ArrayList(1); + regionInfos.add(regionInfo); + Map> replicaAssignments = + this.services.getAssignmentManager().getSnapShotOfAssignment(regionInfos); + if (replicaAssignments != null) { + for (ServerName replicaServerName : replicaAssignments.keySet()) { + if (replicaServerName.equals(serverName)) { + return true; + } + } + } + } + return false; + } + @Override public void onConfigurationChange(Configuration conf) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index e9804dd..529a524 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -37,17 +37,16 @@ import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; -import org.apache.hadoop.hbase.util.FSTableDescriptors; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ModifyRegionUtils; -import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; -import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CreateTableState; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class CreateTableProcedure @@ -56,6 +55,7 @@ public class CreateTableProcedure private TableDescriptor tableDescriptor; private List newRegions; + private int currentReplicaUnderProcess = 1; public CreateTableProcedure() { // Required by the Procedure framework to create the procedure on replay @@ -106,8 +106,39 @@ public class CreateTableProcedure break; case CREATE_TABLE_ASSIGN_REGIONS: setEnablingState(env, getTableName()); - addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions)); - setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE); + if (tableDescriptor.getRegionReplication() > 1) { + List primaryRegions = new ArrayList(); + for (RegionInfo info : newRegions) { + if (RegionReplicaUtil.isDefaultReplica(info)) { + primaryRegions.add(info); + } + } + // assign only primary region + addChildProcedure(env.getAssignmentManager().createAssignProcedures(primaryRegions)); + setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REPLICA_REGIONS); + } else { + addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions)); + setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE); + } + break; + case CREATE_TABLE_ASSIGN_REPLICA_REGIONS: + //setEnablingState(env, getTableName()); + if(currentReplicaUnderProcess < tableDescriptor.getRegionReplication()) { + List regionReplicas = new ArrayList(); + for(RegionInfo info : newRegions) { + if(info.getReplicaId() == 
currentReplicaUnderProcess) { + regionReplicas.add(info); + } + } + LOG.info("Processing replicas "+currentReplicaUnderProcess); + // assign the first batch of child procedures + addChildProcedure(env.getAssignmentManager().createAssignProcedures(regionReplicas)); + LOG.info("Processed replicas "+currentReplicaUnderProcess); + currentReplicaUnderProcess++; + // do not update the state + } else { + setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE); + } break; case CREATE_TABLE_UPDATE_DESC_CACHE: setEnabledState(env, getTableName()); @@ -195,6 +226,7 @@ public class CreateTableProcedure state.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); } } + state.setCurrentReplicaUnderProcess(currentReplicaUnderProcess); serializer.serialize(state.build()); } @@ -215,6 +247,7 @@ public class CreateTableProcedure newRegions.add(ProtobufUtil.toRegionInfo(hri)); } } + currentReplicaUnderProcess = state.getCurrentReplicaUnderProcess(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCreateTableProcWithReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCreateTableProcWithReplicas.java new file mode 100644 index 0000000..29faf42 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCreateTableProcWithReplicas.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestCreateTableProcWithReplicas { + + private static final Log LOG = LogFactory.getLog(TestCreateTableProcWithReplicas.class); + + private static final int NB_SERVERS = 3; + private static Table table; + private static final byte[] row = "TestCreateTableProcWithReplicas".getBytes(); + + private static final HBaseTestingUtility 
HTU = new HBaseTestingUtility(); + private static final byte[] f = HConstants.CATALOG_FAMILY; + + @BeforeClass + public static void before() throws Exception { + // Reduce the hdfs block size and prefetch to trigger the file-link reopen + // when the file is moved to archive (e.g. compaction) + HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192); + HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1); + HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); + HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", 3); + + HTU.startMiniCluster(NB_SERVERS); + Thread.sleep(5000); + final TableName tableName = TableName.valueOf(TestCreateTableProcWithReplicas.class.getSimpleName()); + + // Create table then get the single region for our new table. + createTableDirectlyFromHTD(tableName); + //enableReplicationByModification(tableName); + } + + private static void enableReplicationByModification(final TableName tableName) + throws IOException, InterruptedException { + table = HTU.createTable(tableName, f); + HBaseTestingUtility.setReplicas(HTU.getAdmin(), table.getName(), 3); + } + + private static void createTableDirectlyFromHTD(final TableName tableName) throws IOException { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.setRegionReplication(3); + // create a table with 3 replication + + table = HTU.createTable(htd, new byte[][] { f }, getSplits(20), + new Configuration(HTU.getConfiguration())); + } + + private static byte[][] getSplits(int numRegions) { + RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit(); + split.setFirstRow(Bytes.toBytes(0L)); + split.setLastRow(Bytes.toBytes(Long.MAX_VALUE)); + return split.split(numRegions); + } + + @AfterClass + public static void afterClass() throws Exception { + HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; + table.close(); + HTU.shutdownMiniCluster(); + } + + private
HRegionServer getRS() { + return HTU.getMiniHBaseCluster().getRegionServer(0); + } + + private HRegionServer getSecondaryRS() { + return HTU.getMiniHBaseCluster().getRegionServer(1); + } + + private HRegionServer getTertiaryRS() { + return HTU.getMiniHBaseCluster().getRegionServer(2); + } + + @Test(timeout = 60000) + public void testRegionReplicasCreated() throws Exception { + Collection onlineRegions = getRS().getOnlineRegionsLocalContext(); + boolean res = checkDuplicates(onlineRegions); + assertFalse(res); + Collection onlineRegions2 = getSecondaryRS().getOnlineRegionsLocalContext(); + res = checkDuplicates(onlineRegions2); + assertFalse(res); + Collection onlineRegions3 = getTertiaryRS().getOnlineRegionsLocalContext(); + res = checkDuplicates(onlineRegions3); + assertFalse(res); + int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size(); + // includes system tables + assertEquals("the number of regions should be 62", 62, totalRegions); + } + + private boolean checkDuplicates(Collection onlineRegions3) throws Exception { + ArrayList copyOfRegion = new ArrayList(onlineRegions3); + for (HRegion region : copyOfRegion) { + RegionInfo regionInfo = region.getRegionInfo(); + RegionInfo regionInfoForReplica = + RegionReplicaUtil.getRegionInfoForDefaultReplica(regionInfo); + int i = 0; + for (Region actualRegion : onlineRegions3) { + if (regionInfoForReplica.equals( + RegionReplicaUtil.getRegionInfoForDefaultReplica(actualRegion.getRegionInfo()))) { + i++; + if (i > 1) { + System.out.println("duplicate found " + actualRegion.getRegionInfo() + " "+region.getRegionInfo()); + assertTrue(Bytes.equals(region.getRegionInfo().getStartKey(), + actualRegion.getRegionInfo().getStartKey())); + assertTrue(Bytes.equals(region.getRegionInfo().getEndKey(), + actualRegion.getRegionInfo().getEndKey())); + return true; + } + } + } + } + return false; + } +} \ No newline at end of file