From 77f766f041d5d4c747fcf65cd3cfe6962d6207a3 Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Mon, 15 Jun 2015 12:04:30 -0700
Subject: [PATCH] HBASE-12988 [Replication] Parallel apply edits on row-level (Lars Hofhansl)

---
 .../HBaseInterClusterReplicationEndpoint.java | 105 +++++++++++++++++----
 1 file changed, 89 insertions(+), 16 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 99a6714..1ca7652 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -21,7 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +47,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -70,6 +79,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   // Handles connecting to peer region servers
   private ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
+  private ThreadPoolExecutor exec;
+  private int maxThreads;
 
   @Override
   public void init(Context context) throws IOException {
@@ -88,6 +99,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    // per sink thread pool
+    this.maxThreads = this.conf.getInt("replication.source.maxthreads", 5);
+    this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>());
   }
 
   private void decorateConf() {
@@ -138,32 +153,57 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   public boolean replicate(ReplicateContext replicateContext) {
     List<HLog.Entry> entries = replicateContext.getEntries();
     int sleepMultiplier = 1;
-    while (this.isRunning()) {
-      if (!peersSelected) {
-        connectToPeers();
-        peersSelected = true;
-      }
+    if (!peersSelected && this.isRunning()) {
+      connectToPeers();
+      peersSelected = true;
+    }
+
+    int n = Math.min(this.maxThreads, replicationSinkMgr.getSinks().size()/100+1);
+    List<List<HLog.Entry>> entryLists = new ArrayList<List<HLog.Entry>>(n);
+    for (int i=0; i<n; i++) {
+      entryLists.add(new ArrayList<HLog.Entry>(entries.size()/n+1));
+    }
+    // now group by region
+    for (HLog.Entry e : entries) {
+      entryLists.get(Bytes.hashCode(e.getKey().getEncodedRegionName())%n).add(e);
+    }
+    while (this.isRunning()) {
 
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
         }
         continue;
       }
-      SinkPeer sinkPeer = null;
       try {
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicating " + entries.size() +
               " entries of total size " + replicateContext.getSize());
         }
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-          entries.toArray(new HLog.Entry[entries.size()]));
-
+        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
+        for (int i=0; i<entryLists.size(); i++) {
+          if (!entryLists.get(i).isEmpty()) {
+            futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+          }
+        }
+        IOException iox = null;
+        for (Future<Integer> f : futures) {
+          try {
+            // wait for all futures remove successful parts
+            entryLists.remove(f.get());
+          } catch (InterruptedException ie) {
+            iox = new IOException(ie);
+          } catch (ExecutionException ee) {
+            // cause must be an IOException
+            iox = (IOException)ee.getCause();
+          }
+        }
+        if (iox != null) {
+          // if we had any exception, try again
+          throw iox;
+        }
         // update metrics
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
-        replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return true;
 
       } catch (IOException ioe) {
@@ -194,10 +234,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           LOG.warn("Can't replicate because of a local or network error: ", ioe);
         }
       }
-
-      if (sinkPeer != null) {
-        replicationSinkMgr.reportBadSink(sinkPeer);
-      }
       if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
         sleepMultiplier++;
       }
@@ -221,6 +257,43 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         LOG.warn("Failed to close the connection");
       }
     }
+    exec.shutdownNow();
     notifyStopped();
   }
+
+  // is this needed? Nobody else will call doStop() otherwise
+  @Override
+  public State stopAndWait() {
+    doStop();
+    return super.stopAndWait();
+  }
+
+  private class Replicator implements Callable<Integer> {
+    private List<HLog.Entry> entries;
+    private int ordinal;
+    public Replicator(List<HLog.Entry> entries, int ordinal) {
+      this.entries = entries;
+      this.ordinal = ordinal;
+    }
+
+    @Override
+    public Integer call() throws IOException {
+      SinkPeer sinkPeer = null;
+      try {
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
+        ReplicationProtbufUtil.replicateWALEntry(rrs,
+          entries.toArray(new HLog.Entry[entries.size()]));
+        replicationSinkMgr.reportSinkSuccess(sinkPeer);
+        return ordinal;
+
+      } catch (IOException ioe) {
+        if (sinkPeer != null) {
+          replicationSinkMgr.reportBadSink(sinkPeer);
+        }
+        throw ioe;
+      }
+    }
+
+  }
 }
-- 
2.2.2
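
Note (not part of the patch): the change above partitions the WAL entries by a hash of their encoded region name, so edits for the same region stay in one batch and keep their order, then ships each batch on its own thread and waits on the futures so that any failed batch causes a retry. The following self-contained sketch illustrates that pattern in plain Java under stated assumptions: the class, method, and variable names are invented for illustration, String keys stand in for encoded region names, and String.hashCode() stands in for Bytes.hashCode().

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Standalone sketch (not HBase code): partition items by a key hash so that
// items sharing a key land in the same batch, then ship the batches in parallel.
public class PartitionedShipSketch {

  // Stand-in for shipping one batch to a sink; returns its ordinal on success,
  // loosely mirroring what Replicator.call() does in the patch.
  static Callable<Integer> shipper(final List<String> batch, final int ordinal) {
    return new Callable<Integer>() {
      @Override
      public Integer call() {
        System.out.println("shipping batch " + ordinal + ": " + batch);
        return ordinal;
      }
    };
  }

  public static void main(String[] args) throws Exception {
    // "region:operation" strings play the role of WAL entries.
    List<String> edits = Arrays.asList("r1:put", "r2:put", "r3:del", "r1:del", "r2:put");
    int maxThreads = 3;

    // One bucket per worker, like the entryLists built in replicate().
    int n = Math.min(maxThreads, edits.size());
    List<List<String>> buckets = new ArrayList<List<String>>(n);
    for (int i = 0; i < n; i++) {
      buckets.add(new ArrayList<String>());
    }
    for (String edit : edits) {
      String region = edit.split(":")[0];
      // Math.abs guards against negative hash codes here; the patch hashes the
      // encoded region name with Bytes.hashCode() instead.
      buckets.get(Math.abs(region.hashCode()) % n).add(edit);
    }

    // Submit the batches and wait for every future, as replicate() does with its
    // Replicator tasks; there a failure surfaces as an ExecutionException whose
    // IOException cause is rethrown to trigger a retry.
    ExecutorService exec = Executors.newFixedThreadPool(n);
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    for (int i = 0; i < n; i++) {
      if (!buckets.get(i).isEmpty()) {
        futures.add(exec.submit(shipper(buckets.get(i), i)));
      }
    }
    for (Future<Integer> f : futures) {
      f.get();
    }
    exec.shutdown();
  }
}

The real endpoint additionally bounds its pool via replication.source.maxthreads, picks a sink per batch through ReplicationSinkManager, and keeps the surrounding retry loop; the sketch leaves all of that out.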