.../org/apache/hadoop/hbase/MetaTableAccessor.java | 2 +-
.../hadoop/hbase/client/RegionReplicaUtil.java | 43 +++++
.../master/procedure/CreateTableProcedure.java | 27 +--
.../master/procedure/EnableTableProcedure.java | 93 +++++++++-
.../master/procedure/ModifyTableProcedure.java | 39 +++-
.../TestRegionReplicasWithModifyTable.java | 196 +++++++++++++++++++++
6 files changed, 370 insertions(+), 30 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index bd8968d..cea691d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -271,7 +271,7 @@ public class MetaTableAccessor {
* @return An {@link Table} for hbase:meta
* @throws IOException
*/
- static Table getMetaHTable(final Connection connection)
+ public static Table getMetaHTable(final Connection connection)
throws IOException {
// We used to pass whole CatalogTracker in here, now we just pass in Connection
if (connection == null) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
index c2dcbc0..b1b8faa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.client;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -148,4 +150,45 @@ public class RegionReplicaUtil {
}
return 0;
}
+
+ public static List addReplicas(final TableDescriptor tableDescriptor,
+ final List regions, int oldReplicaCount, int newReplicaCount) {
+ int numRegionReplicas = tableDescriptor.getRegionReplication() - 1;
+ if (numRegionReplicas <= 0) {
+ return regions;
+ }
+ List hRegionInfos = new ArrayList<>((numRegionReplicas + 1) * regions.size());
+ for (int i = 0; i < regions.size(); i++) {
+ if (RegionReplicaUtil.isDefaultReplica(regions.get(i))) {
+ for (int j = oldReplicaCount + 1; j < newReplicaCount; j++) {
+ hRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), j));
+ }
+ }
+ }
+ hRegionInfos.addAll(regions);
+ return hRegionInfos;
+ }
+
+ /**
+ * Create any replicas for the regions (the default replicas that was
+ * already created is passed to the method)
+ * @param hTableDescriptor descriptor to use
+ * @param regions default replicas
+ * @return the combined list of default and non-default replicas
+ */
+ public static List addReplicas(final TableDescriptor tableDescriptor,
+ final List regions) {
+ int numRegionReplicas = tableDescriptor.getRegionReplication() - 1;
+ if (numRegionReplicas <= 0) {
+ return regions;
+ }
+ List hRegionInfos = new ArrayList<>((numRegionReplicas + 1) * regions.size());
+ for (int i = 0; i < regions.size(); i++) {
+ for (int j = 1; j <= numRegionReplicas; j++) {
+ hRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), j));
+ }
+ }
+ hRegionInfos.addAll(regions);
+ return hRegionInfos;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index a5c1584..e8770ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -340,7 +340,7 @@ public class CreateTableProcedure
ProcedureSyncWait.waitMetaRegions(env);
// Add replicas if needed
- List newRegions = addReplicas(env, tableDescriptor, regions);
+ List newRegions = RegionReplicaUtil.addReplicas(tableDescriptor, regions);
// Add regions to META
addRegionsToMeta(env, tableDescriptor, newRegions);
@@ -352,31 +352,6 @@ public class CreateTableProcedure
return newRegions;
}
- /**
- * Create any replicas for the regions (the default replicas that was
- * already created is passed to the method)
- * @param tableDescriptor descriptor to use
- * @param regions default replicas
- * @return the combined list of default and non-default replicas
- */
- private static List addReplicas(final MasterProcedureEnv env,
- final TableDescriptor tableDescriptor,
- final List regions) {
- int numRegionReplicas = tableDescriptor.getRegionReplication() - 1;
- if (numRegionReplicas <= 0) {
- return regions;
- }
- List hRegionInfos = new ArrayList<>((numRegionReplicas+1)*regions.size());
- for (int i = 0; i < regions.size(); i++) {
- for (int j = 1; j <= numRegionReplicas; j++) {
- hRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), j));
- }
- }
- hRegionInfos.addAll(regions);
- return hRegionInfos;
- }
-
-
protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName)
throws IOException {
// Mark the table as Enabling
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index ff43d27..bd81589 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -19,13 +19,24 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.TableStateManager;
@@ -72,6 +83,7 @@ public class EnableTableProcedure
this.skipTableStateCheck = skipTableStateCheck;
}
+ @SuppressWarnings("deprecation")
@Override
protected Flow executeFromState(final MasterProcedureEnv env, final EnableTableState state)
throws InterruptedException {
@@ -98,7 +110,61 @@ public class EnableTableProcedure
setNextState(EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE);
break;
case ENABLE_TABLE_MARK_REGIONS_ONLINE:
- addChildProcedure(env.getAssignmentManager().createAssignProcedures(tableName));
+ Connection connection = env.getMasterServices().getConnection();
+ // we will need to get the tableDescriptor here to see if there is a change in the replica
+ // count
+ TableDescriptor hTableDescriptor =
+ env.getMasterServices().getTableDescriptors().get(tableName);
+
+ // Get the replica count
+ int regionReplicaCount = hTableDescriptor.getRegionReplication();
+
+ // Get the regions for the table from the memory
+ List regionsOfTable =
+ env.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
+
+ if (regionReplicaCount > 1) {
+ int currentMaxReplica = 0;
+ // Check if the regions in memory have replica regions as marked in META table
+ boolean foundRegion = false;
+ for (HRegionInfo regionInfo : regionsOfTable) {
+ if (foundRegion && RegionReplicaUtil.isDefaultReplica(regionInfo)) {
+ // Just break after we see the region replica for the first region
+ break;
+ } else if (RegionReplicaUtil.isDefaultReplica(regionInfo)) {
+ foundRegion = true;
+ }
+ if (regionInfo.getReplicaId() > currentMaxReplica) {
+ currentMaxReplica = regionInfo.getReplicaId();
+ }
+ }
+
+ // read the META table to know the actual number of replicas for the table - if there
+ // was a table modification on region replica then this will reflect the new entries also
+ int replicasFound =
+ getNumberOfReplicasFromMeta(connection, regionReplicaCount, regionsOfTable);
+ assert regionReplicaCount - 1 == replicasFound;
+ LOG.info("META entries added for the given regionReplicaCount " + regionReplicaCount
+ + " for the table " + tableName.getNameAsString());
+ if (currentMaxReplica == (regionReplicaCount - 1)) {
+ LOG.info(
+ "There is no change to the number of region replicas. Assigning the available regions. Current and previous"
+ + "replica count is " + regionReplicaCount);
+ } else if (currentMaxReplica > (regionReplicaCount - 1)) {
+ // TODO :we have additional regions as the replica count has been decreased. Unassign
+ // those regions
+ } else {
+ // the replicasFound is less than the regionReplication
+ LOG.info(
+ "The number of replicas has been changed(increased). Lets assign the new region replicas. The previous replica count was "
+ + (currentMaxReplica + 1) + ". The current replica count is "
+ + regionReplicaCount);
+ regionsOfTable = RegionReplicaUtil.addReplicas(hTableDescriptor, regionsOfTable,
+ currentMaxReplica, regionReplicaCount);
+ }
+ }
+ // Assign all the table regions. (including region replicas if added)
+ addChildProcedure(env.getAssignmentManager().createAssignProcedures(regionsOfTable));
setNextState(EnableTableState.ENABLE_TABLE_SET_ENABLED_TABLE_STATE);
break;
case ENABLE_TABLE_SET_ENABLED_TABLE_STATE:
@@ -122,6 +188,31 @@ public class EnableTableProcedure
return Flow.HAS_MORE_STATE;
}
+ private int getNumberOfReplicasFromMeta(Connection connection, int regionReplicaCount,
+ List regionsOfTable) throws IOException {
+ Result r = getRegionFromMeta(connection, regionsOfTable);
+ int replicasFound = 0;
+ for (int i = 1; i < regionReplicaCount; i++) {
+ // Since we have already added the entries to the META we will be getting only that here
+ List columnCells =
+ r.getColumnCells(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(i));
+ if (!columnCells.isEmpty()) {
+ replicasFound++;
+ }
+ }
+ return replicasFound;
+ }
+
+ private Result getRegionFromMeta(Connection connection, List regionsOfTable)
+ throws IOException {
+ byte[] metaKeyForRegion = MetaTableAccessor.getMetaKeyForRegion(regionsOfTable.get(0));
+ Get get = new Get(metaKeyForRegion);
+ get.addFamily(HConstants.CATALOG_FAMILY);
+ Table metaTable = MetaTableAccessor.getMetaHTable(connection);
+ Result r = metaTable.get(get);
+ return r;
+ }
+
@Override
protected void rollbackState(final MasterProcedureEnv env, final EnableTableState state)
throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 0fc08c6..9bc84ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -358,13 +359,47 @@ public class ModifyTableProcedure
connection);
}
}
+ // there was a replica but now the region replica count has been increased
+ if (newReplicaCount > oldReplicaCount && oldReplicaCount > 1) {
+ Connection connection = env.getMasterServices().getConnection();
+ // Get the existing table regions
+ List existingTableRegions =
+ MetaTableAccessor.getTableRegions(connection, getTableName());
+ // create the replicaRegion
+ List newRegionInfos = RegionReplicaUtil.addReplicas(newTableDescriptor,
+ existingTableRegions, oldReplicaCount, newReplicaCount);
+
+ // add to META
+ addRegionsToMeta(env, newTableDescriptor, newRegionInfos);
+ } else if (newReplicaCount > 1 && oldReplicaCount <= 1) {
+ // Setup replication for region replicas if needed
+ Connection connection = env.getMasterServices().getConnection();
+
+ // Get the existing table regions
+ List existingTableRegions =
+ MetaTableAccessor.getTableRegions(connection, getTableName());
+
+ // Delete the existing entries from META
+ MetaTableAccessor.deleteRegions(connection, existingTableRegions);
+
+ // create the replica regions
+ List newRegionInfos =
+ RegionReplicaUtil.addReplicas(newTableDescriptor, existingTableRegions);
+
+ // add all the new entries to the meta table
+ // TODO : Make the delete and add atomic??
+ addRegionsToMeta(env, newTableDescriptor, newRegionInfos);
- // Setup replication for region replicas if needed
- if (newReplicaCount > 1 && oldReplicaCount <= 1) {
ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
}
}
+ private static void addRegionsToMeta(final MasterProcedureEnv env,
+ final TableDescriptor tableDescriptor, final List regionInfos)
+ throws IOException {
+ MetaTableAccessor.addRegionsToMeta(env.getMasterServices().getConnection(), regionInfos,
+ tableDescriptor.getRegionReplication());
+ }
/**
* Action after modifying table.
* @param env MasterProcedureEnv
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicasWithModifyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicasWithModifyTable.java
new file mode 100644
index 0000000..4fd1faf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicasWithModifyTable.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicasWithModifyTable {
+
+ private static final Log LOG = LogFactory.getLog(TestRegionReplicasWithModifyTable.class);
+
+ private static final int NB_SERVERS = 3;
+ private static Table table;
+ private static final byte[] row = "TestRegionReplicasWithRestartScenarios".getBytes();
+
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+ private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void before() throws Exception {
+ HTU.startMiniCluster(NB_SERVERS);
+ }
+
+ private static void enableReplicationByModification(final TableName tableName,
+ boolean withReplica, int initialReplicaCount, int enableReplicaCount, int splitCount)
+ throws IOException, InterruptedException {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ if (withReplica) {
+ htd.setRegionReplication(initialReplicaCount);
+ }
+ if (splitCount > 0) {
+ byte[][] splits = getSplits(splitCount);
+ table = HTU.createTable(htd, new byte[][] { f }, splits,
+ new Configuration(HTU.getConfiguration()));
+
+ } else {
+ table = HTU.createTable(htd, new byte[][] { f }, (byte[][]) null,
+ new Configuration(HTU.getConfiguration()));
+ }
+ HBaseTestingUtility.setReplicas(HTU.getAdmin(), table.getName(), enableReplicaCount);
+ }
+
+ private static byte[][] getSplits(int numRegions) {
+ RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
+ split.setFirstRow(Bytes.toBytes(0L));
+ split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
+ return split.split(numRegions);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
+ table.close();
+ HTU.shutdownMiniCluster();
+ }
+
+ private HRegionServer getRS() {
+ return HTU.getMiniHBaseCluster().getRegionServer(0);
+ }
+
+ private HRegionServer getSecondaryRS() {
+ return HTU.getMiniHBaseCluster().getRegionServer(1);
+ }
+
+ private HRegionServer getTertiaryRS() {
+ return HTU.getMiniHBaseCluster().getRegionServer(2);
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicasUsingEnableTable() throws Exception {
+ TableName tableName = null;
+ try {
+ tableName = TableName.valueOf(name.getMethodName());
+ enableReplicationByModification(tableName, false, 0, 3, 0);
+ List onlineRegions = getRS().getOnlineRegions(tableName);
+ List onlineRegions2 = getSecondaryRS().getOnlineRegions(tableName);
+ List onlineRegions3 = getTertiaryRS().getOnlineRegions(tableName);
+ int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
+ assertEquals("the number of regions should be more than 1", totalRegions, 3);
+ } finally {
+ disableAndDeleteTable(tableName);
+ }
+ }
+
+ private void disableAndDeleteTable(TableName tableName) throws IOException {
+ HTU.getAdmin().disableTable(tableName);
+ HTU.getAdmin().deleteTable(tableName);
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicasUsingEnableTableForMultipleRegions() throws Exception {
+ TableName tableName = null;
+ try {
+ tableName = TableName.valueOf(name.getMethodName());
+ enableReplicationByModification(tableName, false, 0, 3, 10);
+ List onlineRegions = getRS().getOnlineRegions(tableName);
+ List onlineRegions2 = getSecondaryRS().getOnlineRegions(tableName);
+ List onlineRegions3 = getTertiaryRS().getOnlineRegions(tableName);
+ int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
+ assertEquals("the number of regions should be equal to 30", totalRegions, 30);
+ } finally {
+ disableAndDeleteTable(tableName);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreased() throws Exception {
+ TableName tableName = null;
+ try {
+ tableName = TableName.valueOf(name.getMethodName());
+ enableReplicationByModification(tableName, true, 2, 3, 0);
+ List onlineRegions = getRS().getOnlineRegions(tableName);
+ List onlineRegions2 = getSecondaryRS().getOnlineRegions(tableName);
+ List onlineRegions3 = getTertiaryRS().getOnlineRegions(tableName);
+ int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
+ assertEquals("the number of regions should be 3", totalRegions, 3);
+ } finally {
+ disableAndDeleteTable(tableName);
+ }
+ }
+
+ @Ignore // TODO : this needs to be fixed by unassigning those regions
+ @Test(timeout = 60000)
+ public void testRegionReplicasByEnableTableWhenReplicaCountIsDecreased() throws Exception {
+ TableName tableName = null;
+ try {
+ tableName = TableName.valueOf(name.getMethodName());
+ enableReplicationByModification(tableName, true, 3, 2, 0);
+ List onlineRegions = getRS().getOnlineRegions(tableName);
+ List onlineRegions2 = getSecondaryRS().getOnlineRegions(tableName);
+ List onlineRegions3 = getTertiaryRS().getOnlineRegions(tableName);
+ int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
+ assertEquals("the number of regions should be reduced to 2", totalRegions, 2);
+ } finally {
+ disableAndDeleteTable(tableName);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreasedWithmultipleRegions()
+ throws Exception {
+ TableName tableName = null;
+ try {
+ tableName = TableName.valueOf(name.getMethodName());
+ enableReplicationByModification(tableName, true, 2, 3, 15);
+ List onlineRegions = getRS().getOnlineRegions(tableName);
+ List onlineRegions2 = getSecondaryRS().getOnlineRegions(tableName);
+ List onlineRegions3 = getTertiaryRS().getOnlineRegions(tableName);
+ int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
+ assertEquals("the number of regions should be equal to 45", totalRegions, 3 * 15);
+ } finally {
+ disableAndDeleteTable(tableName);
+ }
+ }
+}
|