diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f1b6e76687..684285c88a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -35,6 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -62,13 +63,12 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - /** * Class that handles the source of a replication stream. * Currently does not handle more than 1 slave @@ -492,9 +492,11 @@ public class ReplicationSource implements ReplicationSourceInterface { // In rare case, zookeeper setting may be messed up. 
That leads to the incorrect // peerClusterId value, which is the same as the source clusterId if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { - this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " - + peerClusterId + " which is not allowed by ReplicationEndpoint:" - + replicationEndpoint.getClass().getName(), null, false); + this.terminate("This replication attempt is not allowed by ReplicationEndpoint: " + + replicationEndpoint.getClass().getSimpleName() + + " due to invalid target hbase znode setting", + new ReplicationException("Target cluster has the same clusterId " + peerClusterId + + " as the source " + clusterId), false); this.manager.removeSource(this); return; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 274ccabfbe..6bf6e1cb4a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -17,17 +17,25 @@ */ package org.apache.hadoop.hbase.replication.regionserver; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.OptionalLong; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; @@ -48,12 +56,6 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper; -import org.apache.hadoop.hbase.replication.regionserver.Replication; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -62,6 +64,14 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.spi.LoggingEvent; +import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -319,5 +329,76 @@ public class TestReplicationSource { Assert.assertEquals(1001L, shipper.getStartPosition()); conf.unset("replication.source.maxretriesmultiplier"); } + + class TestAppender extends AppenderSkeleton { + private final List<LoggingEvent> log = new ArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + protected void 
append(final LoggingEvent loggingEvent) { + log.add(loggingEvent); + } + + @Override + public void close() { + } + + public List<LoggingEvent> getLog() { + return new ArrayList<>(log); + } + } + + /** + * Test to check the source is terminated in case of same hbase clusterId in ZK + * See HBASE-21312 + * @throws IOException when unable to connect to ZK + */ + @Test + public void testSamePeerClusterID() throws Exception { + + TestAppender appender = new TestAppender(); + LogManager.getLogger(ReplicationSource.class).setLevel(Level.ERROR); + LogManager.getLogger(ReplicationSource.class).addAppender(appender); + + MiniHBaseCluster srcCluster = TEST_UTIL.getMiniHBaseCluster(); + + HRegionServer srcRS = srcCluster.startRegionServer().getRegionServer(); + + ZKWatcher zkwTgt = new ZKWatcher(TEST_UTIL_PEER.getConfiguration(), "tgt", null); + UUID srcHBaseId = null; + try { + srcHBaseId = ZKClusterId.getUUIDForCluster(srcRS.getZooKeeper()); + ZKUtil.setData(zkwTgt, "/hbase/hbaseid", Bytes.toBytes(srcHBaseId.toString())); + } catch (KeeperException ke) { + throw new IOException("Could not set HBase cluster id", ke); + } + + String peerId = "TestPeer2"; + Admin admin = TEST_UTIL.getAdmin(); + admin.addReplicationPeer(peerId, + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); + + String pattern = "Closing source " + peerId + " because an error occurred: " + + "This replication attempt is not allowed by ReplicationEndpoint: " + + "HBaseInterClusterReplicationEndpoint due to invalid target hbase znode setting"; + + Pattern r = Pattern.compile(pattern); + final List<LoggingEvent> log = appender.getLog(); + for (LoggingEvent event : log) { + if (event.getRenderedMessage().contains(peerId)) { + Matcher m = r.matcher(event.getRenderedMessage()); + assertTrue(m.matches()); + } + } + + ZKUtil.setData(zkwTgt, "/hbase/hbaseid", Bytes.toBytes(UUID.randomUUID().toString())); + LogManager.getLogger(ReplicationSource.class).setLevel(Level.ALL); + 
LogManager.getLogger(ReplicationSource.class).removeAppender(appender); + } + }