From 7b510a3eb72fc32fa4476e27ec088c8b62249545 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 22 May 2017 14:51:03 -0700 Subject: [PATCH] HBASE-18027 Replication should respect RPC size limits when batching edits --- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 4 ++-- .../hbase/replication/ReplicationEndpoint.java | 10 +++++++++ .../HBaseInterClusterReplicationEndpoint.java | 19 ++++++++-------- .../regionserver/ReplicationSinkManager.java | 9 ++++---- .../ReplicationSourceWALReaderThread.java | 26 ++++++++++++++++++---- .../replication/regionserver/WALEntryStream.java | 13 +++++++++++ .../regionserver/TestReplicationSinkManager.java | 6 ++--- 7 files changed, 64 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index d68a05e123..bbaf10d5e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -168,7 +168,7 @@ public abstract class RpcServer implements RpcServerInterface, protected HBaseRPCErrorHandler errorHandler = null; - protected static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; + public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION = new RequestTooBigException(); @@ -183,7 +183,7 @@ public abstract class RpcServer implements RpcServerInterface, protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; /** Default value for above params */ - protected static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M + public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 69db31c15e..7962eff12e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -165,6 +165,16 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe public String getWalGroupId(){ return walGroupId; } + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("ReplicateContext for group "); + sb.append(walGroupId); + sb.append(" with "); + sb.append(entries.size()); + sb.append(" entries of size "); + sb.append(size); + return sb.toString(); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 97f28b4655..1fe0d189cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.ipc.RemoteException; -import javax.security.sasl.SaslException; /** * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} @@ -224,6 +223,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e); } } + + if (LOG.isDebugEnabled()) { + LOG.debug("Replicating " + replicateContext + " in " + n + " batches to peers " + + StringUtils.join(replicationSinkMgr.getSinks(), ",")); + } + while (this.isRunning() && !exec.isShutdown()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -232,17 +237,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi continue; } try { - if (LOG.isTraceEnabled()) { - LOG.trace("Replicating " + entries.size() + - " entries of total size " + replicateContext.getSize()); - } - int futures = 0; for (int i=0; i getSinksForTesting() { - return Collections.unmodifiableList(sinks); + public synchronized List getSinks() { + return sinks != null ? Collections.unmodifiableList(sinks) : Collections.emptyList(); } /** @@ -197,5 +197,4 @@ public class ReplicationSinkManager { } } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index 29808e9757..1c948bbd73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -104,6 +105,15 @@ public class ReplicationSourceWALReaderThread extends Thread { this.filter = filter; this.replicationBatchSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); + final long maxRpcSize = + this.conf.getLong(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE); + if (replicationBatchSizeCapacity > maxRpcSize) { + LOG.warn("The replication batch size limit of " + replicationBatchSizeCapacity + + " bytes exceeds the RPC request size limit of " + maxRpcSize + " bytes." + + " Consider adjusting replication.source.size.capacity or " + + RpcServer.MAX_REQUEST_SIZE + ". Using the request size limit as batch size."); + replicationBatchSizeCapacity = maxRpcSize; + } this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue @@ -149,14 +159,18 @@ public class ReplicationSourceWALReaderThread extends Thread { WALEdit edit = entry.getEdit(); if (edit != null && !edit.isEmpty()) { long entrySize = getEntrySize(entry); - batch.addEntry(entry); - updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); boolean totalBufferTooLarge = acquireBufferQuota(entrySize); // Stop if too many entries or too big - if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity - || batch.getNbEntries() >= replicationBatchCountCapacity) { + if (batch.getNbEntries() > 0 && // Must replicate at least one entry + (totalBufferTooLarge || + ((batch.getHeapSize() + entrySize) >= replicationBatchSizeCapacity + || (batch.getNbEntries() + 1) >= replicationBatchCountCapacity))) { + releaseBufferQuota(entrySize); // Did not add entry, undo acquireBufferQuota + entryStream.putBack(entry); break; } + batch.addEntry(entry); + updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); } } } @@ -345,6 +359,10 @@ public class ReplicationSourceWALReaderThread extends Thread { return totalBufferUsed.addAndGet(size) >= totalBufferQuota; } + private void releaseBufferQuota(long size) { + totalBufferUsed.addAndGet(-size); + } + /** * @return whether the reader thread is running */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index c4d552c17d..f7a194c2eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -125,6 +125,19 @@ public class WALEntryStream implements Iterator, Closeable, Iterable sinkList = sinkManager.getSinksForTesting(); + List sinkList = sinkManager.getSinks(); assertEquals(2, sinkList.size()); ServerName serverNameA = sinkList.get(0); -- 2.12.0