From 49df31220cae80e0f5d94fba75e9c8fa9197f996 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Mon, 11 Apr 2016 21:36:52 -0700 Subject: [PATCH] HBASE-15633 - Backport of update_peer_config to branch-1 --- .../hbase/client/replication/ReplicationAdmin.java | 10 ++ .../hadoop/hbase/replication/ReplicationPeer.java | 5 + .../replication/ReplicationPeerConfigListener.java | 33 +++++ .../hbase/replication/ReplicationPeerZKImpl.java | 71 ++++++++++- .../hadoop/hbase/replication/ReplicationPeers.java | 8 ++ .../hbase/replication/ReplicationPeersZKImpl.java | 133 +++++++-------------- .../hbase/replication/ReplicationSerDeHelper.java | 124 +++++++++++++++++++ .../hbase/replication/ReplicationStateZKBase.java | 4 + .../hbase/replication/BaseReplicationEndpoint.java | 21 ++++ .../hbase/replication/ReplicationEndpoint.java | 7 +- .../regionserver/ReplicationSourceManager.java | 2 +- .../visibility/VisibilityReplicationEndpoint.java | 5 + .../TestReplicationAdminWithClusters.java | 54 +++++++++ .../src/main/ruby/hbase/replication_admin.rb | 27 +++++ hbase-shell/src/main/ruby/shell.rb | 1 + .../main/ruby/shell/commands/update_peer_config.rb | 49 ++++++++ .../src/test/ruby/hbase/replication_admin_test.rb | 28 ++++- 17 files changed, 483 insertions(+), 99 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 490a735..1305002 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -200,6 +200,11 @@ public class ReplicationAdmin implements Closeable { this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs)); } + public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) + throws ReplicationException { + this.replicationPeers.updatePeerConfig(id, peerConfig); + } + public static Map> parseTableCFsFromConfig(String tableCFsConfig) { if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { return null; @@ -632,6 +637,11 @@ public class ReplicationAdmin implements Closeable { } @VisibleForTesting + public void peerAdded(String id) throws ReplicationException { + this.replicationPeers.peerAdded(id); + } + + @VisibleForTesting List listReplicationPeers() { Map peers = listPeerConfigs(); if (peers == null || peers.size() <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 6925778..9a6af4f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -70,4 +70,9 @@ public interface ReplicationPeer { */ public Map> getTableCFs(); + /** + * Set up a callback for changes to the replication peer config + * @param listener Listener for config changes, usually a replication endpoint + */ + void trackPeerConfigChanges(ReplicationPeerConfigListener listener); } diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java new file mode 100644 index 0000000..b10862b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface ReplicationPeerConfigListener { +/** Callback method for when users update the ReplicationPeerConfig for this peer + * + * @param rpc The updated ReplicationPeerConfig + */ + void peerConfigUpdated(ReplicationPeerConfig rpc); + +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index a0d7b5f..6b10015 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -45,7 +45,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException; public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable { private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class); - private final ReplicationPeerConfig peerConfig; + private ReplicationPeerConfig peerConfig; private final String id; private volatile PeerState peerState; private volatile Map> tableCFs = new HashMap>(); @@ -53,7 +53,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea private PeerStateTracker peerStateTracker; private TableCFsTracker tableCFsTracker; - + private PeerConfigTracker peerConfigTracker; /** * Constructor that takes all the objects required to communicate with the specified peer, except * for the region server addresses. 
@@ -129,7 +129,31 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false)); this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs); } + /** + * Start a peer-config tracker to listen for changes to this peer's replication config + * @param zookeeper zk watcher for the local cluster + * @param peerConfigNode path to the zk node which stores the peer config + * @throws KeeperException + */ + public void startPeerConfigTracker(ZooKeeperWatcher zookeeper, String peerConfigNode) + throws KeeperException { + this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper, + this); + this.peerConfigTracker.start(); + this.readPeerConfig(); + } + private ReplicationPeerConfig readPeerConfig() { + try { + byte[] data = peerConfigTracker.getData(false); + if (data != null) { + this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data); + } + } catch (DeserializationException e) { + LOG.error("", e); + } + return this.peerConfig; + } @Override public PeerState getPeerState() { return peerState; @@ -172,6 +196,13 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea } @Override + public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { + if (this.peerConfigTracker != null){ + this.peerConfigTracker.setListener(listener); + } + } + + @Override public void abort(String why, Throwable e) { LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig + " was aborted for the following reason(s):" + why, e); @@ -290,4 +321,40 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea } } } + + /** + * Tracker for PeerConfigNode of this peer + */ + public class PeerConfigTracker extends ZooKeeperNodeTracker { + + private ReplicationPeerConfigListener listener; + + public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, peerConfigNode, abortable); + } + + public synchronized void setListener(ReplicationPeerConfigListener listener){ + this.listener = listener; + } + + @Override + public synchronized void nodeCreated(String path) { + if (path.equals(node)) { + super.nodeCreated(path); + ReplicationPeerConfig config = readPeerConfig(); + if (listener != null){ + listener.peerConfigUpdated(config); + } + } + } + + @Override + public synchronized void nodeDataChanged(String path) { + //superclass calls nodeCreated + if (path.equals(node)) { + super.nodeDataChanged(path); + } + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 745997e..b8d04b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -156,4 +156,12 @@ public interface ReplicationPeers { * @return the configuration for the peer cluster, null if it was unable to get the configuration */ Pair getPeerConf(String peerId) throws ReplicationException; + + /** + * Updates replication peer configuration and/or peer data + * @param id a short that identifies the cluster + * @param peerConfig configuration for the replication slave cluster + * @throws ReplicationException + */ + void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException; } diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index ad634fa..7bf6c43 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -36,9 +36,6 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.util.Bytes; @@ -49,8 +46,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; -import com.google.protobuf.ByteString; - /** * This class provides an implementation of the ReplicationPeers interface using Zookeeper. The * peers znode contains a list of all peer replication clusters and the current replication state of @@ -137,7 +132,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re List listOfOps = new ArrayList(); ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), - toByteArray(peerConfig)); + ReplicationSerDeHelper.toByteArray(peerConfig)); // There is a race (if hbase.zookeeper.useMulti is false) // b/w PeerWatcher and ReplicationZookeeper#add method to create the // peer-state znode. This happens while adding a peer @@ -325,7 +320,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } try { - return parsePeerFrom(data); + return ReplicationSerDeHelper.parsePeerFrom(data); } catch (DeserializationException e) { LOG.warn("Failed to parse cluster key from peerId=" + peerId + ", specifically the content from the following znode: " + znode); @@ -360,6 +355,43 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return new Pair(peerConfig, otherConf); } + @Override + public void updatePeerConfig(String id, ReplicationPeerConfig newConfig) + throws ReplicationException { + ReplicationPeer peer = getPeer(id); + if (peer == null){ + throw new ReplicationException("Could not find peer Id " + id); + } + ReplicationPeerConfig existingConfig = peer.getPeerConfig(); + if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && + !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ + throw new ReplicationException("Changing the cluster key on an existing peer is not allowed." + + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" + + newConfig.getClusterKey() + + "'"); + } + String existingEndpointImpl = existingConfig.getReplicationEndpointImpl(); + if (newConfig.getReplicationEndpointImpl() != null && + !newConfig.getReplicationEndpointImpl().isEmpty() && + !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){ + throw new ReplicationException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. 
Existing class '" + + existingConfig.getReplicationEndpointImpl() + + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'"); + } + //Update existingConfig's peer config and peer data with the new values, but don't touch config + // or data that weren't explicitly changed + existingConfig.getConfiguration().putAll(newConfig.getConfiguration()); + existingConfig.getPeerData().putAll(newConfig.getPeerData()); + try { + ZKUtil.setData(this.zookeeper, getPeerNode(id), + ReplicationSerDeHelper.toByteArray(existingConfig)); + } + catch(KeeperException ke){ + throw new ReplicationException("There was a problem trying to save changes to the " + + "replication peer " + id, ke); + } + } /** * List all registered peer clusters and set a watch on their znodes. */ @@ -501,90 +533,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re peerId, e); } - return peer; - } - - /** - * @param bytes Content of a peer znode. - * @return ClusterKey parsed from the passed bytes. - * @throws DeserializationException - */ - private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) - throws DeserializationException { - if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationPeer.Builder builder = - ZooKeeperProtos.ReplicationPeer.newBuilder(); - ZooKeeperProtos.ReplicationPeer peer; - try { - ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); - peer = builder.build(); - } catch (IOException e) { - throw new DeserializationException(e); - } - return convert(peer); - } else { - if (bytes.length > 0) { - return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes)); - } - return new ReplicationPeerConfig().setClusterKey(""); - } - } - - private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) { - ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); - if (peer.hasClusterkey()) { - peerConfig.setClusterKey(peer.getClusterkey()); - } - if (peer.hasReplicationEndpointImpl()) { - peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); - } - - for (BytesBytesPair pair : peer.getDataList()) { - peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); - } - - for (NameStringPair pair : peer.getConfigurationList()) { - peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); - } - return peerConfig; - } - - private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { - ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder(); - if (peerConfig.getClusterKey() != null) { - builder.setClusterkey(peerConfig.getClusterKey()); - } - if (peerConfig.getReplicationEndpointImpl() != null) { - builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); - } - - for (Map.Entry entry : peerConfig.getPeerData().entrySet()) { - builder.addData(BytesBytesPair.newBuilder() - .setFirst(ByteString.copyFrom(entry.getKey())) - .setSecond(ByteString.copyFrom(entry.getValue())) - .build()); + try { + peer.startPeerConfigTracker(this.zookeeper, this.getPeerNode(peerId)); } - - for (Map.Entry entry : peerConfig.getConfiguration().entrySet()) { - builder.addConfiguration(NameStringPair.newBuilder() - .setName(entry.getKey()) - .setValue(entry.getValue()) - .build()); + catch(KeeperException e) { + throw new ReplicationException("Error starting the peer config tracker for peerId=" + + peerId, e); } - return 
builder.build(); - } - - /** - * @param peerConfig - * @return Serialized protobuf of peerConfig with pb magic prefix prepended suitable - * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under - * /hbase/replication/peers/PEER_ID - */ - private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { - byte[] bytes = convert(peerConfig).toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); + return peer; } private void checkQueuesDeleted(String peerId) throws ReplicationException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java new file mode 100644 index 0000000..05f909d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import com.google.protobuf.ByteString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.Map; + +@InterfaceAudience.Private +@InterfaceStability.Stable +public final class ReplicationSerDeHelper { + private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class); + + private ReplicationSerDeHelper() {} + + /** + * @param bytes Content of a peer znode. + * @return ClusterKey parsed from the passed bytes. 
+ * @throws DeserializationException + */ + public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) + throws DeserializationException { + if (ProtobufUtil.isPBMagicPrefix(bytes)) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationPeer.Builder builder = + ZooKeeperProtos.ReplicationPeer.newBuilder(); + ZooKeeperProtos.ReplicationPeer peer; + try { + ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); + peer = builder.build(); + } catch (IOException e) { + throw new DeserializationException(e); + } + return convert(peer); + } else { + if (bytes.length > 0) { + return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes)); + } + return new ReplicationPeerConfig().setClusterKey(""); + } + } + + private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) { + ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); + if (peer.hasClusterkey()) { + peerConfig.setClusterKey(peer.getClusterkey()); + } + if (peer.hasReplicationEndpointImpl()) { + peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); + } + + for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) { + peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); + } + + for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { + peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); + } + return peerConfig; + } + + /** + * @param peerConfig + * @return Serialized protobuf of peerConfig with pb magic prefix prepended suitable + * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under + * /hbase/replication/peers/PEER_ID + */ + public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { + byte[] bytes = convert(peerConfig).toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + + private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { + ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder(); + if (peerConfig.getClusterKey() != null) { + builder.setClusterkey(peerConfig.getClusterKey()); + } + if (peerConfig.getReplicationEndpointImpl() != null) { + builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); + } + + for (Map.Entry entry : peerConfig.getPeerData().entrySet()) { + builder.addData(HBaseProtos.BytesBytesPair.newBuilder() + .setFirst(ByteString.copyFrom(entry.getKey())) + .setSecond(ByteString.copyFrom(entry.getValue())) + .build()); + } + + for (Map.Entry entry : peerConfig.getConfiguration().entrySet()) { + builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder() + .setName(entry.getKey()) + .setValue(entry.getValue()) + .build()); + } + + return builder.build(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index a1dc1c8..ed9359d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -118,4 +118,8 @@ public abstract class ReplicationStateZKBase { protected boolean isPeerPath(String path) { return path.split("/").length == peersZNode.split("/").length + 1; } + + protected String getPeerNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, id); + } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 67051ab..bc73f74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import com.google.common.collect.Lists; @@ -35,11 +37,30 @@ import com.google.common.util.concurrent.AbstractService; public abstract class BaseReplicationEndpoint extends AbstractService implements ReplicationEndpoint { + private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class); protected Context ctx; @Override public void init(Context context) throws IOException { this.ctx = context; + + if (this.ctx != null){ + ReplicationPeer peer = this.ctx.getReplicationPeer(); + if (peer != null){ + peer.trackPeerConfigChanges(this); + } else { + LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + + " because there's no such peer"); + } + } + } + + @Override + /** + * No-op implementation for subclasses to override if they wish to execute logic if their config changes + */ + public void peerConfigUpdated(ReplicationPeerConfig rpc){ + } /** Returns a default set of filters */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index ac1257f..c92b53d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -46,14 +46,13 @@ import com.google.common.util.concurrent.Service; * and persisting of the WAL entries in the other cluster. 
*/ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public interface ReplicationEndpoint extends Service { +public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener { @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { private final Configuration conf; private final FileSystem fs; private final TableDescriptors tableDescriptors; - private final ReplicationPeerConfig peerConfig; private final ReplicationPeer replicationPeer; private final String peerId; private final UUID clusterId; @@ -63,13 +62,11 @@ public interface ReplicationEndpoint extends Service { public Context( final Configuration conf, final FileSystem fs, - final ReplicationPeerConfig peerConfig, final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, final MetricsSource metrics, final TableDescriptors tableDescriptors) { - this.peerConfig = peerConfig; this.conf = conf; this.fs = fs; this.clusterId = clusterId; @@ -91,7 +88,7 @@ public interface ReplicationEndpoint extends Service { return peerId; } public ReplicationPeerConfig getPeerConfig() { - return peerConfig; + return replicationPeer.getPeerConfig(); } public ReplicationPeer getReplicationPeer() { return replicationPeer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9ff4b2d..83e0205 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -494,7 +494,7 @@ public class ReplicationSourceManager implements ReplicationListener { // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), - fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); + fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); return src; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index 6519fc2..e5f6a21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -152,4 +153,8 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { return delegator.stopAndWait(); } + @Override + public void peerConfigUpdated(ReplicationPeerConfig rpc) { + + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index d628a7c..b75c1cf 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -18,6 +18,7 @@ import static org.junit.Assert.fail; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.AfterClass; @@ -194,4 +197,55 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { adminExt.disableTableRep(tableName); } } + + @Test(timeout=300000) + public void testReplicationPeerConfigUpdateCallback() throws Exception { + String peerId = "1"; + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName()); + rpc.getConfiguration().put("key1", "value1"); + admin.addPeer(peerId, rpc, null); + admin.peerAdded(peerId); + rpc.getConfiguration().put("key1", "value2"); + admin.updatePeerConfig(peerId, rpc); + if (!TestUpdatableReplicationEndpoint.hasCalledBack()) { + synchronized(TestUpdatableReplicationEndpoint.class) { + TestUpdatableReplicationEndpoint.class.wait(2000L); + } + } + assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack()); + } + + public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint { + private static boolean calledBack = false; + public static boolean hasCalledBack(){ + return calledBack; + } + @Override + public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){ + calledBack = true; + notifyAll(); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + + @Override + public UUID getPeerUUID() { + return UUID.randomUUID(); + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + return false; + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 1c64f09..882481a 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -178,5 +178,32 @@ module Hbase def get_peer_config(id) @replication_admin.get_peer_config(id) end + + + def peer_added(id) + @replication_admin.peer_added(id) + end + + def update_peer_config(id, args={}) + # Optional parameters + config = args.fetch(CONFIG, nil) + data = args.fetch(DATA, nil) + + # Create and populate a ReplicationPeerConfig + replication_peer_config = ReplicationPeerConfig.new + unless config.nil? + replication_peer_config.get_configuration.put_all(config) + end + + unless data.nil? 
+ # Convert Strings to Bytes for peer_data + peer_data = replication_peer_config.get_peer_data + data.each{|key, val| + peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val)) + } + end + + @replication_admin.update_peer_config(id, replication_peer_config) + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 7b5766c..0490c78 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -361,6 +361,7 @@ Shell.load_command_group( disable_table_replication get_peer_config list_peer_configs + update_peer_config ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb new file mode 100644 index 0000000..bcecb91 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb @@ -0,0 +1,49 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class UpdatePeerConfig< Command + def help + return <<-EOF +A peer can either be another HBase cluster or a custom replication endpoint. In either case an id +must be specified to identify the peer. This command does not interrupt processing on an enabled replication peer. + +Two optional arguments are DATA and CONFIG which can be specified to set different values for either +the peer_data or configuration for a custom replication endpoint. Any existing values not updated by this command +are left unchanged. + +CLUSTER_KEY, REPLICATION_ENDPOINT, and TABLE_CFs cannot be updated with this command. +To update TABLE_CFs, see the append_peer_tableCFs and remove_peer_tableCFs commands. 
+ + hbase> update_peer_config '1', DATA => { "key1" => 1 } + hbase> update_peer_config '2', CONFIG => { "config1" => "value1", "config2" => "value2" } + hbase> update_peer_config '3', DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }, + + EOF + end + + def command(id, args = {}) + format_simple_command do + replication_admin.update_peer_config(id, args) + end + end + end + end +end \ No newline at end of file diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 4923560..6edb447 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -30,12 +30,9 @@ module Hbase include TestHelpers def setup - @test_name = "hbase_shell_tests_table" @peer_id = '1' setup_hbase - drop_test_table(@test_name) - create_test_table(@test_name) assert_equal(0, replication_admin.list_peers.length) end @@ -215,6 +212,31 @@ module Hbase replication_admin.remove_peer(peer_id_second) end + define_test "update_peer_config: can update peer config and data" do + repl_impl = "org.apache.hadoop.hbase.replication.ReplicationEndpointForTest" + config_params = { "config1" => "value1", "config2" => "value2" } + data_params = {"data1" => "value1", "data2" => "value2"} + args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA => data_params} + replication_admin.add_peer(@peer_id, args) + + #Normally the ReplicationSourceManager will call ReplicationPeer#peer_added, but here we have to do it ourselves + replication_admin.peer_added(@peer_id) + + new_config_params = { "config1" => "new_value1" } + new_data_params = {"data1" => "new_value1"} + new_args = {CONFIG => new_config_params, DATA => new_data_params} + replication_admin.update_peer_config(@peer_id, new_args) + + #Make sure the updated key/value pairs in config and data were successfully updated, and that those we didn't + #update are still there and unchanged + peer_config = replication_admin.get_peer_config(@peer_id) + replication_admin.remove_peer(@peer_id) + assert_equal("new_value1", peer_config.get_configuration.get("config1")) + assert_equal("value2", peer_config.get_configuration.get("config2")) + assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1")))) + assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2")))) + end + # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279 # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below. # define_test "add_peer: adding a second peer with same id should error" do -- 1.8.4
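For illustration only (not part of the patch): a minimal Java sketch of driving the new updatePeerConfig API end to end, following the same flow as testReplicationPeerConfigUpdateCallback above. The peer id "1" and the "config1"/"data1" keys are borrowed from this patch's tests; the class name and the Configuration setup are assumptions.

  // Sketch: merge new values into an existing peer's config and peer data.
  // Assumes peer "1" already exists and hbase-site.xml is on the classpath.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
  import org.apache.hadoop.hbase.util.Bytes;

  public class UpdatePeerConfigExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (ReplicationAdmin admin = new ReplicationAdmin(conf)) {
        // The tests above call this @VisibleForTesting hook so the client-side
        // ReplicationPeers map knows about the peer before it is updated.
        admin.peerAdded("1");

        ReplicationPeerConfig update = new ReplicationPeerConfig();
        // Only the entries supplied here are merged into the existing peer config;
        // untouched configuration keys and peer data are left unchanged.
        update.getConfiguration().put("config1", "new_value1");
        update.getPeerData().put(Bytes.toBytes("data1"), Bytes.toBytes("new_value1"));
        admin.updatePeerConfig("1", update);
      }
    }
  }

As ReplicationPeersZKImpl.updatePeerConfig enforces, the cluster key and the replication endpoint implementation class cannot be changed through this call; supplying a different value for either raises a ReplicationException.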