diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 32f07cb..1b22dab 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1116,6 +1116,12 @@ public final class HConstants {
   /** Configuration key for setting replication codec class name */
   public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
 
+  /** Maximum number of threads used by the replication source for shipping edits to the sinks */
+  public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
+      "hbase.replication.source.maxthreads";
+
+  public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
+
   /** Config for pluggable consensus provider */
   public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
     "hbase.coordinated.state.manager.class";
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 5d5bb10..8f5cccc 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1546,6 +1546,17 @@ possible configurations would overwhelm and obscure the important.
       using KeyValueCodecWithTags for replication when there are no tags causes no harm.
     </description>
   </property>
+  <property>
+    <name>hbase.replication.source.maxthreads</name>
+    <value>10</value>
+    <description>
+      The maximum number of threads any replication source will use for
+      shipping edits to the sinks in parallel. This also limits the number of
+      chunks each replication batch is broken into.
+      Larger values can improve the replication throughput between the master and
+      slave clusters. The default of 10 will rarely need to be changed.
+    </description>
+  </property>
   <property>
     <name>hbase.http.staticuser.user</name>
     <value>dr.stack</value>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index bf31a7d..040351f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -21,7 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -71,6 +80,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   // Handles connecting to peer region servers
   private ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
+  private ThreadPoolExecutor exec;
+  private int maxThreads;
 
   @Override
   public void init(Context context) throws IOException {
@@ -89,6 +100,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    // per sink thread pool
+    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
+        HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+    this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>());
   }
 
   private void decorateConf() {
@@ -139,32 +155,71 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   public boolean replicate(ReplicateContext replicateContext) {
     List<Entry> entries = replicateContext.getEntries();
     int sleepMultiplier = 1;
-    while (this.isRunning()) {
-      if (!peersSelected) {
-        connectToPeers();
-        peersSelected = true;
-      }
+    if (!peersSelected && this.isRunning()) {
+      connectToPeers();
+      peersSelected = true;
+    }
+
+    // minimum of: configured threads, number of 100-waledit batches,
+    // and number of current sinks
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
+        replicationSinkMgr.getSinks().size());
+    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
+    if (n == 1) {
+      entryLists.add(entries);
+    } else {
+      for (int i=0; i<n; i++) {
+        entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
+      }
+      // now group by region
+      for (Entry e : entries) {
+        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
+      }
+    }
 
+    while (this.isRunning()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
         }
         continue;
       }
-      SinkPeer sinkPeer = null;
       try {
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicating " + entries.size() +
               " entries of total size " + replicateContext.getSize());
         }
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-          entries.toArray(new Entry[entries.size()]));
+        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
+        for (int i=0; i<entryLists.size(); i++) {
+          if (!entryLists.get(i).isEmpty()) {
+            futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+          }
+        }
+        IOException iox = null;
+        for (Future<Integer> f : futures) {
+          try {
+            // wait for all futures, remove successful parts
+            // (only the remaining parts will be retried)
+            entryLists.remove(f.get());
+          } catch (InterruptedException ie) {
+            iox = new IOException(ie);
+          } catch (ExecutionException ee) {
+            // cause must be an IOException
+            iox = (IOException)ee.getCause();
+          }
+        }
+        if (iox != null) {
+          // if we had any exceptions, try again
+          throw iox;
+        }
         // update metrics
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
-        replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return true;
 
       } catch (IOException ioe) {
@@ -195,10 +250,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           LOG.warn("Can't replicate because of a local or network error: ", ioe);
         }
       }
-
-      if (sinkPeer != null) {
-        replicationSinkMgr.reportBadSink(sinkPeer);
-      }
       if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
         sleepMultiplier++;
       }
@@ -222,6 +273,43 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         LOG.warn("Failed to close the connection");
       }
     }
+    exec.shutdownNow();
     notifyStopped();
   }
+
+  // is this needed? Nobody else will call doStop() otherwise
+  @Override
+  public State stopAndWait() {
+    doStop();
+    return super.stopAndWait();
+  }
+
+  private class Replicator implements Callable<Integer> {
+    private List<Entry> entries;
+    private int ordinal;
+    public Replicator(List<Entry> entries, int ordinal) {
+      this.entries = entries;
+      this.ordinal = ordinal;
+    }
+
+    @Override
+    public Integer call() throws IOException {
+      SinkPeer sinkPeer = null;
+      try {
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
+        ReplicationProtbufUtil.replicateWALEntry(rrs,
+          entries.toArray(new Entry[entries.size()]));
+        replicationSinkMgr.reportSinkSuccess(sinkPeer);
+        return ordinal;
+
+      } catch (IOException ioe) {
+        if (sinkPeer != null) {
+          replicationSinkMgr.reportBadSink(sinkPeer);
+        }
+        throw ioe;
+      }
+    }
+
+  }
 }