From be89caab77c678e77263e831b61287f65f39135a Mon Sep 17 00:00:00 2001 From: Geoffrey Date: Wed, 6 Apr 2016 21:31:55 -0700 Subject: [PATCH] HBASE-15507 - Added update_peer_config to the HBase shell and ReplicationAdmin, and provided a callback for custom replication endpoints to be notified of changes to their configuration and peer data --- .../hbase/client/replication/ReplicationAdmin.java | 11 +++- .../hadoop/hbase/replication/ReplicationPeer.java | 2 + .../hbase/replication/ReplicationPeerZKImpl.java | 25 ++++++++-- .../hadoop/hbase/replication/ReplicationPeers.java | 2 + .../hbase/replication/ReplicationPeersZKImpl.java | 32 ++++++++++++ .../hbase/replication/BaseReplicationEndpoint.java | 22 ++++++++ .../hbase/replication/ReplicationEndpoint.java | 7 +-- .../regionserver/ReplicationSourceManager.java | 2 +- .../visibility/VisibilityReplicationEndpoint.java | 6 +++ .../TestReplicationAdminWithClusters.java | 58 ++++++++++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 29 ++++++++++- hbase-shell/src/main/ruby/shell.rb | 1 + .../src/test/ruby/hbase/replication_admin_test.rb | 25 ++++++++++ 13 files changed, 210 insertions(+), 12 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 8ee3a22..43460e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; @@ -201,7 +200,10 @@ public class ReplicationAdmin implements Closeable { public static Map> parseTableCFsFromConfig(String tableCFsConfig) { return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); } - + + public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { + this.replicationPeers.updatePeerConfig(id, peerConfig); + } /** * Removes a peer cluster and stops the replication to it. * @param id a short name that identifies the cluster @@ -550,6 +552,11 @@ public class ReplicationAdmin implements Closeable { } @VisibleForTesting + public void peerAdded(String id) throws ReplicationException { + this.replicationPeers.peerAdded(id); + } + + @VisibleForTesting List listReplicationPeers() { Map peers = listPeerConfigs(); if (peers == null || peers.size() <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 920eea6..3da01fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -71,4 +71,6 @@ public interface ReplicationPeer { */ public Map> getTableCFs(); + void trackPeerConfigChanges(ReplicationPeerConfigListener listener); + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index f7a2411..5054675 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -109,7 +109,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase this.readPeerConfig(); } - private void readPeerConfig() { + private ReplicationPeerConfig readPeerConfig() { try { byte[] data = peerConfigTracker.getData(false); if (data != null) { @@ -118,6 +118,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase } catch (DeserializationException e) { LOG.error("", e); } + return this.peerConfig; } @Override @@ -163,6 +164,13 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase } @Override + public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { + if (this.peerConfigTracker != null){ + this.peerConfigTracker.setListener(listener); + } + } + + @Override public void abort(String why, Throwable e) { LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig + " was aborted for the following reason(s):" + why, e); @@ -260,16 +268,25 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase */ public class PeerConfigTracker extends ZooKeeperNodeTracker { + ReplicationPeerConfigListener listener; + public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, Abortable abortable) { super(watcher, peerConfigNode, abortable); } - + + public void setListener(ReplicationPeerConfigListener listener){ + this.listener = listener; + } + @Override public synchronized void nodeCreated(String path) { if (path.equals(node)) { super.nodeCreated(path); - readPeerConfig(); + ReplicationPeerConfig config = readPeerConfig(); + if (listener != null){ + listener.peerConfigUpdated(config); + } } } @@ -278,6 +295,8 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase if (path.equals(node)) { super.nodeDataChanged(path); } + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 1961a65..9f70d95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -151,4 +151,6 @@ public interface ReplicationPeers { * @return the configuration for the peer cluster, null if it was unable to get the configuration */ Pair getPeerConf(String peerId) throws ReplicationException; + + void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 367c688..009f38f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -351,6 +351,38 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return new Pair(peerConfig, otherConf); } + @Override + public void updatePeerConfig(String id, ReplicationPeerConfig newConfig) throws ReplicationException { + ReplicationPeer peer = getPeer(id); + if (peer == null){ + throw new ReplicationException("Could not find peer Id " + id); + } + ReplicationPeerConfig existingConfig = peer.getPeerConfig(); + if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && + !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ + throw new ReplicationException("Changing the cluster key on an existing peer is not allowed. " + + "Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" + newConfig.getClusterKey() + + "'"); + } + if (newConfig.getReplicationEndpointImpl() != null && !newConfig.getReplicationEndpointImpl().isEmpty() && + !newConfig.getReplicationEndpointImpl().equals(existingConfig.getReplicationEndpointImpl())){ + throw new ReplicationException("Changing the replication endpoint implementation class on an existing peer" + + "is not allowed. Existing class '" + existingConfig.getReplicationEndpointImpl() + "' does not match" + + " new class '" + newConfig.getReplicationEndpointImpl() + "'"); + } + //Update existingConfig's peer config and peer data with the new values, but do not touch config or data that weren't + //explicitly changed + existingConfig.getConfiguration().putAll(newConfig.getConfiguration()); + existingConfig.getPeerData().putAll(newConfig.getPeerData()); + + try { + ZKUtil.setData(this.zookeeper, getPeerNode(id), ReplicationSerDeHelper.toByteArray(existingConfig)); + } + catch(KeeperException ke){ + throw new ReplicationException("There was a problem trying to save changes to the replication peer " + id, ke); + } + } + /** * List all registered peer clusters and set a watch on their znodes. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 67051ab..d667269 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -21,10 +21,13 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractService; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; /** * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this @@ -35,11 +38,30 @@ import com.google.common.util.concurrent.AbstractService; public abstract class BaseReplicationEndpoint extends AbstractService implements ReplicationEndpoint { + private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class); protected Context ctx; @Override public void init(Context context) throws IOException { this.ctx = context; + + if (this.ctx != null){ + ReplicationPeer peer = this.ctx.getReplicationPeer(); + if (peer != null){ + peer.trackPeerConfigChanges(this); + } else { + LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + + " because there's no such peer"); + } + } + } + + @Override + /** + * No-op implementation for subclasses to override if they wish to execute logic if their config changes + */ + public void peerConfigUpdated(ReplicationPeerConfig rpc){ + } /** Returns a default set of filters */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index ac1257f..c92b53d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -46,14 +46,13 @@ import com.google.common.util.concurrent.Service; * and persisting of the WAL entries in the other cluster. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public interface ReplicationEndpoint extends Service { +public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener { @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { private final Configuration conf; private final FileSystem fs; private final TableDescriptors tableDescriptors; - private final ReplicationPeerConfig peerConfig; private final ReplicationPeer replicationPeer; private final String peerId; private final UUID clusterId; @@ -63,13 +62,11 @@ public interface ReplicationEndpoint extends Service { public Context( final Configuration conf, final FileSystem fs, - final ReplicationPeerConfig peerConfig, final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, final MetricsSource metrics, final TableDescriptors tableDescriptors) { - this.peerConfig = peerConfig; this.conf = conf; this.fs = fs; this.clusterId = clusterId; @@ -91,7 +88,7 @@ public interface ReplicationEndpoint extends Service { return peerId; } public ReplicationPeerConfig getPeerConfig() { - return peerConfig; + return replicationPeer.getPeerConfig(); } public ReplicationPeer getReplicationPeer() { return replicationPeer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9ff4b2d..83e0205 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -494,7 +494,7 @@ public class ReplicationSourceManager implements ReplicationListener { // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), - fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); + fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); return src; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index 3db54c6..2ac515a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -57,6 +58,11 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { } @Override + public void peerConfigUpdated(ReplicationPeerConfig rpc){ + + } + + @Override public boolean replicate(ReplicateContext replicateContext) { if (!delegator.canReplicateToSameCluster()) { // Only when the replication is inter cluster replication we need to diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index e7bd72c..a56276d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -18,6 +18,7 @@ import static org.junit.Assert.fail; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -195,4 +198,59 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { adminExt.disableTableRep(tableName); } } + + @Test(timeout=300000) + public void testReplicationPeerConfigUpdateCallback() throws Exception { + String peerId = "1"; + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName()); + rpc.getConfiguration().put("key1", "value1"); + + admin.addPeer(peerId, rpc); + admin.peerAdded(peerId); + + rpc.getConfiguration().put("key1", "value2"); + admin.updatePeerConfig(peerId, rpc); + if (!TestUpdatableReplicationEndpoint.hasCalledBack()) { + synchronized(TestUpdatableReplicationEndpoint.class) { + TestUpdatableReplicationEndpoint.class.wait(2000L); + } + } + + assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack()); + } + + public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint { + private static boolean calledBack = false; + public static boolean hasCalledBack(){ + return calledBack; + } + @Override + public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){ + calledBack = true; + notifyAll(); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + + + @Override + public UUID getPeerUUID() { + return UUID.randomUUID(); + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + return false; + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index f441a99..e91a4f7 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -92,8 +92,9 @@ module Hbase table_cfs.each{|key, val| map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) } + replication_peer_config.set_table_cfs_map(map) end - @replication_admin.add_peer(id, replication_peer_config, map) + @replication_admin.add_peer(id, replication_peer_config) else raise(ArgumentError, "args must be a Hash") end @@ -202,5 +203,31 @@ module Hbase def get_peer_config(id) @replication_admin.get_peer_config(id) end + + def peer_added(id) + @replication_admin.peer_added(id) + end + + def update_peer_config(id, args={}) + # Optional parameters + config = args.fetch(CONFIG, nil) + data = args.fetch(DATA, nil) + + # Create and populate a ReplicationPeerConfig + replication_peer_config = ReplicationPeerConfig.new + unless config.nil? + replication_peer_config.get_configuration.put_all(config) + end + + unless data.nil? + # Convert Strings to Bytes for peer_data + peer_data = replication_peer_config.get_peer_data + data.each{|key, val| + peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val)) + } + end + + @replication_admin.update_peer_config(id, replication_peer_config) + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index e5c9a31..adcd8f2 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -365,6 +365,7 @@ Shell.load_command_group( disable_table_replication get_peer_config list_peer_configs + update_peer_config ] ) diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 8f08dc0..7efdf82 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -200,6 +200,31 @@ module Hbase replication_admin.remove_peer(peer_id_second) end + define_test "update_peer: can update peer config and data" do + repl_impl = "org.apache.hadoop.hbase.replication.ReplicationEndpointForTest" + config_params = { "config1" => "value1", "config2" => "value2" } + data_params = {"data1" => "value1", "data2" => "value2"} + args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA => data_params} + replication_admin.add_peer(@peer_id, args) + + #Normally the ReplicationSourceManager will call ReplicationPeer#peer_added, but here we have to do it ourselves + replication_admin.peer_added(@peer_id) + + new_config_params = { "config1" => "new_value1" } + new_data_params = {"data1" => "new_value1"} + new_args = {CONFIG => new_config_params, DATA => new_data_params} + replication_admin.update_peer_config(@peer_id, new_args) + + #Make sure the updated key/value pairs in config and data were successfully updated, and that those we didn't + #update are still there and unchanged + peer_config = replication_admin.get_peer_config(@peer_id) + replication_admin.remove_peer(@peer_id) + assert_equal("new_value1", peer_config.get_configuration.get("config1")) + assert_equal("value2", peer_config.get_configuration.get("config2")) + assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1")))) + assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2")))) + + end # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279 # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below. # define_test "add_peer: adding a second peer with same id should error" do -- 2.5.4 (Apple Git-61)