regionServers) {
+ this.regionServers = regionServers;
+ lastRegionserverUpdate = System.currentTimeMillis();
+ }
+
+ /**
+ * Get the timestamp at which the last change occurred to the list of region servers to replicate
+ * to.
+ * @return The value of System.currentTimeMillis() at the last time the list of peer region servers changed.
+ */
+ public long getLastRegionserverUpdate() {
+ return lastRegionserverUpdate;
+ }
+
+ /**
+ * Tracks changes to the list of region servers in a peer's cluster.
+ */
+ public static class PeerRegionServerListener extends ZooKeeperListener {
+
+ private HBaseReplicationEndpoint replicator;
+ private String regionServerListNode;
+
+ public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
+ super(replicationPeer.getZkw());
+ this.replicator = replicationPeer;
+ this.regionServerListNode = replicator.getZkw().rsZNode;
+ }
+
+ public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
+ super(zkw);
+ this.regionServerListNode = regionServerListNode;
+ }
+
+ @Override
+ public synchronized void nodeChildrenChanged(String path) {
+ if (path.equals(regionServerListNode)) {
+ try {
+ LOG.info("Detected change to peer regionservers, fetching updated list");
+ replicator.setRegionServers(fetchSlavesAddresses(replicator.getZkw()));
+ } catch (KeeperException e) {
+ LOG.fatal("Error reading slave addresses", e);
+ }
+ }
+ }
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
new file mode 100644
index 0000000..487ff64
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+
+import com.google.common.util.concurrent.Service;
+
+/**
+ * ReplicationEndpoint is a plugin which implements replication
+ * to other HBase clusters, or other systems. A ReplicationEndpoint implementation
+ * can be specified at peer creation time by specifying it
+ * in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread
+ * in each region server in the same process.
+ *
+ * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
+ * relation. ReplicationSource is an HBase-private class which tails the logs and manages
+ * the queue of logs plus management and persistence of all the state for replication.
+ * ReplicationEndpoint, on the other hand, is responsible for doing the actual shipping
+ * and persisting of the WAL entries to the other cluster.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationEndpoint extends Service {
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+ class Context {
+ private final Configuration conf;
+ private final FileSystem fs;
+ private final ReplicationPeerConfig peerConfig;
+ private final ReplicationPeer replicationPeer;
+ private final String peerId;
+ private final UUID clusterId;
+ private final MetricsSource metrics;
+
+ @InterfaceAudience.Private
+ public Context(
+ final Configuration conf,
+ final FileSystem fs,
+ final ReplicationPeerConfig peerConfig,
+ final String peerId,
+ final UUID clusterId,
+ final ReplicationPeer replicationPeer,
+ final MetricsSource metrics) {
+ this.peerConfig = peerConfig;
+ this.conf = conf;
+ this.fs = fs;
+ this.clusterId = clusterId;
+ this.peerId = peerId;
+ this.replicationPeer = replicationPeer;
+ this.metrics = metrics;
+ }
+ public Configuration getConfiguration() {
+ return conf;
+ }
+ public FileSystem getFilesystem() {
+ return fs;
+ }
+ public UUID getClusterId() {
+ return clusterId;
+ }
+ public String getPeerId() {
+ return peerId;
+ }
+ public ReplicationPeerConfig getPeerConfig() {
+ return peerConfig;
+ }
+ public ReplicationPeer getReplicationPeer() {
+ return replicationPeer;
+ }
+ public MetricsSource getMetrics() {
+ return metrics;
+ }
+ }
+
+ /**
+ * Initialize the consumer with the given context.
+ * @param context replication context
+ * @throws IOException
+ */
+ void init(Context context) throws IOException;
+
+ /** Whether or not the replication endpoint can replicate to its source cluster with the same
+ * UUID */
+ boolean canReplicateToSameCluster();
+
+ /**
+ * Returns the UUID of the peer cluster that this endpoint replicates to. Every HBase cluster
+ * instance has a persisted associated UUID. If the replication is not performed to an actual
+ * HBase cluster (but to some other system), the UUID returned has to uniquely identify the
+ * connected target system.
+ * @return a UUID or null if the peer cluster does not exist or is not connected.
+ */
+ UUID getPeerUUID();
+
+ /**
+ * Returns a WALEntryFilter to use for filtering out WALEntries from the log. The replication
+ * infrastructure will apply this filter before passing the edits to
+ * {@link #replicate(ReplicateContext)}.
+ * @return a {@link WALEntryFilter} or null if no filtering is needed.
+ */
+ WALEntryFilter getWALEntryfilter();
+
+ /**
+ * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
+ */
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+ class ReplicateContext {
+ List<HLog.Entry> entries;
+ int size;
+ @InterfaceAudience.Private
+ public ReplicateContext() {
+ }
+
+ public ReplicateContext setEntries(List<HLog.Entry> entries) {
+ this.entries = entries;
+ return this;
+ }
+ public ReplicateContext setSize(int size) {
+ this.size = size;
+ return this;
+ }
+ public List<HLog.Entry> getEntries() {
+ return entries;
+ }
+ public int getSize() {
+ return size;
+ }
+ }
+
+ /**
+ * Replicate the given set of entries (in the context) to the other cluster.
+ * Can block until all the given entries are replicated. When this method returns,
+ * all entries that were passed in the context are assumed to be persisted in the
+ * target cluster.
+ * @param replicateContext a context where WAL entries and other
+ * parameters can be obtained.
+ * @return true if and only if the given entries were successfully replicated.
+ */
+ boolean replicate(ReplicateContext replicateContext);
+
+}
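
For illustration, a minimal sketch of a custom ReplicationEndpoint written against the interface declared above. It is not part of this patch: the class name and behavior are hypothetical, and it assumes Guava's AbstractService for the Service lifecycle (the same Guava Service API the interface extends).

package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.UUID;

import com.google.common.util.concurrent.AbstractService;

// Hypothetical endpoint that accepts and discards every batch; illustrative only.
public class NoopReplicationEndpoint extends AbstractService implements ReplicationEndpoint {

  private Context ctx;
  private final UUID id = UUID.randomUUID();

  @Override
  public void init(Context context) throws IOException {
    this.ctx = context; // keeps peer config, metrics, etc. for later use
  }

  @Override
  public boolean canReplicateToSameCluster() {
    return true; // the target is not an HBase cluster, so no replication loop is possible
  }

  @Override
  public UUID getPeerUUID() {
    return id; // must uniquely identify the connected target system
  }

  @Override
  public WALEntryFilter getWALEntryfilter() {
    return null; // no endpoint-specific filtering
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    // Return true only once replicateContext.getEntries() is durably handled by the target.
    return true;
  }

  @Override
  protected void doStart() { notifyStarted(); }

  @Override
  protected void doStop() { notifyStopped(); }
}
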
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
new file mode 100644
index 0000000..7b01d11
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Keeps KVs that are scoped other than local
+ */
+@InterfaceAudience.Private
+public class ScopeWALEntryFilter implements WALEntryFilter {
+
+ @Override
+ public Entry filter(Entry entry) {
+ NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+ ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+ int size = kvs.size();
+ for (int i = size-1; i >= 0; i--) {
+ KeyValue kv = kvs.get(i);
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+ kvs.remove(i);
+ }
+ }
+ if (kvs.size() < size/2) {
+ kvs.trimToSize();
+ }
+ return entry;
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
new file mode 100644
index 0000000..e84ac18
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Skips WAL edits for all System tables including META
+ */
+@InterfaceAudience.Private
+public class SystemTableWALEntryFilter implements WALEntryFilter {
+ @Override
+ public Entry filter(Entry entry) {
+ if (entry.getKey().getTablename().isSystemTable()) {
+ return null;
+ }
+ return entry;
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
new file mode 100644
index 0000000..963df9a
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TableCfWALEntryFilter implements WALEntryFilter {
+
+ private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
+ private final ReplicationPeer peer;
+
+ public TableCfWALEntryFilter(ReplicationPeer peer) {
+ this.peer = peer;
+ }
+
+ @Override
+ public Entry filter(Entry entry) {
+ String tabName = entry.getKey().getTablename().getNameAsString();
+ ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+ Map<String, List<String>> tableCFs = null;
+
+ try {
+ tableCFs = this.peer.getTableCFs();
+ } catch (IllegalArgumentException e) {
+ LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+ ", degenerate as if it's not configured by keeping tableCFs==null");
+ }
+ int size = kvs.size();
+
+ // skip the whole entry (prevent replicating) if the logKey's table isn't in this peer's
+ // replicable table list (a null tableCFs means all tables are replicable)
+ if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+ return null;
+ } else {
+ List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+ for (int i = size - 1; i >= 0; i--) {
+ KeyValue kv = kvs.get(i);
+ // ignore (remove) the kv if its cf isn't in the replicable cf list
+ // (a null cfs means all cfs of this table are replicable)
+ if ((cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
+ kvs.remove(i);
+ }
+ }
+ }
+ if (kvs.size() < size/2) {
+ kvs.trimToSize();
+ }
+ return entry;
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
new file mode 100644
index 0000000..100e91d
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * A filter for WAL entries before they are sent over to replication. Multiple
+ * filters might be chained together using {@link ChainWALEntryFilter}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface WALEntryFilter {
+
+ /**
+ * Applies the filter, possibly returning a different HLog.Entry instance.
+ * If null is returned, the entry will be skipped.
+ * @param entry WAL Entry to filter
+ * @return a (possibly modified) Entry to use, or null if the entry should not be replicated
+ */
+ public HLog.Entry filter(HLog.Entry entry);
+
+}
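
For illustration, a minimal sketch of a WALEntryFilter implementation. It is not part of this patch: the class and table name are hypothetical, and it uses the same HLog.Entry/HLogKey accessors seen in the filters above. Returning null skips the entry entirely.

package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.regionserver.wal.HLog;

// Hypothetical filter that skips every edit for a table named "audit"; illustrative only.
public class SkipAuditTableWALEntryFilter implements WALEntryFilter {

  @Override
  public HLog.Entry filter(HLog.Entry entry) {
    if ("audit".equals(entry.getKey().getTablename().getNameAsString())) {
      return null; // null tells the replication source to drop this entry
    }
    return entry;   // everything else passes through unchanged
  }
}
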
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index cea5750..be69d98 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -25,14 +25,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
import java.io.IOException;
import java.util.List;
import java.util.Set;
@@ -57,12 +54,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
- // all members of this class are null if replication is disabled,
+ // all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return files;
}
-
+
final Set<String> hlogs = loadHLogsFromQueues();
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
@@ -137,8 +134,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
LOG.info("Stopping " + this.zkw);
this.zkw.close();
}
- // Not sure why we're deleting a connection that we never acquired or used
- HConnectionManager.deleteConnection(this.getConf());
}
@Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
new file mode 100644
index 0000000..935da9b
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A {@link ReplicationEndpoint} implementation for replicating to another HBase cluster.
+ * For the slave cluster it selects a random subset of the peer's region servers
+ * using a replication ratio. For example, if the replication ratio is 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ *
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ */
+@InterfaceAudience.Private
+public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
+
+ public static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
+ private HConnection conn;
+
+ private Configuration conf;
+
+ // How long should we sleep for each retry
+ private long sleepForRetries;
+
+ // Maximum number of retries before taking bold actions
+ private int maxRetriesMultiplier;
+ // Socket timeouts require even bolder actions since we don't want to DDOS
+ private int socketTimeoutMultiplier;
+ //Metrics for this source
+ private MetricsSource metrics;
+ // Handles connecting to peer region servers
+ private ReplicationSinkManager replicationSinkMgr;
+ private boolean peersSelected = false;
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+ this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+ decorateConf();
+ this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
+ this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
+ maxRetriesMultiplier * maxRetriesMultiplier);
+ // TODO: This connection is replication specific, or we should make it particular to
+ // replication and add replication-specific settings such as the compression or codec to use
+ // when passing Cells.
+ this.conn = HConnectionManager.createConnection(this.conf);
+ this.sleepForRetries =
+ this.conf.getLong("replication.source.sleepforretries", 1000);
+ this.metrics = context.getMetrics();
+ // ReplicationQueueInfo parses the peerId out of the znode for us
+ this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+ }
+
+ private void decorateConf() {
+ String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+ if (StringUtils.isNotEmpty(replicationCodec)) {
+ this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+ }
+ }
+
+ private void connectToPeers() throws KeeperException {
+ getRegionServers();
+
+ int sleepMultiplier = 1;
+
+ // Connect to peer cluster first, unless we have to stop
+ while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+ replicationSinkMgr.chooseSinks();
+ if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+ if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ }
+
+ /**
+ * Do the sleeping logic
+ * @param msg Why we sleep
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return True if sleepMultiplier is < maxRetriesMultiplier
+ */
+ protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ }
+ Thread.sleep(this.sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping between retries");
+ }
+ return sleepMultiplier < maxRetriesMultiplier;
+ }
+
+ /**
+ * Do the shipping logic
+ */
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ List<HLog.Entry> entries = replicateContext.getEntries();
+ int sleepMultiplier = 1;
+ while (this.isRunning()) {
+ if (!peersSelected) {
+ try {
+ connectToPeers();
+ peersSelected = true;
+ } catch (KeeperException ke) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fetch salves addresses failed.", ke);
+ }
+ reconnect(ke);
+ }
+ }
+
+ if (!isPeerEnabled()) {
+ if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ SinkPeer sinkPeer = null;
+ try {
+ sinkPeer = replicationSinkMgr.getReplicationSink();
+ BlockingInterface rrs = sinkPeer.getRegionServer();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replicating " + entries.size() +
+ " entries of total size " + replicateContext.getSize());
+ }
+ ReplicationProtbufUtil.replicateWALEntry(rrs,
+ entries.toArray(new HLog.Entry[entries.size()]));
+
+ // update metrics
+ this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+ return true;
+
+ } catch (IOException ioe) {
+ // Didn't ship anything, but must still age the last time we did
+ this.metrics.refreshAgeOfLastShippedOp();
+ if (ioe instanceof RemoteException) {
+ ioe = ((RemoteException) ioe).unwrapRemoteException();
+ LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
+ if (ioe instanceof TableNotFoundException) {
+ if (sleepForRetries("A table is missing in the peer cluster. "
+ + "Replication cannot proceed without losing data.", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ } else {
+ if (ioe instanceof SocketTimeoutException) {
+ // This exception means we waited for more than 60s and nothing
+ // happened; the cluster is alive, so calling it again right away,
+ // even just to test, only makes things worse.
+ sleepForRetries("Encountered a SocketTimeoutException. The " +
+ "call to the remote cluster timed out, which is usually " +
+ "caused by a machine failure or a massive slowdown",
+ this.socketTimeoutMultiplier);
+ } else if (ioe instanceof ConnectException) {
+ LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
+ replicationSinkMgr.chooseSinks();
+ } else {
+ LOG.warn("Can't replicate because of a local or network error: ", ioe);
+ }
+ }
+
+ if (sinkPeer != null) {
+ replicationSinkMgr.reportBadSink(sinkPeer);
+ }
+ if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ return false; // in case we exited before replicating
+ }
+
+ protected boolean isPeerEnabled() {
+ return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
+ }
+
+ @Override
+ protected void doStop() {
+ disconnect(); //don't call super.doStop()
+ if (this.conn != null) {
+ try {
+ this.conn.close();
+ this.conn = null;
+ } catch (IOException e) {
+ LOG.warn("Failed to close the connection");
+ }
+ }
+ notifyStopped();
+ }
+}
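
For illustration, a sketch of tuning the retry and sink-selection settings that this endpoint and ReplicationSinkManager read. It is not part of this patch: the class is hypothetical, the values are arbitrary examples rather than recommendations, and only the configuration keys themselves come from the code above and the ReplicationSinkManager changes below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ReplicationEndpointTuningExample {
  // Hypothetical helper showing where the keys used above could be tuned.
  public static Configuration tunedConf() {
    Configuration conf = HBaseConfiguration.create();
    // Fraction of the peer's region servers chosen as sinks (ReplicationSinkManager).
    conf.setFloat("replication.source.ratio", 0.1f);
    // Base sleep between retries in ms; multiplied by the growing retry counter.
    conf.setLong("replication.source.sleepforretries", 1000);
    // Cap on the retry multiplier before the current attempt is given up.
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    // Socket timeouts back off much harder so a slow peer is not hammered.
    conf.setInt("replication.source.socketTimeoutMultiplier", 100);
    // Number of "bad sink" reports before a sink is replaced.
    conf.setInt("replication.bad.sink.threshold", 3);
    return conf;
  }
}
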
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b38a0c8..94dec7c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -22,13 +22,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* This class is for maintaining the various replication statistics for a source and publishing them
* through the metrics interfaces.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
@@ -152,7 +153,7 @@ public class MetricsSource {
rms.incCounters(shippedKBsKey, sizeInKB);
rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
}
-
+
/** increase the byte number read by source from log file */
public void incrLogReadInBytes(long readInBytes) {
rms.incCounters(logReadInBytesKey, readInBytes);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 839db9b..fd10d6e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -29,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -61,7 +60,7 @@ public class ReplicationSinkManager {
private final String peerClusterId;
- private final ReplicationPeers replicationPeers;
+ private final HBaseReplicationEndpoint endpoint;
// Count of "bad replication sink" reports per peer sink
private final Map<ServerName, Integer> badReportCounts;
@@ -90,10 +89,10 @@ public class ReplicationSinkManager {
* threshold
*/
public ReplicationSinkManager(HConnection conn, String peerClusterId,
- ReplicationPeers replicationPeers, Configuration conf) {
+ HBaseReplicationEndpoint endpoint, Configuration conf) {
this.conn = conn;
this.peerClusterId = peerClusterId;
- this.replicationPeers = replicationPeers;
+ this.endpoint = endpoint;
this.badReportCounts = Maps.newHashMap();
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
@@ -107,8 +106,7 @@ public class ReplicationSinkManager {
* @return a replication sink to replicate to
*/
public SinkPeer getReplicationSink() throws IOException {
- if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
- > this.lastUpdateToPeers) {
+ if (endpoint.getLastRegionserverUpdate() > this.lastUpdateToPeers) {
LOG.info("Current list of sinks is out of date, updating");
chooseSinks();
}
@@ -143,8 +141,7 @@ public class ReplicationSinkManager {
}
void chooseSinks() {
- List slaveAddresses =
- replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
+ List<ServerName> slaveAddresses = endpoint.getRegionServers();
Collections.shuffle(slaveAddresses, random);
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4e2106d..ca508b5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -21,13 +21,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -41,27 +37,25 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.ipc.RemoteException;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
/**
* Class that handles the source of a replication stream.
@@ -82,9 +76,9 @@ public class ReplicationSource extends Thread
public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queue of logs to process
private PriorityBlockingQueue<Path> queue;
- private HConnection conn;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
+
private Configuration conf;
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
@@ -118,8 +112,6 @@ public class ReplicationSource extends Thread
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
- // Socket timeouts require even bolder actions since we don't want to DDOS
- private int socketTimeoutMultiplier;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
// Current size of data we need to replicate
@@ -130,10 +122,14 @@ public class ReplicationSource extends Thread
private MetricsSource metrics;
// Handle on the log reader helper
private ReplicationHLogReaderManager repLogReader;
- // Handles connecting to peer region servers
- private ReplicationSinkManager replicationSinkMgr;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
+ // ReplicationEndpoint which will handle the actual replication
+ private ReplicationEndpoint replicationEndpoint;
+ // A filter (or a chain of filters) for the WAL entries.
+ private WALEntryFilter walEntryFilter;
+ // Context for ReplicationEndpoint#replicate()
+ private ReplicationEndpoint.ReplicateContext replicateContext;
// throttler
private ReplicationThrottler throttler;
@@ -147,28 +143,25 @@ public class ReplicationSource extends Thread
* @param peerClusterZnode the name of our znode
* @throws IOException
*/
+ @Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
- final String peerClusterZnode, final UUID clusterId) throws IOException {
+ final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ final MetricsSource metrics)
+ throws IOException {
this.stopper = stopper;
- this.conf = HBaseConfiguration.create(conf);
+ this.conf = conf;
decorateConf();
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000);
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
- this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
- maxRetriesMultiplier * maxRetriesMultiplier);
this.queue =
new PriorityBlockingQueue<Path>(
this.conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
- // TODO: This connection is replication specific or we should make it particular to
- // replication and make replication specific settings such as compression or codec to use
- // passing Cells.
- this.conn = HConnectionManager.getConnection(this.conf);
long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
@@ -177,7 +170,7 @@ public class ReplicationSource extends Thread
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
- this.metrics = new MetricsSource(peerClusterZnode);
+ this.metrics = metrics;
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
this.clusterId = clusterId;
@@ -185,8 +178,10 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
- this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+ this.replicationEndpoint = replicationEndpoint;
+
+ this.replicateContext = new ReplicationEndpoint.ReplicateContext();
}
private void decorateConf() {
@@ -209,30 +204,47 @@ public class ReplicationSource extends Thread
}
private void uninitialize() {
- if (this.conn != null) {
- try {
- this.conn.close();
- } catch (IOException e) {
- LOG.debug("Attempt to close connection failed", e);
- }
- }
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
+ if (replicationEndpoint.state() == Service.State.STARTING
+ || replicationEndpoint.state() == Service.State.RUNNING) {
+ replicationEndpoint.stopAndWait();
+ }
}
@Override
public void run() {
- connectToPeers();
// We were stopped while looping to connect to sinks, just abort
if (!this.isActive()) {
uninitialize();
return;
}
+ try {
+ // start the endpoint, connect to the cluster
+ Service.State state = replicationEndpoint.start().get();
+ if (state != Service.State.RUNNING) {
+ LOG.warn("ReplicationEndpoint was not started. Exiting");
+ return;
+ }
+ } catch (Exception ex) {
+ LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+ throw new RuntimeException(ex);
+ }
+
+ // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
+ ArrayList<WALEntryFilter> filters = Lists.newArrayList(
+ (WALEntryFilter) new SystemTableWALEntryFilter());
+ WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
+ if (filterFromEndpoint != null) {
+ filters.add(filterFromEndpoint);
+ }
+ this.walEntryFilter = new ChainWALEntryFilter(filters);
+
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isActive() && this.peerClusterId == null) {
- this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
+ this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
@@ -250,9 +262,10 @@ public class ReplicationSource extends Thread
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
- if (clusterId.equals(peerClusterId)) {
+ if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
- + peerClusterId);
+ + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ + replicationEndpoint.getClass().getName(), null, false);
}
LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
@@ -409,18 +422,22 @@ public class ReplicationSource extends Thread
HLog.Entry entry =
this.repLogReader.readNextAndSetPosition();
while (entry != null) {
- WALEdit edit = entry.getEdit();
this.metrics.incrLogEditsRead();
seenEntries++;
- // Remove all KVs that should not be replicated
- HLogKey logKey = entry.getKey();
+
// don't replicate if the log entries have already been consumed by the cluster
- if (!logKey.getClusterIds().contains(peerClusterId)) {
- removeNonReplicableEdits(entry);
- // Don't replicate catalog entries, if the WALEdit wasn't
- // containing anything to replicate and if we're currently not set to replicate
- if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
- edit.size() != 0) {
+ if (replicationEndpoint.canReplicateToSameCluster()
+ || !entry.getKey().getClusterIds().contains(peerClusterId)) {
+ // Remove all KVs that should not be replicated
+ entry = walEntryFilter.filter(entry);
+ WALEdit edit = null;
+ HLogKey logKey = null;
+ if (entry != null) {
+ edit = entry.getEdit();
+ logKey = entry.getKey();
+ }
+
+ if (edit != null && edit.size() != 0) {
//Mark that the current cluster has the change
logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
@@ -451,20 +468,6 @@ public class ReplicationSource extends Thread
return seenEntries == 0 && processEndOfFile();
}
- private void connectToPeers() {
- int sleepMultiplier = 1;
-
- // Connect to peer cluster first, unless we have to stop
- while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
- replicationSinkMgr.chooseSinks();
- if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
- if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
- }
-
/**
* Poll for the next path
* @return true if a path was obtained, false if not
@@ -622,47 +625,6 @@ public class ReplicationSource extends Thread
}
/**
- * We only want KVs that are scoped other than local
- * @param entry The entry to check for replication
- */
- protected void removeNonReplicableEdits(HLog.Entry entry) {
- String tabName = entry.getKey().getTablename().getNameAsString();
- ArrayList kvs = entry.getEdit().getKeyValues();
- Map> tableCFs = null;
- try {
- tableCFs = this.replicationPeers.getTableCFs(peerId);
- } catch (IllegalArgumentException e) {
- LOG.error("should not happen: can't get tableCFs for peer " + peerId +
- ", degenerate as if it's not configured by keeping tableCFs==null");
- }
- int size = kvs.size();
-
- // clear kvs(prevent replicating) if logKey's table isn't in this peer's
- // replicable table list (empty tableCFs means all table are replicable)
- if (tableCFs != null && !tableCFs.containsKey(tabName)) {
- kvs.clear();
- } else {
- NavigableMap scopes = entry.getKey().getScopes();
- List cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
- for (int i = size - 1; i >= 0; i--) {
- KeyValue kv = kvs.get(i);
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if (scopes == null || !scopes.containsKey(kv.getFamily()) ||
- (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
- kvs.remove(i);
- }
- }
- }
-
- if (kvs.size() < size/2) {
- kvs.trimToSize();
- }
- }
-
- /**
* Count the number of different row keys in the given edit because of
* mini-batching. We assume that there's at least one KV in the WALEdit.
* @param edit edit to count row keys from
@@ -692,13 +654,6 @@ public class ReplicationSource extends Thread
return;
}
while (this.isActive()) {
- if (!isPeerEnabled()) {
- if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- SinkPeer sinkPeer = null;
try {
if (this.throttler.isEnabled()) {
long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
@@ -719,14 +674,15 @@ public class ReplicationSource extends Thread
this.throttler.resetStartTick();
}
}
- sinkPeer = replicationSinkMgr.getReplicationSink();
- BlockingInterface rrs = sinkPeer.getRegionServer();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Replicating " + entries.size() +
- " entries of total size " + currentSize);
+ replicateContext.setEntries(entries).setSize(currentSize);
+
+ // send the edits to the endpoint. Will block until the edits are actually sent and acknowledged
+ boolean replicated = replicationEndpoint.replicate(replicateContext);
+
+ if (!replicated) {
+ continue;
}
- ReplicationProtbufUtil.replicateWALEntry(rrs,
- entries.toArray(new HLog.Entry[entries.size()]));
+
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(),
@@ -745,50 +701,9 @@ public class ReplicationSource extends Thread
+ this.totalReplicatedOperations + " operations");
}
break;
-
- } catch (IOException ioe) {
- // Didn't ship anything, but must still age the last time we did
- this.metrics.refreshAgeOfLastShippedOp();
- if (ioe instanceof RemoteException) {
- ioe = ((RemoteException) ioe).unwrapRemoteException();
- LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
- if (ioe instanceof TableNotFoundException) {
- if (sleepForRetries("A table is missing in the peer cluster. "
- + "Replication cannot proceed without losing data.", sleepMultiplier)) {
- sleepMultiplier++;
- }
- // current thread might be interrupted to terminate
- // directly go back to while() for confirm this
- if (isInterrupted()) {
- continue;
- }
- }
- } else {
- if (ioe instanceof SocketTimeoutException) {
- // This exception means we waited for more than 60s and nothing
- // happened, the cluster is alive and calling it right away
- // even for a test just makes things worse.
- sleepForRetries("Encountered a SocketTimeoutException. Since the " +
- "call to the remote cluster timed out, which is usually " +
- "caused by a machine failure or a massive slowdown",
- this.socketTimeoutMultiplier);
- // current thread might be interrupted to terminate
- // directly go back to while() for confirm this
- if (isInterrupted()) {
- continue;
- }
- } else if (ioe instanceof ConnectException) {
- LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
- replicationSinkMgr.chooseSinks();
- } else {
- LOG.warn("Can't replicate because of a local or network error: ", ioe);
- }
- }
-
- if (sinkPeer != null) {
- replicationSinkMgr.reportBadSink(sinkPeer);
- }
- if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+ } catch (Exception ex) {
+ LOG.warn(replicationEndpoint.getClass().getName() + " threw an unknown exception", ex);
+ if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
}
@@ -801,7 +716,7 @@ public class ReplicationSource extends Thread
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
- return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
+ return this.replicationPeers.getStatusOfPeer(this.peerId);
}
/**
@@ -835,10 +750,12 @@ public class ReplicationSource extends Thread
return false;
}
+ @Override
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Unexpected exception in ReplicationSource," +
" currentPath=" + currentPath, e);
@@ -849,11 +766,17 @@ public class ReplicationSource extends Thread
this.peerClusterZnode, handler);
}
+ @Override
public void terminate(String reason) {
terminate(reason, null);
}
+ @Override
public void terminate(String reason, Exception cause) {
+ terminate(reason, cause, true);
+ }
+
+ public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
LOG.info("Closing source "
+ this.peerClusterZnode + " because: " + reason);
@@ -864,17 +787,33 @@ public class ReplicationSource extends Thread
}
this.running = false;
this.interrupt();
- Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
+ ListenableFuture<Service.State> future = null;
+ if (this.replicationEndpoint != null) {
+ future = this.replicationEndpoint.stop();
+ }
+ if (join) {
+ Threads.shutdown(this, this.sleepForRetries);
+ if (future != null) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.warn("Got exception:" + e);
+ }
+ }
+ }
}
+ @Override
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
+ @Override
public String getPeerClusterId() {
return this.peerId;
}
+ @Override
public Path getCurrentPath() {
return this.currentPath;
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df599f0..6388d9b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -50,7 +51,8 @@ public interface ReplicationSourceInterface {
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
- final String peerClusterZnode, final UUID clusterId) throws IOException;
+ final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ final MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7b4cd83..a2de78f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -43,9 +43,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
@@ -115,7 +119,7 @@ public class ReplicationSourceManager implements ReplicationListener {
final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
//CopyOnWriteArrayList is thread-safe.
- //Generally, reading is more than modifying.
+ //Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
@@ -194,7 +198,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server hlog queues
*/
protected void init() throws IOException, ReplicationException {
- for (String id : this.replicationPeers.getConnectedPeers()) {
+ for (String id : this.replicationPeers.getPeerIds()) {
addSource(id);
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@@ -221,9 +225,12 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
+ ReplicationPeerConfig peerConfig
+ = replicationPeers.getReplicationPeerConfig(id);
+ ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
- this.replicationPeers, stopper, id, this.clusterId);
+ this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
synchronized (this.hlogsById) {
this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>());
@@ -254,7 +261,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void deleteSource(String peerId, boolean closeConnection) {
this.replicationQueues.removeQueue(peerId);
if (closeConnection) {
- this.replicationPeers.disconnectFromPeer(peerId);
+ this.replicationPeers.peerRemoved(peerId);
}
}
@@ -340,7 +347,9 @@ public class ReplicationSourceManager implements ReplicationListener {
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
- final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
+ final Stoppable stopper, final String peerId, final UUID clusterId,
+ final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
+ throws IOException {
ReplicationSourceInterface src;
try {
@SuppressWarnings("rawtypes")
@@ -351,9 +360,32 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.warn("Passed replication source implementation throws errors, " +
"defaulting to ReplicationSource", e);
src = new ReplicationSource();
+ }
+ ReplicationEndpoint replicationEndpoint = null;
+ try {
+ String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+ if (replicationEndpointImpl == null) {
+ // Default to HBase inter-cluster replicator
+ replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
+ }
+ @SuppressWarnings("rawtypes")
+ Class c = Class.forName(replicationEndpointImpl);
+ replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+ } catch (Exception e) {
+ LOG.warn("Passed replication endpoint implementation throws errors", e);
+ throw new IOException(e);
}
- src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
+
+ MetricsSource metrics = new MetricsSource(peerId);
+ // init replication source
+ src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
+ clusterId, replicationEndpoint, metrics);
+
+ // init replication endpoint
+ replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+ fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
+
return src;
}
@@ -441,7 +473,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void peerListChanged(List<String> peerIds) {
for (String id : peerIds) {
try {
- boolean added = this.replicationPeers.connectToPeer(id);
+ boolean added = this.replicationPeers.peerAdded(id);
if (added) {
addSource(id);
}
@@ -507,10 +539,25 @@ public class ReplicationSourceManager implements ReplicationListener {
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey();
try {
+ // peerId here is the recovered queue's znode name; there may be no actual peer defined
+ // with that exact id, so parse the real peer id out of it.
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ String actualPeerId = replicationQueueInfo.getPeerId();
+ ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+ ReplicationPeerConfig peerConfig = null;
+ try {
+ peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
+ } catch (ReplicationException ex) {
+ LOG.warn("Received exception while getting replication peer config, skipping replay" + ex);
+ }
+ if (peer == null || peerConfig == null) {
+ LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
+ continue;
+ }
+
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
- stopper, peerId, this.clusterId);
- if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
+ stopper, peerId, this.clusterId, peerConfig, peer);
+ if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index fa3dda6..b1b18b5 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -40,7 +41,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
- UUID clusterId) throws IOException {
+ UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
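The widened init() signature hands every source a ReplicationEndpoint and a per-source MetricsSource, so any existing ReplicationSourceInterface implementation has to be updated. A hedged sketch of what an implementation would typically do with the new parameters (field names are illustrative, not from the patch):

  private ReplicationEndpoint endpoint;  // used later when shipping edits to the peer
  private MetricsSource metrics;         // per-source metrics sink

  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
      UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
      throws IOException {
    this.endpoint = replicationEndpoint;
    this.metrics = metrics;
  }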
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index ee102fc..7b7ddb0 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -175,30 +175,30 @@ public class TestPerTableCFReplication {
Map<String, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null);
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null);
assertEquals(null, tabCFsMap);
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("");
assertEquals(null, tabCFsMap);
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(" ");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(" ");
assertEquals(null, tabCFsMap);
// 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab1")); // its table name is "tab1"
assertFalse(tabCFsMap.containsKey("tab2")); // not other table
assertEquals(null, tabCFsMap.get("tab1")); // null cf-list,
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab2")); // its table name is "tab2"
assertFalse(tabCFsMap.containsKey("tab1")); // not other table
assertEquals(1, tabCFsMap.get("tab2").size()); // cf-list contains only 1 cf
assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab3")); // its table name is "tab3"
assertFalse(tabCFsMap.containsKey("tab1")); // not other table
@@ -207,7 +207,7 @@ public class TestPerTableCFReplication {
assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3"
// 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
// 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey("tab1"));
@@ -225,7 +225,7 @@ public class TestPerTableCFReplication {
// 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
// still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
// 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey("tab1"));
@@ -243,7 +243,7 @@ public class TestPerTableCFReplication {
// 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
// "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
// 5.1 no "tab1" and "tab2", only "tab3"
assertEquals(1, tabCFsMap.size()); // only one table
assertFalse(tabCFsMap.containsKey("tab1"));
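The assertions above pin down the table-CFs string format now parsed by ReplicationPeerZKImpl instead of ReplicationPeer. A condensed sketch of the mapping, with the expected values taken from the cases exercised above:

  import java.util.List;
  import java.util.Map;

  Map<String, List<String>> tabCFs =
      ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
  // tabCFs.get("tab1") -> null            (all column families of tab1)
  // tabCFs.get("tab2") -> ["cf1"]         (only cf1 of tab2)
  // tabCFs.get("tab3") -> ["cf1", "cf3"]  (cf1 and cf3 of tab3)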
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fd003ad..e560620 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -179,50 +179,48 @@ public abstract class TestReplicationStateBasic {
} catch (IllegalArgumentException e) {
}
try {
- rp.getStatusOfConnectedPeer("bogus");
+ rp.getStatusOfPeer("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
- assertFalse(rp.connectToPeer("bogus"));
- rp.disconnectFromPeer("bogus");
- assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
- assertNull(rp.getPeerUUID("bogus"));
+ assertFalse(rp.peerAdded("bogus"));
+ rp.peerRemoved("bogus");
+
assertNull(rp.getPeerConf("bogus"));
- assertNumberOfPeers(0, 0);
+ assertNumberOfPeers(0);
// Add some peers
- rp.addPeer(ID_ONE, KEY_ONE);
- assertNumberOfPeers(0, 1);
- rp.addPeer(ID_TWO, KEY_TWO);
- assertNumberOfPeers(0, 2);
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ assertNumberOfPeers(1);
+ rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+ assertNumberOfPeers(2);
// Test methods with a peer that is added but not connected
try {
- rp.getStatusOfConnectedPeer(ID_ONE);
+ rp.getStatusOfPeer(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
- assertNull(rp.getPeerUUID(ID_ONE));
- assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
- rp.disconnectFromPeer(ID_ONE);
- assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-
- // Connect to one peer
- rp.connectToPeer(ID_ONE);
- assertNumberOfPeers(1, 2);
- assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+ assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+ rp.removePeer(ID_ONE);
+ rp.peerRemoved(ID_ONE);
+ assertNumberOfPeers(1);
+
+ // Add one peer
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ rp.peerAdded(ID_ONE);
+ assertNumberOfPeers(2);
+ assertTrue(rp.getStatusOfPeer(ID_ONE));
rp.disablePeer(ID_ONE);
assertConnectedPeerStatus(false, ID_ONE);
rp.enablePeer(ID_ONE);
assertConnectedPeerStatus(true, ID_ONE);
- assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
- assertNotNull(rp.getPeerUUID(ID_ONE).toString());
// Disconnect peer
- rp.disconnectFromPeer(ID_ONE);
- assertNumberOfPeers(0, 2);
+ rp.peerRemoved(ID_ONE);
+ assertNumberOfPeers(2);
try {
- rp.getStatusOfConnectedPeer(ID_ONE);
+ rp.getStatusOfPeer(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
@@ -234,7 +232,7 @@ public abstract class TestReplicationStateBasic {
fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
}
while (true) {
- if (status == rp.getStatusOfConnectedPeer(peerId)) {
+ if (status == rp.getStatusOfPeer(peerId)) {
return;
}
if (zkTimeoutCount < ZK_MAX_COUNT) {
@@ -247,9 +245,9 @@ public abstract class TestReplicationStateBasic {
}
}
- protected void assertNumberOfPeers(int connected, int total) {
- assertEquals(total, rp.getAllPeerClusterKeys().size());
- assertEquals(connected, rp.getConnectedPeers().size());
+ protected void assertNumberOfPeers(int total) {
+ assertEquals(total, rp.getAllPeerConfigs().size());
+ assertEquals(total, rp.getAllPeerIds().size());
assertEquals(total, rp.getAllPeerIds().size());
}
@@ -269,7 +267,7 @@ public abstract class TestReplicationStateBasic {
rq3.addLog("qId" + i, "filename" + j);
}
//Add peers for the corresponding queues so they are not orphans
- rp.addPeer("qId" + i, "bogus" + i);
+ rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
}
}
}
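The reworked ReplicationPeers surface exercised above separates peer definition (addPeer/removePeer, now taking a ReplicationPeerConfig) from local peer tracking (peerAdded/peerRemoved, replacing connectToPeer/disconnectFromPeer) and state queries (getStatusOfPeer). A hedged sketch of the call sequence, mirroring the test (the null argument is the tableCFs spec, meaning all tables):

  ReplicationPeerConfig config = new ReplicationPeerConfig().setClusterKey("zk:2181:/hbase");
  rp.addPeer("1", config, null);       // define the peer
  rp.peerAdded("1");                   // start tracking it on this server
  boolean enabled = rp.getStatusOfPeer("1");
  rp.disablePeer("1");
  rp.enablePeer("1");
  rp.removePeer("1");                  // drop the peer definition
  rp.peerRemoved("1");                 // stop tracking it on this server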
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index e6a26e7..f393b0f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -144,7 +144,7 @@ public class TestReplicationTrackerZKImpl {
@Ignore ("Flakey") @Test(timeout = 30000)
public void testPeerRemovedEvent() throws Exception {
- rp.addPeer("5", utility.getClusterKey());
+ rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
rt.registerListener(new DummyReplicationListener());
rp.removePeer("5");
// wait for event
@@ -157,7 +157,7 @@ public class TestReplicationTrackerZKImpl {
@Ignore ("Flakey") @Test(timeout = 30000)
public void testPeerListChangedEvent() throws Exception {
// add a peer
- rp.addPeer("5", utility.getClusterKey());
+ rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
rt.registerListener(new DummyReplicationListener());
rp.disablePeer("5");
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 296f953..9175192 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.junit.Before;
@@ -42,13 +43,15 @@ public class TestReplicationSinkManager {
private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
private ReplicationPeers replicationPeers;
+ private HBaseReplicationEndpoint replicationEndpoint;
private ReplicationSinkManager sinkManager;
@Before
public void setUp() {
replicationPeers = mock(ReplicationPeers.class);
+ replicationEndpoint = mock(HBaseReplicationEndpoint.class);
sinkManager = new ReplicationSinkManager(mock(HConnection.class),
- PEER_CLUSTER_ID, replicationPeers, new Configuration());
+ PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
}
@Test
@@ -58,7 +61,7 @@ public class TestReplicationSinkManager {
serverNames.add(mock(ServerName.class));
}
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
sinkManager.chooseSinks();
@@ -72,7 +75,7 @@ public class TestReplicationSinkManager {
List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
mock(ServerName.class));
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
sinkManager.chooseSinks();
@@ -84,8 +87,8 @@ public class TestReplicationSinkManager {
public void testReportBadSink() {
ServerName serverNameA = mock(ServerName.class);
ServerName serverNameB = mock(ServerName.class);
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
- Lists.newArrayList(serverNameA, serverNameB));
+ when(replicationEndpoint.getRegionServers())
+ .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
sinkManager.chooseSinks();
// Sanity check
@@ -110,7 +113,7 @@ public class TestReplicationSinkManager {
for (int i = 0; i < 20; i++) {
serverNames.add(mock(ServerName.class));
}
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
@@ -137,7 +140,7 @@ public class TestReplicationSinkManager {
for (int i = 0; i < 20; i++) {
serverNames.add(mock(ServerName.class));
}
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
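These mocks show the new wiring: ReplicationSinkManager now obtains the peer's region servers from the HBaseReplicationEndpoint rather than from ReplicationPeers. A minimal mock-based sketch of the same setup outside the test class (Mockito and Guava static imports assumed, as in the test above):

  HConnection conn = mock(HConnection.class);
  HBaseReplicationEndpoint endpoint = mock(HBaseReplicationEndpoint.class);
  when(endpoint.getRegionServers())
      .thenReturn(Lists.newArrayList(mock(ServerName.class), mock(ServerName.class)));

  ReplicationSinkManager sinkManager =
      new ReplicationSinkManager(conn, "PEER_CLUSTER_ID", endpoint, new Configuration());
  sinkManager.chooseSinks();  // samples sinks from endpoint.getRegionServers()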