From 8377767c2d9c4d12a195aef17817575e6f9e98ff Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Thu, 15 Dec 2016 17:43:28 +0800 Subject: [PATCH] HBASE-17314 Limit total buffered size for all replication sources --- .../java/org/apache/hadoop/hbase/HConstants.java | 4 ++ .../regionserver/ReplicationSource.java | 50 ++++++++++++++++++---- .../regionserver/ReplicationSourceInterface.java | 3 +- .../regionserver/ReplicationSourceManager.java | 5 ++- .../hbase/replication/ReplicationSourceDummy.java | 4 +- .../regionserver/TestReplicationSourceManager.java | 4 +- 6 files changed, 58 insertions(+), 12 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 48d9778..dc96c2a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -932,6 +932,10 @@ public final class HConstants { public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; + public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota"; + public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024; + + /** * Directory where the source cluster file system client configuration are placed which is used by * sink cluster to copy HFiles from source cluster file system diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 388efbf..3f9722d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -38,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; @@ -147,6 +148,9 @@ public class ReplicationSource extends Thread private ConcurrentHashMap workerThreads = new ConcurrentHashMap(); + private AtomicInteger bufferUsed; + private int quotaPermits; + /** * Instantiation method used by region servers * @@ -165,7 +169,7 @@ public class ReplicationSource extends Thread final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) + final MetricsSource metrics, final AtomicInteger bufferUsed) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); @@ -196,6 +200,9 @@ public class ReplicationSource extends Thread this.actualPeerId = replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; + this.bufferUsed = bufferUsed; + this.quotaPermits = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); } private void decorateConf() { @@ -515,7 +522,7 @@ public class ReplicationSource extends Thread private boolean workerRunning = true; // Current number of hfiles that we need to replicate private long currentNbHFiles = 0; - + List entries; // Use guava cache to set ttl for each key private LoadingCache canSkipWaitingSet = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.DAYS).build( @@ -535,6 +542,7 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = replicationQueueInfo; this.repLogReader = new ReplicationWALReaderManager(fs, conf); this.source = source; + this.entries = new ArrayList<>(); } @Override @@ -607,8 +615,7 @@ public class ReplicationSource extends Thread boolean gotIOE = false; currentNbOperations = 0; currentNbHFiles = 0; - List entries = new ArrayList(1); - + entries.clear(); Map lastPositionsForSerialScope = new HashMap<>(); currentSize = 0; try { @@ -700,6 +707,7 @@ public class ReplicationSource extends Thread continue; } shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope); + releaseBufferQuota(); } if (replicationQueueInfo.isQueueRecovered()) { // use synchronize to make sure one last thread will clean the queue @@ -789,7 +797,7 @@ public class ReplicationSource extends Thread } } } - + boolean totalBufferTooLarge = false; // don't replicate if the log entries have already been consumed by the cluster if (replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(peerClusterId)) { @@ -807,15 +815,16 @@ public class ReplicationSource extends Thread logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); entries.add(entry); - currentSize += entry.getEdit().heapSize(); - currentSize += calculateTotalSizeOfStoreFiles(edit); + int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit); + currentSize += delta; + totalBufferTooLarge = acquireBufferQuota(delta); } else { metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big // FIXME check the relationship between single wal group and overall - if (currentSize >= replicationQueueSizeCapacity + if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity || entries.size() >= replicationQueueNbCapacity) { break; } @@ -1285,5 +1294,30 @@ public class ReplicationSource extends Thread public void setWorkerRunning(boolean workerRunning) { this.workerRunning = workerRunning; } + + /** + * + * @param size delta size for grown buffer + * @return true if we should clear buffer and push all + */ + private boolean acquireBufferQuota(int size) { + while (true) { + int now = bufferUsed.get(); + if (bufferUsed.compareAndSet(now, now + size)) { + return now + size >= quotaPermits; + } + } + } + + private void releaseBufferQuota() { + while (true) { + int now = bufferUsed.get(); + if (bufferUsed.compareAndSet(now, now - currentSize)) { + currentSize = 0; + entries.clear(); + return; + } + } + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 7f4a9f7..cc17bc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -55,7 +56,7 @@ public interface ReplicationSourceInterface { final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) throws IOException; + final MetricsSource metrics, final AtomicInteger bufferUsed) throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index fa6f894..bc062cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -42,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -126,6 +127,8 @@ public class ReplicationSourceManager implements ReplicationListener { private Connection connection; private long replicationWaitTime; + private AtomicInteger totalBufferUsed = new AtomicInteger(); + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues @@ -496,7 +499,7 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(peerId); // init replication source src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, - clusterId, replicationEndpoint, metrics); + clusterId, replicationEndpoint, metrics, totalBufferUsed); // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index abe484e..52a723c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -43,7 +44,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metric, + AtomicInteger bufferUsed) throws IOException { this.manager = manager; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 9d1d165..7e5140e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -39,6 +39,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -582,7 +583,8 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics, + AtomicInteger totalBufferUsed) throws IOException { throw new IOException("Failing deliberately"); } -- 2.10.1 (Apple Git-78)