diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index f569e47bc8..87b4dcd998 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; public final class ReplicationPeerConfigUtil { private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class); + public static final String HBASE_REPLICATION_PEER_DEFAULT_CONFIG= "hbase.replication.peer.default.config"; private ReplicationPeerConfigUtil() {} @@ -450,6 +451,45 @@ public final class ReplicationPeerConfigUtil { return builder.build(); } + /** + Sample Configuration + + hbase.replication.peer.default.configs + hbase.replication.source.custom.walentryfilters=x,y,z;hbase.xxx.custom_property=123 + + */ + + /** + * Helper method to add default peer configs from HBase Configuration to ReplicationPeerConfig + * @param conf Configuration + * @return true if new configurations was added. + */ + public static ReplicationPeerConfig addDefaultPeerConfigsIfNotPresent(Configuration conf, ReplicationPeerConfig receivedPeerConfig){ + + ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.newBuilder(receivedPeerConfig); + String defaultPeerConfigs = conf.get(HBASE_REPLICATION_PEER_DEFAULT_CONFIG); + + Map peerConfigurations = receivedPeerConfig.getConfiguration(); + + if(defaultPeerConfigs != null && defaultPeerConfigs.length() != 0){ + String[] defaultPeerConfigList = defaultPeerConfigs.split(";"); + + for(String defaultPeerConfig : defaultPeerConfigList){ + String[] configSplit = defaultPeerConfig.split("="); + if(configSplit != null && configSplit.length == 2){ + String configName = configSplit[0]; + String configValue = configSplit[1]; + + // Only override if default property does not exist in existing peer configs or its value is different. + if(!peerConfigurations.containsKey(configName) || !peerConfigurations.get(configName).equalsIgnoreCase(configValue)){ + copiedPeerConfigBuilder.putConfiguration(configName,configValue); + } + } + } + } + return copiedPeerConfigBuilder.build(); + } + public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig( Map> excludeTableCfs, ReplicationPeerConfig peerConfig) throws ReplicationException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 3cc3e26480..aba703ccde 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -420,43 +419,4 @@ public class ReplicationPeerConfig { return tableCFsMap != null && tableCFsMap.containsKey(table); } } - - /** - Sample Configuration - - hbase.replication.peer.default.configs - hbase.replication.source.custom.walentryfilters=x,y,z;hbase.xxx.custom_property=123 - - */ - - /** - * Helper method to add default peer configs from HBase Configuration to ReplicationPeerConfig - * @param conf Configuration - * @return true if new configurations was added. - */ - public boolean addDefaultPeerConfigsIfNotPresent(Configuration conf){ - - boolean isNewConfAdded = false; - String defaultPeerConfigs = conf.get("hbase.replication.peer.default.config"); - Map peerConfigurations = getConfiguration(); - - if(defaultPeerConfigs != null && defaultPeerConfigs.length() != 0){ - String[] defaultPeerConfigList = defaultPeerConfigs.split(";"); - - for(String defaultPeerConfig : defaultPeerConfigList){ - String[] configSplit = defaultPeerConfig.split("="); - if(configSplit != null && configSplit.length == 2){ - String configName = configSplit[0]; - String configValue = configSplit[1]; - - // Only override if default property does not exist in existing peer configs or its value is different. - if(!peerConfigurations.containsKey(configName) || !peerConfigurations.get(configName).equalsIgnoreCase(configValue)){ - peerConfigurations.put(configName,configValue); - isNewConfAdded = true; - } - } - } - } - return isNewConfAdded; - } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 9b440b9ce4..bc89bd8f54 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -145,11 +146,10 @@ public class ReplicationPeers { SyncReplicationState newSyncReplicationState = peerStorage.getPeerNewSyncReplicationState(peerId); - // Only update peer configs when we have extra received from default. - if(peerConfig.addDefaultPeerConfigsIfNotPresent(this.conf)){ - peerStorage.updatePeerConfig(peerId,peerConfig); - } + ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(this.conf, peerConfig); + peerStorage.updatePeerConfig(peerId,updatedPeerConfig); + return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), - peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState); + peerId, updatedPeerConfig, enabled, syncReplicationState, newSyncReplicationState); } } diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index 0e7cd74048..0b0a530300 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -34,9 +34,12 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.stream.Stream; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -215,4 +218,40 @@ public class TestZKReplicationPeerStorage { assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId))); } + + @Test + public void testDefaultReplicationPeerConfigIsAppliedIfNotAlreadySet(){ + String customPeerConfigKey = "hbase.xxx.custom_config"; + String customPeerConfigValue = "test"; + + ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); + + // custom config not present + assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); + + Configuration conf = UTIL.getConfiguration(); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_DEFAULT_CONFIG, + customPeerConfigKey.concat("=").concat(customPeerConfigValue)); + + ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,existingReplicationPeerConfig); + assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); + } + + @Test + public void testDefaultReplicationPeerConfigOverrideIfAlreadySet(){ + + String customPeerConfigKey = "hbase.xxx.custom_config"; + String customPeerConfigValue = "test"; + String customPeerConfigUpdatedValue = "test_updated"; + + ReplicationPeerConfig existingReplicationPeerConfig = ReplicationPeerConfig.newBuilder(getConfig(1)) + .putConfiguration(customPeerConfigKey,customPeerConfigValue).build(); + + Configuration conf = UTIL.getConfiguration(); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_DEFAULT_CONFIG, + customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue)); + + ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,existingReplicationPeerConfig); + assertEquals(customPeerConfigUpdatedValue, updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 4a38a22d2c..2193c51dc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -119,7 +120,8 @@ public class ReplicationPeerManager { if (peerId.contains("-")) { throw new DoNotRetryIOException("Found invalid peer name: " + peerId); } - peerConfig.addDefaultPeerConfigsIfNotPresent(conf); + ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,peerConfig); + peerConfig = updatedPeerConfig; checkPeerConfig(peerConfig); if (peerConfig.isSyncReplication()) { checkSyncReplicationPeerConfigConflict(peerConfig);