diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index bf31a7d..801bf3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -21,7 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -71,6 +80,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   // Handles connecting to peer region servers
   private ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
+  private ThreadPoolExecutor exec;
+  private int maxThreads;
 
   @Override
   public void init(Context context) throws IOException {
@@ -89,6 +100,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    // thread pool for parallel shipping to the peer's sinks
+    this.maxThreads = this.conf.getInt("replication.source.maxthreads", 10);
+    this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>());
   }
 
   private void decorateConf() {
@@ -139,32 +154,64 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   public boolean replicate(ReplicateContext replicateContext) {
     List<Entry> entries = replicateContext.getEntries();
     int sleepMultiplier = 1;
-    while (this.isRunning()) {
-      if (!peersSelected) {
-        connectToPeers();
-        peersSelected = true;
-      }
+    if (!peersSelected && this.isRunning()) {
+      connectToPeers();
+      peersSelected = true;
+    }
+
+    // minimum of: configured threads, number of 100-waledit batches,
+    // and number of current sinks
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
+        replicationSinkMgr.getSinks().size());
+    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
+    for (int i=0; i<n; i++) {
+      entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
+    }
+    // now group by region so a region's edits all land in the same batch
+    for (Entry e : entries) {
+      entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
+    }
+    while (this.isRunning()) {
 
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
         }
         continue;
       }
-      SinkPeer sinkPeer = null;
       try {
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicating " + entries.size() +
               " entries of total size " + replicateContext.getSize());
         }
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new Entry[entries.size()]));
+        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
+        for (int i=0; i<entryLists.size(); i++) {
+          // skip batches that already shipped in an earlier attempt
+          if (!entryLists.get(i).isEmpty()) {
+            futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+          }
+        }
+        IOException iox = null;
+        for (Future<Integer> f : futures) {
+          try {
+            // wait for all futures; clear successfully shipped batches
+            // so that only the failed ones are retried
+            entryLists.set(f.get().intValue(), new ArrayList<Entry>());
+          } catch (InterruptedException ie) {
+            iox = new IOException(ie);
+          } catch (ExecutionException ee) {
+            // cause must be an IOException
+            iox = (IOException)ee.getCause();
+          }
+        }
+        if (iox != null) {
+          // if we had any exceptions, try again
+          throw iox;
+        }
 
         // update metrics
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
-        replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return true;
 
       } catch (IOException ioe) {
@@ -195,10 +242,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           LOG.warn("Can't replicate because of a local or network error: ", ioe);
         }
       }
-
-      if (sinkPeer != null) {
-        replicationSinkMgr.reportBadSink(sinkPeer);
-      }
       if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
         sleepMultiplier++;
       }
@@ -222,6 +265,43 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         LOG.warn("Failed to close the connection");
       }
     }
+    exec.shutdownNow();
     notifyStopped();
   }
+
+  // is this needed? Nobody else will call doStop() otherwise
+  @Override
+  public State stopAndWait() {
+    doStop();
+    return super.stopAndWait();
+  }
+
+  private class Replicator implements Callable<Integer> {
+    private List<Entry> entries;
+    private int ordinal;
+    public Replicator(List<Entry> entries, int ordinal) {
+      this.entries = entries;
+      this.ordinal = ordinal;
+    }
+
+    @Override
+    public Integer call() throws IOException {
+      SinkPeer sinkPeer = null;
+      try {
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
+        ReplicationProtbufUtil.replicateWALEntry(rrs,
+            entries.toArray(new Entry[entries.size()]));
+        replicationSinkMgr.reportSinkSuccess(sinkPeer);
+        return ordinal;
+
+      } catch (IOException ioe) {
+        if (sinkPeer != null) {
+          replicationSinkMgr.reportBadSink(sinkPeer);
+        }
+        throw ioe;
+      }
+    }
+
+  }
 }
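
A note on the batching scheme in replicate() above: the patch picks the number of parallel shipments n as the minimum of the configured thread cap, one batch per 100 WAL edits, and the number of live sinks, then assigns each edit to a batch by hashing its encoded region name. The standalone sketch below (not part of the patch; plain byte[] keys and java.util.Arrays.hashCode stand in for Entry.getKey().getEncodedRegionName() and Bytes.hashCode) illustrates the arithmetic:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {
  // pick the batch count: at most maxThreads, at most one batch per
  // 100 edits, and never more batches than available sink servers
  static int batchCount(int maxThreads, int numEntries, int numSinks) {
    return Math.min(Math.min(maxThreads, numEntries / 100 + 1), numSinks);
  }

  // group edits by region: every edit for a given region hashes to the
  // same batch, so per-region ordering survives parallel shipping
  static List<List<byte[]>> partition(List<byte[]> editRegions, int n) {
    List<List<byte[]>> batches = new ArrayList<List<byte[]>>(n);
    for (int i = 0; i < n; i++) {
      batches.add(new ArrayList<byte[]>(editRegions.size() / n + 1));
    }
    for (byte[] region : editRegions) {
      batches.get(Math.abs(Arrays.hashCode(region) % n)).add(region);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<byte[]> editRegions = Arrays.asList(
        "region-a".getBytes(), "region-b".getBytes(),
        "region-a".getBytes(), "region-c".getBytes());
    int n = batchCount(10, editRegions.size(), 3); // min(10, 4/100+1, 3) = 1
    for (List<byte[]> batch : partition(editRegions, n)) {
      System.out.println("batch of " + batch.size() + " edits");
    }
  }
}

Because batches are formed per region rather than round-robin, all edits for one region go through a single Replicator call, so ordering within a region is preserved even though batches ship concurrently.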
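
On the executor configuration: with a core size of 1, a maximum of replication.source.maxthreads, and a zero-capacity SynchronousQueue, each submit() beyond the currently idle threads starts a new thread up to the cap, and a submission past the cap would be rejected. The patch never hits rejection because n is capped at maxThreads and replicate() joins every Future before submitting again. A minimal standalone demonstration of that property (assumed names, not HBase code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSketch {
  public static void main(String[] args) throws Exception {
    int maxThreads = 4; // stands in for replication.source.maxthreads
    ThreadPoolExecutor exec = new ThreadPoolExecutor(1, maxThreads,
        60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    for (int i = 0; i < maxThreads; i++) { // n is capped at maxThreads
      final int ordinal = i;
      futures.add(exec.submit(new Callable<Integer>() {
        public Integer call() { return ordinal; } // "ship" one batch
      }));
    }
    for (Future<Integer> f : futures) { // join all before the next round
      System.out.println("batch " + f.get() + " shipped");
    }
    exec.shutdownNow();
  }
}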