From 40f2f9d01c21aad1aca1957f397a4e6b84eeac13 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Tue, 6 Feb 2018 16:00:59 +0800 Subject: [PATCH] HBASE-19935 Only allow table replication for sync replication for now --- .../master/replication/ReplicationPeerManager.java | 32 ++++++++++++++-- .../client/replication/TestReplicationAdmin.java | 43 +++++++++++----------- .../hbase/wal/TestSyncReplicationWALProvider.java | 6 +++ 3 files changed, 56 insertions(+), 25 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 9336fbd..9779522 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -167,7 +167,7 @@ public class ReplicationPeerManager { " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); } - if (oldPeerConfig.getRemoteWALDir() != null) { + if (isSyncReplication(oldPeerConfig)) { if (!ReplicationUtils.isKeyConfigEqual(oldPeerConfig, peerConfig)) { throw new DoNotRetryIOException( "Changing the replicated namespace/table config on a synchronous replication " @@ -195,8 +195,8 @@ public class ReplicationPeerManager { } ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); SyncReplicationState syncReplicationState = - StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE - : SyncReplicationState.DOWNGRADE_ACTIVE; + isSyncReplication(copiedPeerConfig) ? SyncReplicationState.DOWNGRADE_ACTIVE + : SyncReplicationState.NONE; peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); @@ -316,9 +316,28 @@ public class ReplicationPeerManager { peerConfig.getTableCFsMap()); } + if (isSyncReplication(peerConfig)) { + checkPeerConfigForSyncReplication(peerConfig); + } + checkConfiguredWALEntryFilters(peerConfig); } + private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + // This is used to reduce the difficulty for implementing the sync replication state transition + // as we need to reopen all the related regions. + // TODO: Add namespace, replicat_all flag back + if (peerConfig.replicateAllUserTables()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + } + /** * Set a namespace in the peer config means that all tables in this namespace will be replicated * to the peer cluster. @@ -396,6 +415,13 @@ public class ReplicationPeerManager { } /** + * Use remote wal dir to decide whether a peer is sync replication peer + */ + private boolean isSyncReplication(ReplicationPeerConfig peerConfig) { + return !StringUtils.isBlank(peerConfig.getRemoteWALDir()); + } + + /** * For replication peer cluster key or endpoint class, null and empty string is same. So here * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a7710e7..b040f95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -929,45 +929,42 @@ public class TestReplicationAdmin { builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); builder.setRemoteWALDir(rootDir); - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - - rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); - assertEquals(rootDir, rpc.getRemoteWALDir()); try { - builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail("Change remote wal dir is not allowed"); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); } catch (Exception e) { // OK } + builder.setReplicateAllUserTables(false); try { - builder.setRemoteWALDir(null); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail("Change remote wal dir is not allowed"); + Set namespaces = new HashSet(); + namespaces.add("ns1"); + builder.setNamespaces(namespaces); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); } catch (Exception e) { // OK } + builder.setNamespaces(null); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + assertEquals(rootDir, rpc.getRemoteWALDir()); + try { - builder = ReplicationPeerConfig.newBuilder(rpc); - builder.setReplicateAllUserTables(false); + builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + fail("Change remote wal dir is not allowed"); } catch (Exception e) { // OK } try { - builder = ReplicationPeerConfig.newBuilder(rpc); - Set namespaces = new HashSet<>(); - namespaces.add("ns1"); - builder.setExcludeNamespaces(namespaces); + builder.setRemoteWALDir(null); hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + fail("Change remote wal dir is not allowed"); } catch (Exception e) { // OK } @@ -976,10 +973,10 @@ public class TestReplicationAdmin { builder = ReplicationPeerConfig.newBuilder(rpc); Map> tableCfs = new HashMap<>(); tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); - builder.setExcludeTableCFsMap(tableCfs); + builder.setTableCFsMap(tableCfs); hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + "Change replicated table config on an existing synchronous peer is not allowed"); } catch (Exception e) { // OK } @@ -989,6 +986,7 @@ public class TestReplicationAdmin { public void testTransitSyncReplicationPeerState() throws Exception { ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_ONE); + builder.setReplicateAllUserTables(false); hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); assertEquals(SyncReplicationState.NONE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); @@ -1005,6 +1003,7 @@ public class TestReplicationAdmin { builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); builder.setRemoteWALDir(rootDir); + builder.setReplicateAllUserTables(false); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 60a9e13..f09e51e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.Optional; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; @@ -41,12 +42,17 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ RegionServerTests.class, MediumTests.class }) public class TestSyncReplicationWALProvider { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static String PEER_ID = "1"; -- 1.9.1