From 99e20dcd312a9b41f251f107dbe6cf97fe7aac5c Mon Sep 17 00:00:00 2001 From: ddupg Date: Thu, 28 Nov 2019 18:57:56 +0800 Subject: [PATCH] HBASE-23345 Duplicate code about deciding whether a table need replicate, copy the code from ReplicationUtils.contains to ReplicationPeerConfig.needToReplicate, and delete ReplicationUtils.contains HBASE-23345 modify UT, use ReplicationPeerConfigBuilder instead of mock --- .../hbase/replication/ReplicationPeerConfig.java | 25 ++- .../replication/TestReplicationPeerConfig.java | 202 ++++++++++++++++++ .../hadoop/hbase/replication/ReplicationUtils.java | 38 ---- .../hbase/replication/TestReplicationUtil.java | 235 --------------------- .../master/replication/ModifyPeerProcedure.java | 7 +- .../master/replication/ReplicationPeerManager.java | 2 +- .../replication/UpdatePeerConfigProcedure.java | 6 +- .../NamespaceTableCfWALEntryFilter.java | 2 +- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 7 +- .../TestReplicationWALEntryFilters.java | 116 +++++----- 10 files changed, 278 insertions(+), 362 deletions(-) delete mode 100644 hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index e0d9a4c..7c0f115 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -366,22 +366,31 @@ public class ReplicationPeerConfig { * @return true if the table need replicate to the peer cluster */ public boolean needToReplicate(TableName table) { + String namespace = table.getNamespaceAsString(); if (replicateAllUserTables) { - if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) { + // replicate all user tables, but filter by exclude namespaces and table-cfs config + if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) { return false; } - if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) { - return false; + // trap here, must check existence first since HashMap allows null value. + if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) { + return true; } - return true; + Collection cfs = excludeTableCFsMap.get(table); + // if cfs is null or empty then we can make sure that we do not need to replicate this table, + // otherwise, we may still need to replicate the table but filter out some families. + return cfs != null && !cfs.isEmpty(); } else { - if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { - return true; + // Not replicate all user tables, so filter by namespaces and table-cfs config + if (namespaces == null && tableCFsMap == null) { + return false; } - if (tableCFsMap != null && tableCFsMap.containsKey(table)) { + // First filter by namespaces config + // If table's namespace in peer config, all the tables data are applicable for replication + if (namespaces != null && namespaces.contains(namespace)) { return true; } - return false; + return tableCFsMap != null && tableCFsMap.containsKey(table); } } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java index 881ef45..d67a3f8 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java @@ -17,10 +17,17 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.BuilderStyleTest; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -32,6 +39,9 @@ public class TestReplicationPeerConfig { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationPeerConfig.class); + private static TableName TABLE_A = TableName.valueOf("replication", "testA"); + private static TableName TABLE_B = TableName.valueOf("replication", "testB"); + @Test public void testClassMethodsAreBuilderStyle() { /* ReplicationPeerConfig should have a builder style setup where setXXX/addXXX methods @@ -48,4 +58,196 @@ public class TestReplicationPeerConfig { BuilderStyleTest.assertClassesAreBuilderStyle(ReplicationPeerConfig.class); } + + @Test + public void testNeedToReplicateWithReplicatingAll() { + ReplicationPeerConfig peerConfig; + ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder = + new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl(); + Map> tableCfs = new HashMap<>(); + Set namespaces = new HashSet<>(); + + // 1. replication_all flag is true, no namespaces and table-cfs config + builder.setReplicateAllUserTables(true); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // 2. replicate_all flag is true, and config in excludedTableCfs + builder.setExcludeNamespaces(null); + // empty map + tableCfs = new HashMap<>(); + builder.setReplicateAllUserTables(true); + builder.setExcludeTableCFsMap(tableCfs); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // table testB + tableCfs = new HashMap<>(); + tableCfs.put(TABLE_B, null); + builder.setReplicateAllUserTables(true); + builder.setExcludeTableCFsMap(tableCfs); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // table testA + tableCfs = new HashMap<>(); + tableCfs.put(TABLE_A, null); + builder.setReplicateAllUserTables(true); + builder.setExcludeTableCFsMap(tableCfs); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // 3. replicate_all flag is true, and config in excludeNamespaces + builder.setExcludeTableCFsMap(null); + // empty set + namespaces = new HashSet<>(); + builder.setReplicateAllUserTables(true); + builder.setExcludeNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // namespace default + namespaces = new HashSet<>(); + namespaces.add("default"); + builder.setReplicateAllUserTables(true); + builder.setExcludeNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // namespace replication + namespaces = new HashSet<>(); + namespaces.add("replication"); + builder.setReplicateAllUserTables(true); + builder.setExcludeNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both + // Namespaces config doesn't conflict with table-cfs config + namespaces = new HashSet<>(); + tableCfs = new HashMap<>(); + namespaces.add("replication"); + tableCfs.put(TABLE_A, null); + builder.setReplicateAllUserTables(true); + builder.setExcludeTableCFsMap(tableCfs); + builder.setExcludeNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // Namespaces config conflicts with table-cfs config + namespaces = new HashSet<>(); + tableCfs = new HashMap<>(); + namespaces.add("default"); + tableCfs.put(TABLE_A, null); + builder.setReplicateAllUserTables(true); + builder.setExcludeTableCFsMap(tableCfs); + builder.setExcludeNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + namespaces = new HashSet<>(); + tableCfs = new HashMap<>(); + namespaces.add("replication"); + tableCfs.put(TABLE_B, null); + builder.setReplicateAllUserTables(true); + builder.setExcludeTableCFsMap(tableCfs); + builder.setExcludeNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + } + + @Test + public void testNeedToReplicateWithoutReplicatingAll() { + ReplicationPeerConfig peerConfig; + ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder = + new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl(); + Map> tableCfs = new HashMap<>(); + Set namespaces = new HashSet<>(); + + // 1. replication_all flag is false, no namespaces and table-cfs config + builder.setReplicateAllUserTables(false); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // 2. replicate_all flag is false, and only config table-cfs in peer + // empty map + builder.setReplicateAllUserTables(false); + builder.setTableCFsMap(tableCfs); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // table testB + tableCfs = new HashMap<>(); + tableCfs.put(TABLE_B, null); + builder.setReplicateAllUserTables(false); + builder.setTableCFsMap(tableCfs); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // table testA + tableCfs = new HashMap<>(); + tableCfs.put(TABLE_A, null); + builder.setReplicateAllUserTables(false); + builder.setTableCFsMap(tableCfs); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // 3. replication_all flag is false, and only config namespace in peer + builder.setTableCFsMap(null); + // empty set + builder.setReplicateAllUserTables(false); + builder.setNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // namespace default + namespaces = new HashSet<>(); + namespaces.add("default"); + builder.setReplicateAllUserTables(false); + builder.setNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // namespace replication + namespaces = new HashSet<>(); + namespaces.add("replication"); + builder.setReplicateAllUserTables(false); + builder.setNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // 4. replicate_all flag is false, and config namespaces and table-cfs both + // Namespaces config doesn't conflict with table-cfs config + namespaces = new HashSet<>(); + tableCfs = new HashMap<>(); + namespaces.add("replication"); + tableCfs.put(TABLE_A, null); + builder.setReplicateAllUserTables(false); + builder.setTableCFsMap(tableCfs); + builder.setNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // Namespaces config conflicts with table-cfs config + namespaces = new HashSet<>(); + tableCfs = new HashMap<>(); + namespaces.add("default"); + tableCfs.put(TABLE_A, null); + builder.setReplicateAllUserTables(false); + builder.setTableCFsMap(tableCfs); + builder.setNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + + namespaces = new HashSet<>(); + tableCfs = new HashMap<>(); + namespaces.add("replication"); + tableCfs.put(TABLE_B, null); + builder.setReplicateAllUserTables(false); + builder.setTableCFsMap(tableCfs); + builder.setNamespaces(namespaces); + peerConfig = builder.build(); + Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index b2b87a4..7cfb9d4 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -135,44 +135,6 @@ public final class ReplicationUtils { } /** - * Returns whether we should replicate the given table. - */ - public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) { - String namespace = tableName.getNamespaceAsString(); - if (peerConfig.replicateAllUserTables()) { - // replicate all user tables, but filter by exclude namespaces and table-cfs config - Set excludeNamespaces = peerConfig.getExcludeNamespaces(); - if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) { - return false; - } - Map> excludedTableCFs = peerConfig.getExcludeTableCFsMap(); - // trap here, must check existence first since HashMap allows null value. - if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) { - return true; - } - List cfs = excludedTableCFs.get(tableName); - // if cfs is null or empty then we can make sure that we do not need to replicate this table, - // otherwise, we may still need to replicate the table but filter out some families. - return cfs != null && !cfs.isEmpty(); - } else { - // Not replicate all user tables, so filter by namespaces and table-cfs config - Set namespaces = peerConfig.getNamespaces(); - Map> tableCFs = peerConfig.getTableCFsMap(); - - if (namespaces == null && tableCFs == null) { - return false; - } - - // First filter by namespaces config - // If table's namespace in peer config, all the tables data are applicable for replication - if (namespaces != null && namespaces.contains(namespace)) { - return true; - } - return tableCFs != null && tableCFs.containsKey(tableName); - } - } - - /** * Get the adaptive timeout value when performing a retry */ public static int getAdaptiveTimeout(final int initialValue, final int retries) { diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java deleted file mode 100644 index f8543fe..0000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, SmallTests.class }) -public class TestReplicationUtil { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationUtil.class); - - private static TableName TABLE_A = TableName.valueOf("replication", "testA"); - private static TableName TABLE_B = TableName.valueOf("replication", "testB"); - - @Test - public void testContainsWithReplicatingAll() { - ReplicationPeerConfig peerConfig; - ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder = - new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl(); - Map> tableCfs = new HashMap<>(); - Set namespaces = new HashSet<>(); - - // 1. replication_all flag is true, no namespaces and table-cfs config - builder.setReplicateAllUserTables(true); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // 2. replicate_all flag is true, and config in excludedTableCfs - builder.setExcludeNamespaces(null); - // empty map - tableCfs = new HashMap<>(); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // table testB - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // table testA - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // 3. replicate_all flag is true, and config in excludeNamespaces - builder.setExcludeTableCFsMap(null); - // empty set - namespaces = new HashSet<>(); - builder.setReplicateAllUserTables(true); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // namespace default - namespaces = new HashSet<>(); - namespaces.add("default"); - builder.setReplicateAllUserTables(true); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // namespace replication - namespaces = new HashSet<>(); - namespaces.add("replication"); - builder.setReplicateAllUserTables(true); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both - // Namespaces config doesn't conflict with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // Namespaces config conflicts with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("default"); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); - tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - } - - @Test - public void testContainsWithoutReplicatingAll() { - ReplicationPeerConfig peerConfig; - ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder = - new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl(); - Map> tableCfs = new HashMap<>(); - Set namespaces = new HashSet<>(); - - // 1. replication_all flag is false, no namespaces and table-cfs config - builder.setReplicateAllUserTables(false); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // 2. replicate_all flag is false, and only config table-cfs in peer - // empty map - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // table testB - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // table testA - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // 3. replication_all flag is false, and only config namespace in peer - builder.setTableCFsMap(null); - // empty set - builder.setReplicateAllUserTables(false); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // namespace default - namespaces = new HashSet<>(); - namespaces.add("default"); - builder.setReplicateAllUserTables(false); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // namespace replication - namespaces = new HashSet<>(); - namespaces.add("replication"); - builder.setReplicateAllUserTables(false); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // 4. replicate_all flag is false, and config namespaces and table-cfs both - // Namespaces config doesn't conflict with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - // Namespaces config conflicts with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("default"); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); - tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A)); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 0a6eb2a..fc792cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; @@ -168,11 +167,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure getSerialPeerIdsBelongsTo(TableName tableName) { return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) - .filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId()) + .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) .collect(Collectors.toList()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index 41e740f..188921a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -128,15 +128,15 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { continue; } TableName tn = td.getTableName(); - if (ReplicationUtils.contains(oldPeerConfig, tn)) { - if (!ReplicationUtils.contains(peerConfig, tn)) { + if (oldPeerConfig.needToReplicate(tn)) { + if (!peerConfig.needToReplicate(tn)) { // removed from peer config for (String encodedRegionName : MetaTableAccessor .getTableEncodedRegionNamesForSerialReplication(conn, tn)) { addToList(encodedRegionNames, encodedRegionName, queueStorage); } } - } else if (ReplicationUtils.contains(peerConfig, tn)) { + } else if (peerConfig.needToReplicate(tn)) { // newly added to peer config setLastPushedSequenceIdForTable(env, tn, lastSeqIds); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java index 3a3200a..58705f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -52,7 +52,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi @Override public Entry filter(Entry entry) { - if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) { + if (this.peer.getPeerConfig().needToReplicate(entry.getKey().getTableName())) { return entry; } else { return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 910f523..204b399 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -116,7 +116,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; @@ -3926,8 +3925,8 @@ public class HBaseFsck extends Configured implements Closeable { List peerDescriptions = admin.listReplicationPeers(); if (peerDescriptions != null && peerDescriptions.size() > 0) { List peers = peerDescriptions.stream() - .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(), - cleanReplicationBarrierTable)) + .filter(peerConfig -> peerConfig.getPeerConfig() + .needToReplicate(cleanReplicationBarrierTable)) .map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList()); try { List batch = new ArrayList<>(); @@ -3996,4 +3995,4 @@ public class HBaseFsck extends Configured implements Closeable { } } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index f2c5e50..b0c2aa7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -213,13 +213,11 @@ public class TestReplicationWALEntryFilters { @Test public void testNamespaceTableCfWALEntryFilter() { ReplicationPeer peer = mock(ReplicationPeer.class); - ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder(); // 1. replicate_all flag is false, no namespaces and table-cfs config - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getNamespaces()).thenReturn(null); - when(peerConfig.getTableCFsMap()).thenReturn(null); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); Entry userEntry = createEntry(null, a, b, c); ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); @@ -229,9 +227,8 @@ public class TestReplicationWALEntryFilters { // empty map userEntry = createEntry(null, a, b, c); Map> tableCfs = new HashMap<>(); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -239,9 +236,8 @@ public class TestReplicationWALEntryFilters { userEntry = createEntry(null, a, b, c); tableCfs = new HashMap<>(); tableCfs.put(TableName.valueOf("bar"), null); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -249,9 +245,8 @@ public class TestReplicationWALEntryFilters { userEntry = createEntry(null, a, b, c); tableCfs = new HashMap<>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a), filter.filter(userEntry)); @@ -259,9 +254,8 @@ public class TestReplicationWALEntryFilters { userEntry = createEntry(null, a, b, c, d); tableCfs = new HashMap<>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,c), filter.filter(userEntry)); @@ -269,19 +263,17 @@ public class TestReplicationWALEntryFilters { when(peer.getTableCFs()).thenReturn(null); // empty set Set namespaces = new HashSet<>(); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getNamespaces()).thenReturn(namespaces); - when(peerConfig.getTableCFsMap()).thenReturn(null); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces) + .setTableCFsMap(null); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // namespace default namespaces.add("default"); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getNamespaces()).thenReturn(namespaces); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); @@ -289,9 +281,8 @@ public class TestReplicationWALEntryFilters { // namespace ns1 namespaces = new HashSet<>(); namespaces.add("ns1"); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getNamespaces()).thenReturn(namespaces); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -302,10 +293,9 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<>(); namespaces.add("ns1"); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getNamespaces()).thenReturn(namespaces); - when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces) + .setTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, c), filter.filter(userEntry)); @@ -314,10 +304,9 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<>(); namespaces.add("default"); tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c")); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getNamespaces()).thenReturn(namespaces); - when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces) + .setTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); @@ -326,10 +315,9 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<>(); namespaces.add("ns1"); tableCfs.put(TableName.valueOf("bar"), null); - when(peerConfig.replicateAllUserTables()).thenReturn(false); - when(peerConfig.getNamespaces()).thenReturn(namespaces); - when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces) + .setTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -338,14 +326,14 @@ public class TestReplicationWALEntryFilters { @Test public void testNamespaceTableCfWALEntryFilter2() { ReplicationPeer peer = mock(ReplicationPeer.class); - ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder(); // 1. replicate_all flag is true // and no exclude namespaces and no exclude table-cfs config - when(peerConfig.replicateAllUserTables()).thenReturn(true); - when(peerConfig.getExcludeNamespaces()).thenReturn(null); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setReplicateAllUserTables(true) + .setExcludeNamespaces(null) + .setExcludeTableCFsMap(null); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); Entry userEntry = createEntry(null, a, b, c); ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); @@ -354,18 +342,16 @@ public class TestReplicationWALEntryFilters { // 2. replicate_all flag is true, and only config exclude namespaces // empty set Set namespaces = new HashSet(); - when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); // exclude namespace default namespaces.add("default"); - when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -373,9 +359,8 @@ public class TestReplicationWALEntryFilters { // exclude namespace ns1 namespaces = new HashSet(); namespaces.add("ns1"); - when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); @@ -383,9 +368,8 @@ public class TestReplicationWALEntryFilters { // 3. replicate_all flag is true, and only config exclude table-cfs // empty table-cfs map Map> tableCfs = new HashMap>(); - when(peerConfig.getExcludeNamespaces()).thenReturn(null); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); @@ -393,9 +377,8 @@ public class TestReplicationWALEntryFilters { // exclude table bar tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("bar"), null); - when(peerConfig.getExcludeNamespaces()).thenReturn(null); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); @@ -403,9 +386,8 @@ public class TestReplicationWALEntryFilters { // exclude table foo:a tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); - when(peerConfig.getExcludeNamespaces()).thenReturn(null); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, b, c), filter.filter(userEntry)); @@ -416,9 +398,8 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap>(); namespaces.add("ns1"); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); - when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, b), filter.filter(userEntry)); @@ -428,9 +409,8 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap>(); namespaces.add("default"); tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList()); - when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); - when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); - when(peer.getPeerConfig()).thenReturn(peerConfig); + peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); -- 2.7.4