diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 32f07cb..1b22dab 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1116,6 +1116,12 @@ public final class HConstants {
/** Configuration key for setting replication codec class name */
public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
+  /** Maximum number of threads used by the replication source for shipping edits to the sinks */
+  public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
+      "hbase.replication.source.maxthreads";
+
+  public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
+
/** Config for pluggable consensus provider */
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 5d5bb10..8f5cccc 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1546,6 +1546,17 @@ possible configurations would overwhelm and obscure the important.
    using KeyValueCodecWithTags for replication when there are no tags causes no harm.
    </description>
  </property>
+  <property>
+    <name>hbase.replication.source.maxthreads</name>
+    <value>10</value>
+    <description>
+      The maximum number of threads any replication source will use for
+      shipping edits to the sinks in parallel. This also limits the number of
+      chunks each replication batch is broken into.
+      Larger values can improve the replication throughput between the master and
+      slave clusters. The default of 10 will rarely need to be changed.
+    </description>
+  </property>
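
The description compresses the sizing rule used by the endpoint below: the number of parallel shipments is the minimum of the configured thread cap, roughly one chunk per 100 WALEdits, and the number of known sinks. A toy calculation with made-up batch and sink counts (only the default of 10 comes from the patch):

    public class ChunkCountExample {
      public static void main(String[] args) {
        int maxThreads = 10; // hbase.replication.source.maxthreads (default)
        int entries = 350;   // WALEdits in this replication batch (illustrative)
        int sinks = 4;       // reachable peer region servers (illustrative)
        // same arithmetic as replicate(): one chunk per ~100 edits,
        // capped by the thread limit and by the number of sinks
        int n = Math.min(Math.min(maxThreads, entries / 100 + 1), sinks);
        System.out.println(n); // 4 -> the batch is shipped as 4 parallel chunks
      }
    }
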
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index bf31a7d..040351f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -21,7 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -71,6 +80,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
+ private ThreadPoolExecutor exec;
+ private int maxThreads;
@Override
public void init(Context context) throws IOException {
@@ -89,6 +100,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    // per sink thread pool
+    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
+        HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+    this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>());
}
private void decorateConf() {
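
A note on the pool created in init() above: a ThreadPoolExecutor backed by a SynchronousQueue keeps no task backlog, so each submit() either hands the task to an idle worker or spawns a new thread up to the maximum, and rejects beyond that. replicate() never submits more than maxThreads tasks per batch, so rejection should not occur there. A standalone sketch of that behavior, with illustrative pool sizes of 1 and 2:

    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class SynchronousQueuePoolExample {
      public static void main(String[] args) {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
        Runnable sleeper = new Runnable() {
          @Override
          public void run() {
            try {
              Thread.sleep(1000);
            } catch (InterruptedException ignored) {
            }
          }
        };
        exec.submit(sleeper); // starts worker 1
        exec.submit(sleeper); // no idle worker and nothing can queue: worker 2 spawned
        try {
          exec.submit(sleeper); // both workers busy, queue holds nothing: rejected
        } catch (RejectedExecutionException expected) {
          System.out.println("third task rejected, as expected");
        }
        exec.shutdownNow();
      }
    }
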
@@ -139,32 +155,71 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public boolean replicate(ReplicateContext replicateContext) {
    List<Entry> entries = replicateContext.getEntries();
int sleepMultiplier = 1;
- while (this.isRunning()) {
- if (!peersSelected) {
- connectToPeers();
- peersSelected = true;
- }
+ if (!peersSelected && this.isRunning()) {
+ connectToPeers();
+ peersSelected = true;
+ }
+
+    // minimum of: configured threads, number of 100-waledit batches,
+    //  and number of current sinks
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
+        replicationSinkMgr.getSinks().size());
+    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
+    if (n == 1) {
+      entryLists.add(entries);
+    } else {
+      for (int i=0; i<n; i++) {
+        entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
+      }
+      // now group by region
+      for (Entry e : entries) {
+        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
+      }
+    }
+ while (this.isRunning()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
- SinkPeer sinkPeer = null;
try {
- sinkPeer = replicationSinkMgr.getReplicationSink();
- BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + entries.size() +
" entries of total size " + replicateContext.getSize());
}
- ReplicationProtbufUtil.replicateWALEntry(rrs,
- entries.toArray(new Entry[entries.size()]));
+        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
+        for (int i=0; i<entryLists.size(); i++) {
+          if (!entryLists.get(i).isEmpty()) {
+            futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+          }
+        }
+        IOException iox = null;
+        for (Future<Integer> f : futures) {
+          try {
+            // wait for all futures, clear out the successful parts
+            // (only the remaining, non-empty parts will be retried)
+            entryLists.set(f.get().intValue(), Collections.<Entry>emptyList());
+          } catch (InterruptedException ie) {
+            iox = new IOException(ie);
+          } catch (ExecutionException ee) {
+            // cause must be an IOException
+            iox = (IOException)ee.getCause();
+          }
+        }
+ if (iox != null) {
+ // if we had any exceptions, try again
+ throw iox;
+ }
// update metrics
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
- replicationSinkMgr.reportSinkSuccess(sinkPeer);
return true;
} catch (IOException ioe) {
@@ -195,10 +250,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Can't replicate because of a local or network error: ", ioe);
}
}
-
- if (sinkPeer != null) {
- replicationSinkMgr.reportBadSink(sinkPeer);
- }
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
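
The wait loop in the hunk above records the last failure and rethrows it only after every future has been drained, so one failed chunk cannot leave the others running unobserved, and only unshipped chunks are retried. A minimal standalone version of that pattern (the failing task is fabricated for illustration):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class DrainFuturesExample {
      public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        for (int i = 0; i < 2; i++) {
          final int ordinal = i;
          futures.add(exec.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws IOException {
              if (ordinal == 1) {
                throw new IOException("simulated sink failure");
              }
              return ordinal;
            }
          }));
        }
        IOException iox = null;
        for (Future<Integer> f : futures) {
          try {
            System.out.println("chunk " + f.get() + " shipped");
          } catch (InterruptedException ie) {
            iox = new IOException(ie);
          } catch (ExecutionException ee) {
            // the Callable declares only IOException, so the cause is one
            iox = (IOException) ee.getCause();
          }
        }
        exec.shutdownNow();
        if (iox != null) {
          System.out.println("would retry remaining chunks: " + iox.getMessage());
        }
      }
    }
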
@@ -222,6 +273,43 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Failed to close the connection");
}
}
+ exec.shutdownNow();
notifyStopped();
}
+
+ // is this needed? Nobody else will call doStop() otherwise
+ @Override
+ public State stopAndWait() {
+ doStop();
+ return super.stopAndWait();
+ }
+
+  private class Replicator implements Callable<Integer> {
+    private List<Entry> entries;
+    private int ordinal;
+    public Replicator(List<Entry> entries, int ordinal) {
+      this.entries = entries;
+      this.ordinal = ordinal;
+    }
+
+ @Override
+ public Integer call() throws IOException {
+ SinkPeer sinkPeer = null;
+ try {
+ sinkPeer = replicationSinkMgr.getReplicationSink();
+ BlockingInterface rrs = sinkPeer.getRegionServer();
+ ReplicationProtbufUtil.replicateWALEntry(rrs,
+ entries.toArray(new Entry[entries.size()]));
+ replicationSinkMgr.reportSinkSuccess(sinkPeer);
+ return ordinal;
+
+ } catch (IOException ioe) {
+ if (sinkPeer != null) {
+ replicationSinkMgr.reportBadSink(sinkPeer);
+ }
+ throw ioe;
+ }
+ }
+
+ }
}
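
The region-based grouping in replicate() is what keeps this parallelism safe: every edit of a given region hashes to the same chunk, so a single Replicator ships that region's edits in their original order. A self-contained sketch of the same partitioning scheme, substituting String keys and Object.hashCode for encoded region names and Bytes.hashCode:

    import java.util.ArrayList;
    import java.util.List;

    public class RegionPartitionExample {
      public static void main(String[] args) {
        String[] edits = {"region-a", "region-b", "region-a", "region-c", "region-b"};
        int n = 2; // number of chunks, as computed in replicate()
        List<List<String>> chunks = new ArrayList<List<String>>(n);
        for (int i = 0; i < n; i++) {
          chunks.add(new ArrayList<String>());
        }
        // chunk index = abs(hash(region) % n), mirroring the patch, so all
        // edits of one region land in the same chunk and keep their order
        for (String regionName : edits) {
          chunks.get(Math.abs(regionName.hashCode() % n)).add(regionName);
        }
        for (int i = 0; i < n; i++) {
          System.out.println("chunk " + i + ": " + chunks.get(i));
        }
      }
    }
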