diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 10fa50f553..65af492b52 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -487,9 +487,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
     // In rare case, zookeeper setting may be messed up. That leads to the incorrect
     // peerClusterId value, which is the same as the source clusterId
     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
-      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
-          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
-          + replicationEndpoint.getClass().getName(), null, false);
+      this.terminate("This replication attempt is not allowed by ReplicationEndpoint: "
+          + replicationEndpoint.getClass().getSimpleName() + " due to invalid target hbase znode setting",
+          new ReplicationException("Target cluster has the same clusterId " + peerClusterId
+          + " as the source " + clusterId), false);
       this.manager.removeSource(this);
       return;
     }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 274ccabfbe..7cc7fcc58e 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -17,17 +17,19 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.OptionalLong;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,6 +64,14 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -72,6 +82,8 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.*;
+
 @Category({ReplicationTests.class, MediumTests.class})
 public class TestReplicationSource {
 
@@ -319,5 +331,94 @@ public class TestReplicationSource {
     Assert.assertEquals(1001L,
       shipper.getStartPosition());
     conf.unset("replication.source.maxretriesmultiplier");
   }
+
+  private static class TestAppender extends AppenderSkeleton {
+    // Captures the log4j events emitted while this appender is attached.
+    private final List<LoggingEvent> log = new ArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+      log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<LoggingEvent> getLog() {
+      return new ArrayList<>(log);
+    }
+  }
+
+  /**
+   * Test that the replication source is terminated when the peer cluster reports the same
+   * hbase clusterId in ZooKeeper as the source cluster.
+   */
+  @Test
+  public void testSamePeerClusterID() throws Exception {
+    TestAppender appender = new TestAppender();
+    LogManager.getLogger(ReplicationSource.class).setLevel(Level.ERROR);
+    LogManager.getLogger(ReplicationSource.class).addAppender(appender);
+
+    setUpBeforeClass();
+
+    MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(1);
+    HRegionServer serverA = cluster.getRegionServer(0);
+    final ReplicationSourceManager managerA =
+        ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+
+    UUID srcHBaseId;
+    try {
+      srcHBaseId = ZKClusterId.getUUIDForCluster(serverA.getZooKeeper());
+    } catch (KeeperException ke) {
+      throw new IOException("Could not read cluster id", ke);
+    }
+
+    // Overwrite the peer cluster's clusterId znode with the source clusterId so that
+    // both clusters report the same id.
+    TEST_UTIL_PEER.startMiniCluster(2);
+    ZKWatcher zkwTgt = new ZKWatcher(TEST_UTIL_PEER.getConfiguration(), "tgt", null);
+    ZKUtil.setData(zkwTgt, "/hbase/hbaseid", Bytes.toBytes(srcHBaseId.toString()));
+
+    Admin admin = TEST_UTIL.getAdmin();
+    String peerId = "TestPeer1";
+    admin.addReplicationPeer(peerId,
+      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
+
+    managerA.addSource(peerId);
+
+    String pattern = "Closing source " + peerId + " because an error occurred: "
+        + "This replication attempt is not allowed by ReplicationEndpoint: "
+        + "HBaseInterClusterReplicationEndpoint due to invalid target hbase znode setting";
+    Pattern r = Pattern.compile(pattern);
+
+    // The source is initialized (and terminated) on a separate thread, so poll for the
+    // expected error message instead of asserting on whatever has been logged so far.
+    boolean found = false;
+    for (int i = 0; i < 30 && !found; i++) {
+      for (LoggingEvent event : appender.getLog()) {
+        Matcher m = r.matcher(event.getRenderedMessage());
+        if (m.matches()) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        Thread.sleep(1000);
+      }
+    }
+    assertTrue("Did not find the expected ReplicationSource termination message", found);
+
+    cluster.stopRegionServer(0);
+    TEST_UTIL_PEER.shutdownMiniCluster();
+    tearDownAfterClass();
+    LogManager.getLogger(ReplicationSource.class).setLevel(Level.ALL);
+    LogManager.getLogger(ReplicationSource.class).removeAppender(appender);
+  }
 }