diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index f569e47bc8..87b4dcd998 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
public final class ReplicationPeerConfigUtil {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
+ public static final String HBASE_REPLICATION_PEER_DEFAULT_CONFIG= "hbase.replication.peer.default.config";
private ReplicationPeerConfigUtil() {}
@@ -450,6 +451,45 @@ public final class ReplicationPeerConfigUtil {
return builder.build();
}
+ /**
+ Sample Configuration
+
+ hbase.replication.peer.default.configs
+ hbase.replication.source.custom.walentryfilters=x,y,z;hbase.xxx.custom_property=123
+
+ */
+
+ /**
+ * Helper method to add default peer configs from HBase Configuration to ReplicationPeerConfig
+ * @param conf Configuration
+ * @return true if new configurations was added.
+ */
+ public static ReplicationPeerConfig addDefaultPeerConfigsIfNotPresent(Configuration conf, ReplicationPeerConfig receivedPeerConfig){
+
+ ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.newBuilder(receivedPeerConfig);
+ String defaultPeerConfigs = conf.get(HBASE_REPLICATION_PEER_DEFAULT_CONFIG);
+
+ Map peerConfigurations = receivedPeerConfig.getConfiguration();
+
+ if(defaultPeerConfigs != null && defaultPeerConfigs.length() != 0){
+ String[] defaultPeerConfigList = defaultPeerConfigs.split(";");
+
+ for(String defaultPeerConfig : defaultPeerConfigList){
+ String[] configSplit = defaultPeerConfig.split("=");
+ if(configSplit != null && configSplit.length == 2){
+ String configName = configSplit[0];
+ String configValue = configSplit[1];
+
+ // Only override if default property does not exist in existing peer configs or its value is different.
+ if(!peerConfigurations.containsKey(configName) || !peerConfigurations.get(configName).equalsIgnoreCase(configValue)){
+ copiedPeerConfigBuilder.putConfiguration(configName,configValue);
+ }
+ }
+ }
+ }
+ return copiedPeerConfigBuilder.build();
+ }
+
public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
Map> excludeTableCfs, ReplicationPeerConfig peerConfig)
throws ReplicationException {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 3cc3e26480..aba703ccde 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -420,43 +419,4 @@ public class ReplicationPeerConfig {
return tableCFsMap != null && tableCFsMap.containsKey(table);
}
}
-
- /**
- Sample Configuration
-
- hbase.replication.peer.default.configs
- hbase.replication.source.custom.walentryfilters=x,y,z;hbase.xxx.custom_property=123
-
- */
-
- /**
- * Helper method to add default peer configs from HBase Configuration to ReplicationPeerConfig
- * @param conf Configuration
- * @return true if new configurations was added.
- */
- public boolean addDefaultPeerConfigsIfNotPresent(Configuration conf){
-
- boolean isNewConfAdded = false;
- String defaultPeerConfigs = conf.get("hbase.replication.peer.default.config");
- Map peerConfigurations = getConfiguration();
-
- if(defaultPeerConfigs != null && defaultPeerConfigs.length() != 0){
- String[] defaultPeerConfigList = defaultPeerConfigs.split(";");
-
- for(String defaultPeerConfig : defaultPeerConfigList){
- String[] configSplit = defaultPeerConfig.split("=");
- if(configSplit != null && configSplit.length == 2){
- String configName = configSplit[0];
- String configValue = configSplit[1];
-
- // Only override if default property does not exist in existing peer configs or its value is different.
- if(!peerConfigurations.containsKey(configName) || !peerConfigurations.get(configName).equalsIgnoreCase(configValue)){
- peerConfigurations.put(configName,configValue);
- isNewConfAdded = true;
- }
- }
- }
- }
- return isNewConfAdded;
- }
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 9b440b9ce4..bc89bd8f54 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -145,11 +146,10 @@ public class ReplicationPeers {
SyncReplicationState newSyncReplicationState =
peerStorage.getPeerNewSyncReplicationState(peerId);
- // Only update peer configs when we have extra received from default.
- if(peerConfig.addDefaultPeerConfigsIfNotPresent(this.conf)){
- peerStorage.updatePeerConfig(peerId,peerConfig);
- }
+ ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(this.conf, peerConfig);
+ peerStorage.updatePeerConfig(peerId,updatedPeerConfig);
+
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
- peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
+ peerId, updatedPeerConfig, enabled, syncReplicationState, newSyncReplicationState);
}
}
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 0e7cd74048..0b0a530300 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -34,9 +34,12 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -215,4 +218,40 @@ public class TestZKReplicationPeerStorage {
assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
STORAGE.getNewSyncReplicationStateNode(peerId)));
}
+
+ @Test
+ public void testDefaultReplicationPeerConfigIsAppliedIfNotAlreadySet(){
+ String customPeerConfigKey = "hbase.xxx.custom_config";
+ String customPeerConfigValue = "test";
+
+ ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+ // custom config not present
+ assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
+
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_DEFAULT_CONFIG,
+ customPeerConfigKey.concat("=").concat(customPeerConfigValue));
+
+ ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);
+ assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+ }
+
+ @Test
+ public void testDefaultReplicationPeerConfigOverrideIfAlreadySet(){
+
+ String customPeerConfigKey = "hbase.xxx.custom_config";
+ String customPeerConfigValue = "test";
+ String customPeerConfigUpdatedValue = "test_updated";
+
+ ReplicationPeerConfig existingReplicationPeerConfig = ReplicationPeerConfig.newBuilder(getConfig(1))
+ .putConfiguration(customPeerConfigKey,customPeerConfigValue).build();
+
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_DEFAULT_CONFIG,
+ customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue));
+
+ ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);
+ assertEquals(customPeerConfigUpdatedValue, updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 4a38a22d2c..2193c51dc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -119,7 +120,8 @@ public class ReplicationPeerManager {
if (peerId.contains("-")) {
throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
}
- peerConfig.addDefaultPeerConfigsIfNotPresent(conf);
+ ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,peerConfig);
+ peerConfig = updatedPeerConfig;
checkPeerConfig(peerConfig);
if (peerConfig.isSyncReplication()) {
checkSyncReplicationPeerConfigConflict(peerConfig);