From 042d2d4a6f443baccef7b14ecb6c9ed8d5100510 Mon Sep 17 00:00:00 2001 From: Ankit Singhal Date: Fri, 20 Jul 2018 14:13:18 -0700 Subject: HBASE-20908 Infinite loop on regionserver if region replica are reduced --- .../RegionReplicaReplicationEndpoint.java | 28 +++++++--- .../TestRegionReplicaReplicationEndpoint.java | 63 +++++++++++++++------- 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 235e27a6eb..efbb8f3921 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -323,7 +323,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) { super(controller, entryBuffers, numWriters); - this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout); + this.sinkWriter = + new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors); this.tableDescriptors = tableDescriptors; // A cache for the table "memstore replication enabled" flag. @@ -437,9 +438,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { int operationTimeout; ExecutorService pool; Cache disabledAndDroppedTables; + TableDescriptors tableDescriptors; public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection, - ExecutorService pool, int operationTimeout) { + ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) { this.sink = sink; this.connection = connection; this.operationTimeout = operationTimeout; @@ -447,6 +449,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { = RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); this.pool = pool; + this.tableDescriptors = tableDescriptors; int nonExistentTableCacheExpiryMs = connection.getConfiguration() .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000); @@ -555,13 +558,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { } boolean tasksCancelled = false; - for (Future task : tasks) { + for (int replicaId = 0; replicaId < tasks.size(); replicaId++) { try { - task.get(); + tasks.get(replicaId).get(); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } catch (ExecutionException e) { Throwable cause = e.getCause(); + boolean canBeSkipped = false; if (cause instanceof IOException) { // The table can be disabled or dropped at this time. For disabled tables, we have no // cheap mechanism to detect this case because meta does not contain this information. @@ -570,14 +574,26 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { // check whether the table is dropped or disabled which might cause // SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE. if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) { + disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later. + canBeSkipped = true; + } else if (tableDescriptors != null) { + HTableDescriptor tableDescriptor = tableDescriptors.get(tableName); + if (tableDescriptor != null + // (replicaId + 1) as no task is added for primary replica for replication + && tableDescriptor.getRegionReplication() <= (replicaId + 1)) { + canBeSkipped = true; + } + } + if (canBeSkipped) { if (LOG.isTraceEnabled()) { LOG.trace("Skipping " + entries.size() + " entries in table " + tableName - + " because received exception for dropped or disabled table", cause); + + " because received exception for dropped or disabled table", + cause); for (Entry entry : entries) { LOG.trace("Skipping : " + entry); } } - disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later. + if (!tasksCancelled) { sink.getSkippedEditsCounter().addAndGet(entries.size()); tasksCancelled = true; // so that we do not add to skipped counter again diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index 2c8119a55e..7e099693ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -18,41 +18,49 @@ package org.apache.hadoop.hbase.replication.regionserver; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.log4j.Level; import org.junit.AfterClass; @@ -60,8 +68,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; - /** * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying * async wal replication replays the edits to the secondary region in various scenarios. @@ -229,7 +235,7 @@ public class TestRegionReplicaReplicationEndpoint { for (int i = 1; i < regionReplication; i++) { final Region region = regions[i]; // wait until all the data is replicated to all secondary regions - Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate() { + Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { LOG.info("verifying replication for region replica:" + region.getRegionInfo()); @@ -307,7 +313,6 @@ public class TestRegionReplicaReplicationEndpoint { Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); Table table = connection.getTable(tableName); - try { // load the data to the table @@ -327,29 +332,35 @@ public class TestRegionReplicaReplicationEndpoint { } } - @Test (timeout = 240000) + @Test(timeout = 240000) public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception { - testRegionReplicaReplicationIgnoresDisabledTables(false); + testRegionReplicaReplicationIgnores(false, false); } - @Test (timeout = 240000) + @Test(timeout = 240000) public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception { - testRegionReplicaReplicationIgnoresDisabledTables(true); + testRegionReplicaReplicationIgnores(true, false); } - public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) + @Test(timeout = 240000) + public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception { + testRegionReplicaReplicationIgnores(false, true); + } + + public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication) throws Exception { // tests having edits from a disabled or dropped table is handled correctly by skipping those // entries and further edits after the edits from dropped/disabled table can be replicated // without problems. TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables" - + dropTable); + + "_drop_" + dropTable + "_disabledReplication_" + disableReplication); HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); int regionReplication = 3; htd.setRegionReplication(regionReplication); HTU.deleteTableIfAny(tableName); HTU.getHBaseAdmin().createTable(htd); - TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); + TableName toBeDisabledTable = TableName.valueOf( + dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable")); HTU.deleteTableIfAny(toBeDisabledTable); htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); htd.setRegionReplication(regionReplication); @@ -371,14 +382,16 @@ public class TestRegionReplicaReplicationEndpoint { RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); + FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(), + FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath()); RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, (ClusterConnection) connection, - Executors.newSingleThreadExecutor(), Integer.MAX_VALUE); + Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, fstd); + RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); - Entry entry = new Entry( new WALKey(encodedRegionName, toBeDisabledTable, 1), new WALEdit()); @@ -386,13 +399,23 @@ public class TestRegionReplicaReplicationEndpoint { HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table if (dropTable) { HTU.getHBaseAdmin().deleteTable(toBeDisabledTable); + } else if (disableReplication) { + htd.setRegionReplication(regionReplication - 2); + HTU.getHBaseAdmin().modifyTable(toBeDisabledTable, htd); + HTU.getHBaseAdmin().enableTable(toBeDisabledTable); } sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); assertEquals(2, skippedEdits.get()); - + if (disableReplication) { + // enable replication again so that we can verify replication + HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table + htd.setRegionReplication(regionReplication); + HTU.getHBaseAdmin().modifyTable(toBeDisabledTable, htd); + HTU.getHBaseAdmin().enableTable(toBeDisabledTable); + } try { // load some data to the to-be-dropped table -- 2.15.1 (Apple Git-101)