diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 1880a0d7c0..fc5ef8063b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -834,6 +834,30 @@ public class MetaTableAccessor {
     return HConstants.STATE_QUALIFIER;
   }
 
+  /**
+   * Returns the column qualifier for serialized region state
+   * @param replicaId the replicaId of the region
+   * @return a byte[] for state qualifier
+   */
+  @VisibleForTesting
+  public static byte[] getRegionStateColumn(int replicaId) {
+    return replicaId == 0 ? HConstants.STATE_QUALIFIER
+        : Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+            + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
+  }
+
+  /**
+   * Returns the column qualifier for serialized server name
+   * @param replicaId the replicaId of the region
+   * @return a byte[] for sn column qualifier
+   */
+  @VisibleForTesting
+  public static byte[] getServerNameColumn(int replicaId) {
+    return replicaId == 0 ? HConstants.SERVERNAME_QUALIFIER
+        : Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+            + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
+  }
+
   /**
    * Returns the column qualifier for server column for replicaId
    * @param replicaId the replicaId of the region
@@ -1386,7 +1410,10 @@ public class MetaTableAccessor {
           getSeqNumColumn(i), now);
       deleteReplicaLocations.addColumns(getCatalogFamily(),
           getStartCodeColumn(i), now);
+      deleteReplicaLocations.addColumns(getCatalogFamily(), getServerNameColumn(i), now);
+      deleteReplicaLocations.addColumns(getCatalogFamily(), getRegionStateColumn(i), now);
     }
+
     deleteFromMetaTable(connection, deleteReplicaLocations);
   }
 }
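
The two new helpers follow the existing meta-column naming scheme: replica 0 keeps the bare `state`/`sn` qualifier, and secondary replicas get a delimiter plus a formatted replica id appended. A minimal standalone sketch of the resulting qualifier strings; the constant values below are inlined assumptions standing in for `HConstants.STATE_QUALIFIER_STR`, `MetaTableAccessor.META_REPLICA_ID_DELIMITER` and `RegionInfo.REPLICA_ID_FORMAT`, not references to the real fields:

```java
// Illustration-only sketch of the naming used by getRegionStateColumn/getServerNameColumn.
public class ReplicaQualifierSketch {
  static final char META_REPLICA_ID_DELIMITER = '_';     // assumed value
  static final String REPLICA_ID_FORMAT = "%04X";        // assumed value

  static String qualifier(String base, int replicaId) {
    // replica 0 keeps the bare qualifier; secondaries get "_<formatted id>" appended
    return replicaId == 0 ? base
        : base + META_REPLICA_ID_DELIMITER + String.format(REPLICA_ID_FORMAT, replicaId);
  }

  public static void main(String[] args) {
    System.out.println(qualifier("state", 0)); // state
    System.out.println(qualifier("state", 2)); // state_0002
    System.out.println(qualifier("sn", 1));    // sn_0001
  }
}
```
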
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index c46070cd58..4c1b20c08b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -124,56 +124,52 @@ public class EnableTableProcedure
         List<RegionInfo> regionsOfTable =
             env.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName, true);
 
-        if (regionReplicaCount > 1) {
-          int currentMaxReplica = 0;
-          // Check if the regions in memory have replica regions as marked in META table
-          for (RegionInfo regionInfo : regionsOfTable) {
-            if (regionInfo.getReplicaId() > currentMaxReplica) {
-              // Iterating through all the list to identify the highest replicaID region.
-              // We can stop after checking with the first set of regions??
-              currentMaxReplica = regionInfo.getReplicaId();
-            }
+        int currentMaxReplica = 0;
+        // Check if the regions in memory have replica regions as marked in META table
+        for (RegionInfo regionInfo : regionsOfTable) {
+          if (regionInfo.getReplicaId() > currentMaxReplica) {
+            // Iterating through all the list to identify the highest replicaID region.
+            // We can stop after checking with the first set of regions??
+            currentMaxReplica = regionInfo.getReplicaId();
           }
+        }
 
-          // read the META table to know the actual number of replicas for the table - if there
-          // was a table modification on region replica then this will reflect the new entries also
-          int replicasFound =
-              getNumberOfReplicasFromMeta(connection, regionReplicaCount, regionsOfTable);
-          assert regionReplicaCount - 1 == replicasFound;
-          LOG.info(replicasFound + " META entries added for the given regionReplicaCount "
-              + regionReplicaCount + " for the table " + tableName.getNameAsString());
-          if (currentMaxReplica == (regionReplicaCount - 1)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("There is no change to the number of region replicas."
-                  + " Assigning the available regions." + " Current and previous"
-                  + "replica count is " + regionReplicaCount);
-            }
-          } else if (currentMaxReplica > (regionReplicaCount - 1)) {
-            // we have additional regions as the replica count has been decreased. Delete
-            // those regions because already the table is in the unassigned state
-            LOG.info("The number of replicas " + (currentMaxReplica + 1)
-                + " is more than the region replica count " + regionReplicaCount);
-            List<RegionInfo> copyOfRegions = new ArrayList<RegionInfo>(regionsOfTable);
-            for (RegionInfo regionInfo : copyOfRegions) {
-              if (regionInfo.getReplicaId() > (regionReplicaCount - 1)) {
-                // delete the region from the regionStates
-                env.getAssignmentManager().getRegionStates().deleteRegion(regionInfo);
-                // remove it from the list of regions of the table
-                LOG.info("The regioninfo being removed is " + regionInfo + " "
-                    + regionInfo.getReplicaId());
-                regionsOfTable.remove(regionInfo);
-              }
+        // read the META table to know the actual number of replicas for the table - if there
+        // was a table modification on region replica then this will reflect the new entries also
+        int replicasFound =
+            getNumberOfReplicasFromMeta(connection, regionReplicaCount, regionsOfTable);
+        assert regionReplicaCount - 1 == replicasFound;
+        LOG.info(replicasFound + " META entries added for the given regionReplicaCount "
+            + regionReplicaCount + " for the table " + tableName.getNameAsString());
+        if (currentMaxReplica == (regionReplicaCount - 1)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("There is no change to the number of region replicas."
+                + " Assigning the available regions." + " Current and previous"
+                + "replica count is " + regionReplicaCount);
+          }
+        } else if (currentMaxReplica > (regionReplicaCount - 1)) {
+          // we have additional regions as the replica count has been decreased. Delete
+          // those regions because already the table is in the unassigned state
+          LOG.info("The number of replicas " + (currentMaxReplica + 1)
+              + " is more than the region replica count " + regionReplicaCount);
+          List<RegionInfo> copyOfRegions = new ArrayList<RegionInfo>(regionsOfTable);
+          for (RegionInfo regionInfo : copyOfRegions) {
+            if (regionInfo.getReplicaId() > (regionReplicaCount - 1)) {
+              // delete the region from the regionStates
+              env.getAssignmentManager().getRegionStates().deleteRegion(regionInfo);
+              // remove it from the list of regions of the table
+              LOG.info(
+                "The regioninfo being removed is " + regionInfo + " " + regionInfo.getReplicaId());
+              regionsOfTable.remove(regionInfo);
            }
-          } else {
-            // the replicasFound is less than the regionReplication
-            LOG.info(
-              "The number of replicas has been changed(increased)."
-                  + " Lets assign the new region replicas. The previous replica count was "
-                  + (currentMaxReplica + 1) + ". The current replica count is "
-                  + regionReplicaCount);
-            regionsOfTable = RegionReplicaUtil.addReplicas(hTableDescriptor, regionsOfTable,
-                currentMaxReplica + 1, regionReplicaCount);
           }
+        } else {
+          // the replicasFound is less than the regionReplication
+          LOG.info("The number of replicas has been changed(increased)."
+              + " Lets assign the new region replicas. The previous replica count was "
+              + (currentMaxReplica + 1) + ". The current replica count is " + regionReplicaCount);
+          regionsOfTable = RegionReplicaUtil.addReplicas(hTableDescriptor, regionsOfTable,
+              currentMaxReplica + 1, regionReplicaCount);
         }
 
         // Assign all the table regions. (including region replicas if added).
         // createAssignProcedure will try to retain old assignments if possible.
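
The un-nested block above boils down to a three-way comparison between the highest replica id currently known for the table and the requested replica count. A self-contained, illustration-only model of that reconciliation, using invented names and plain integers in place of `RegionInfo` (this is not the procedure's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the reconciliation logic in EnableTableProcedure. replicaIds stands
// in for the replica ids of the table's in-memory regions.
class ReplicaReconciliationSketch {
  static List<Integer> reconcile(List<Integer> replicaIds, int regionReplicaCount) {
    int currentMaxReplica = 0;
    for (int id : replicaIds) {
      currentMaxReplica = Math.max(currentMaxReplica, id);
    }
    List<Integer> result = new ArrayList<>(replicaIds);
    if (currentMaxReplica > regionReplicaCount - 1) {
      // replica count was decreased: drop the now-surplus replicas
      result.removeIf(id -> id > regionReplicaCount - 1);
    } else if (currentMaxReplica < regionReplicaCount - 1) {
      // replica count was increased: create the missing replica ids
      for (int id = currentMaxReplica + 1; id <= regionReplicaCount - 1; id++) {
        result.add(id);
      }
    }
    // equal case: nothing to reconcile, assign what is already there
    return result;
  }
}
```
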
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index cb755feccf..2ed6be3226 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -31,7 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -70,8 +69,10 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 
@@ -276,7 +277,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
         int numWriters, int operationTimeout) {
       super(controller, entryBuffers, numWriters);
-      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
+      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
       this.tableDescriptors = tableDescriptors;
 
       // A cache for the table "memstore replication enabled" flag.
@@ -348,7 +349,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
      * We should always replicate meta operations (e.g. flush)
      * and use the user HTD flag to decide whether or not replicate the memstore.
      */
-    private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
+    public boolean requiresReplication(final TableName tableName, final List<Entry> entries)
         throws IOException {
       // unit-tests may not the TableDescriptors, bypass the check and always replicate
       if (tableDescriptors == null) return true;
@@ -390,9 +391,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     int operationTimeout;
     ExecutorService pool;
     Cache<TableName, Boolean> disabledAndDroppedTables;
+    TableDescriptors tableDescriptors;
 
     public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
-        ExecutorService pool, int operationTimeout) {
+        ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
       this.sink = sink;
       this.connection = connection;
       this.operationTimeout = operationTimeout;
@@ -400,6 +402,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
           = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
       this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
       this.pool = pool;
+      this.tableDescriptors = tableDescriptors;
 
       int nonExistentTableCacheExpiryMs = connection.getConfiguration()
         .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
@@ -506,13 +509,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       }
 
       boolean tasksCancelled = false;
-      for (Future<ReplicateWALEntryResponse> task : tasks) {
+      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
         try {
-          task.get();
+          tasks.get(replicaId).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException(e.getMessage());
         } catch (ExecutionException e) {
           Throwable cause = e.getCause();
+          boolean canBeSkipped = false;
           if (cause instanceof IOException) {
             // The table can be disabled or dropped at this time. For disabled tables, we have no
             // cheap mechanism to detect this case because meta does not contain this information.
@@ -520,21 +524,33 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             // RPC. So instead we start the replay RPC with retries and check whether the table is
             // dropped or disabled which might cause SocketTimeoutException, or
             // RetriesExhaustedException or similar if we get IOE.
-            if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+            if (cause instanceof TableNotFoundException
+                || connection.isTableDisabled(tableName)) {
+              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
+              canBeSkipped = true;
+            } else if (tableDescriptors != null) {
+              TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
+              if (tableDescriptor != null
+                  && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
+                canBeSkipped = true;
+              }
+            }
+            if (canBeSkipped) {
               if (LOG.isTraceEnabled()) {
                 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-                  + " because received exception for dropped or disabled table", cause);
+                    + " because received exception for dropped or disabled table",
+                  cause);
                 for (Entry entry : entries) {
                   LOG.trace("Skipping : " + entry);
                 }
               }
-              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
               if (!tasksCancelled) {
                 sink.getSkippedEditsCounter().addAndGet(entries.size());
                 tasksCancelled = true; // so that we do not add to skipped counter again
               }
               continue;
             }
+            // otherwise rethrow
             throw (IOException)cause;
           }
         }
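
The behavioural core of the `RegionReplicaSinkWriter` change: a failed replay used to be skippable only for dropped or disabled tables, and is now also skippable when the table descriptor shows the target replica no longer exists because `regionReplication` was lowered. A condensed, hypothetical restatement of that decision; the boolean and `Integer` inputs stand in for the lookups the real code performs (`TableNotFoundException` from the replay RPC, `connection.isTableDisabled(tableName)`, and `tableDescriptors.get(tableName)`):

```java
// Illustration-only sketch of the canBeSkipped decision, not the real method.
final class ReplaySkipSketch {
  static boolean canBeSkipped(boolean tableDropped, boolean tableDisabled,
      Integer regionReplication, int replicaId) {
    if (tableDropped || tableDisabled) {
      // table is gone or disabled: cache that fact and skip the buffered edits
      return true;
    }
    // region replication was reduced: the replica this task targeted no longer
    // exists, so the failed replay is skipped instead of being retried forever
    return regionReplication != null && regionReplication <= replicaId + 1;
  }
}
```
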
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 61a1fbfc56..04e188b653 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -24,11 +24,17 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,6 +56,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -65,8 +73,6 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 /**
  * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
  * async wal replication replays the edits to the secondary region in various scenarios.
@@ -263,7 +269,7 @@ public class TestRegionReplicaReplicationEndpoint {
     for (int i = 1; i < regionReplication; i++) {
       final Region region = regions[i];
       // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
           LOG.info("verifying replication for region replica:" + region.getRegionInfo());
@@ -342,7 +348,6 @@ public class TestRegionReplicaReplicationEndpoint {
 
     Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
     Table table = connection.getTable(tableName);
-
     try {
       // load the data to the table
@@ -364,26 +369,34 @@ public class TestRegionReplicaReplicationEndpoint {
 
   @Test
   public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(false);
+    testRegionReplicaReplicationIgnores(false, false);
   }
 
   @Test
   public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(true);
+    testRegionReplicaReplicationIgnores(true, false);
+  }
+
+  @Test
+  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
+    testRegionReplicaReplicationIgnores(false, true);
   }
 
-  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
+  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
       throws Exception {
+
     // tests having edits from a disabled or dropped table is handled correctly by skipping those
     // entries and further edits after the edits from dropped/disabled table can be replicated
     // without problems.
-    final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable);
+    final TableName tableName = TableName.valueOf(name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
     HTableDescriptor htd = HTU.createTableDescriptor(tableName);
     int regionReplication = 3;
     htd.setRegionReplication(regionReplication);
     HTU.deleteTableIfAny(tableName);
+
     HTU.getAdmin().createTable(htd);
-    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
+    TableName toBeDisabledTable = TableName.valueOf(
+      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
     HTU.deleteTableIfAny(toBeDisabledTable);
     htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
     htd.setRegionReplication(regionReplication);
@@ -403,30 +416,46 @@ public class TestRegionReplicaReplicationEndpoint {
     AtomicLong skippedEdits = new AtomicLong();
     RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
-        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
-    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+      mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
+    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
+      FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
     RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
         new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
-          (ClusterConnection) connection,
-          Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
+          (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
+          fstd);
     RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
     HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
     byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
 
+    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
+        .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
     Entry entry = new Entry(
       new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
-      new WALEdit());
+      new WALEdit()
+          .add(cell));
 
     HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
     if (dropTable) {
       HTU.getAdmin().deleteTable(toBeDisabledTable);
+    } else if (disableReplication) {
+      htd.setRegionReplication(regionReplication - 2);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
     }
-
     sinkWriter.append(toBeDisabledTable, encodedRegionName,
       HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
 
     assertEquals(2, skippedEdits.get());
 
+    if (disableReplication) {
+      // enable replication again so that we can verify replication
+      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
+      htd.setRegionReplication(regionReplication);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
+    }
+
     try {
       // load some data to the to-be-dropped table