From ea8123e81cb4b0e2d89fb672b5bfe67557852ec0 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 29 Nov 2017 17:21:42 -0800 Subject: [PATCH] HBASE-19381 TestGlobalThrottler doesn't make progress Revert "HBASE-17314 Limit total buffered size for all replication sources" This reverts commit 29e390c80895af54206d6a14eac50ca2859cf2b7. Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java --- .../java/org/apache/hadoop/hbase/HConstants.java | 9 - .../hadoop/hbase/regionserver/HRegionServer.java | 1 - .../regionserver/ReplicationSource.java | 13 +- .../regionserver/ReplicationSourceManager.java | 8 - .../ReplicationSourceWALReaderThread.java | 34 +--- .../hbase/replication/TestReplicationSource.java | 12 +- .../regionserver/TestGlobalThrottler.java | 187 --------------------- .../regionserver/TestWALEntryStream.java | 3 - 8 files changed, 5 insertions(+), 262 deletions(-) delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 8242a170ab..2de16f7014 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -895,15 +895,6 @@ public final class HConstants { /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; /** - * Max total size of buffered entries in all replication peers. It will prevent server getting - * OOM if there are many peers. Default value is 256MB which is four times to default - * replication.source.size.capacity. - */ - public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota"; - public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024; - - - /** * Directory where the source cluster file system client configuration are placed which is used by * sink cluster to copy HFiles from source cluster file system */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4853b2b050..b156256937 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2291,7 +2291,6 @@ public class HRegionServer extends HasThread implements * @return Return the object that implements the replication * source service. */ - @VisibleForTesting public ReplicationSourceService getReplicationSourceService() { return replicationSourceHandler; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index add104397f..776814f276 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -72,10 +72,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Service; - /** * Class that handles the source of a replication stream. * Currently does not handle more than 1 slave @@ -148,8 +144,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf FINISHED // The worker is done processing a recovered queue } - private AtomicLong totalBufferUsed; - /** * Instantiation method used by region servers * @@ -195,7 +189,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); - this.totalBufferUsed = manager.getTotalBufferUsed(); + LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + ", currentBandwidth=" + this.currentBandwidth); } @@ -555,7 +549,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf try { WALEntryBatch entryBatch = entryReader.take(); shipEdits(entryBatch); - releaseBufferQuota((int) entryBatch.getHeapSize()); if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty() && entryBatch.getLastSeqIds().isEmpty()) { LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " @@ -916,9 +909,5 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public WorkerState getWorkerState() { return state; } - - private void releaseBufferQuota(int size) { - totalBufferUsed.addAndGet(-size); - } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 469e6345e6..74dded777d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -41,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -120,8 +119,6 @@ public class ReplicationSourceManager implements ReplicationListener { private final boolean replicationForBulkLoadDataEnabled; - private AtomicLong totalBufferUsed = new AtomicLong(); - /** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues @@ -453,11 +450,6 @@ public class ReplicationSourceManager implements ReplicationListener { } } - @VisibleForTesting - public AtomicLong getTotalBufferUsed() { - return totalBufferUsed; - } - /** * Factory method to create a replication source * @param conf the configuration to use diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index 306ba8f51b..872f91d08a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,17 +35,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -78,9 +74,6 @@ public class ReplicationSourceWALReaderThread extends Thread { private int maxRetriesMultiplier; private MetricsSource metrics; - private AtomicLong totalBufferUsed; - private long totalBufferQuota; - /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -109,9 +102,6 @@ public class ReplicationSourceWALReaderThread extends Thread { // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - this.totalBufferUsed = manager.getTotalBufferUsed(); - this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, - HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -132,9 +122,6 @@ public class ReplicationSourceWALReaderThread extends Thread { try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can - if (!checkQuota()) { - continue; - } WALEntryBatch batch = null; while (entryStream.hasNext()) { if (batch == null) { @@ -148,9 +135,8 @@ public class ReplicationSourceWALReaderThread extends Thread { long entrySize = getEntrySize(entry); batch.addEntry(entry); updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(entrySize); // Stop if too many entries or too big - if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity + if (batch.getHeapSize() >= replicationBatchSizeCapacity || batch.getNbEntries() >= replicationBatchCountCapacity) { break; } @@ -223,16 +209,6 @@ public class ReplicationSourceWALReaderThread extends Thread { return logQueue.peek(); } - //returns false if we've already exceeded the global quota - private boolean checkQuota() { - // try not to go over total quota - if (totalBufferUsed.get() > totalBufferQuota) { - Threads.sleep(sleepForRetries); - return false; - } - return true; - } - private Entry filterEntry(Entry entry) { Entry filtered = filter.filter(entry); if (entry != null && filtered == null) { @@ -337,14 +313,6 @@ public class ReplicationSourceWALReaderThread extends Thread { } /** - * @param size delta size for grown buffer - * @return true if we should clear buffer and push all - */ - private boolean acquireBufferQuota(long size) { - return totalBufferUsed.addAndGet(size) >= totalBufferQuota; - } - - /** * @return whether the reader thread is running */ public boolean isReaderRunning() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 6e6fe9a9c2..990c5fd81f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,7 +59,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import static org.mockito.Mockito.mock; @@ -160,15 +158,11 @@ public class TestReplicationSource { } }; replicationEndpoint.start(); - ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeers mockPeers = mock(ReplicationPeers.class); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeers, null, "testPeer", - null, replicationEndpoint, null); + source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, replicationEndpoint, + null); ExecutorService executor = Executors.newSingleThreadExecutor(); final Future future = executor.submit(new Runnable() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java deleted file mode 100644 index 6e19fc2402..0000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.HTestConst; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestGlobalThrottler { - private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class); - private static Configuration conf1; - private static Configuration conf2; - - private static HBaseTestingUtility utility1; - private static HBaseTestingUtility utility2; - - private static final byte[] famName = Bytes.toBytes("f"); - private static final byte[] VALUE = Bytes.toBytes("v"); - private static final byte[] ROW = Bytes.toBytes("r"); - private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf1 = HBaseConfiguration.create(); - conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); - conf1.setLong("replication.source.sleepforretries", 100); - // Each WAL is about 120 bytes - conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200); - conf1.setLong("replication.source.per.peer.node.bandwidth", 100L); - - utility1 = new HBaseTestingUtility(conf1); - utility1.startMiniZKCluster(); - MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - new ZooKeeperWatcher(conf1, "cluster1", null, true); - - conf2 = new Configuration(conf1); - conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); - - utility2 = new HBaseTestingUtility(conf2); - utility2.setZkCluster(miniZK); - new ZooKeeperWatcher(conf2, "cluster2", null, true); - - ReplicationAdmin admin1 = new ReplicationAdmin(conf1); - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utility2.getClusterKey()); - admin1.addPeer("peer1", rpc, null); - admin1.addPeer("peer2", rpc, null); - admin1.addPeer("peer3", rpc, null); - - utility1.startMiniCluster(1, 1); - utility2.startMiniCluster(1, 1); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); - } - - - volatile private boolean testQuotaPass = false; - volatile private boolean testQuotaNonZero = false; - @Test - public void testQuota() throws IOException { - TableName tableName = TableName.valueOf("testQuota"); - HTableDescriptor table = new HTableDescriptor(tableName); - HColumnDescriptor fam = new HColumnDescriptor(famName); - fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL); - table.addFamily(fam); - utility1.getHBaseAdmin().createTable(table); - utility2.getHBaseAdmin().createTable(table); - - Thread watcher = new Thread(new Runnable() { - @Override - public void run() { - Replication replication = (Replication) utility1.getMiniHBaseCluster() - .getRegionServer(0).getReplicationSourceService(); - AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed(); - testQuotaPass = true; - while (!Thread.interrupted()) { - long size = bufferUsed.get(); - if (size > 0) { - testQuotaNonZero = true; - } - if (size > 600) { - // We read logs first then check throttler, so if the buffer quota limiter doesn't - // take effect, it will push many logs and exceed the quota. - testQuotaPass = false; - } - Threads.sleep(50); - } - } - }); - - watcher.start(); - - try(Table t1 = utility1.getConnection().getTable(tableName); - Table t2 = utility2.getConnection().getTable(tableName)) { - for (int i = 0; i < 50; i++) { - Put put = new Put(ROWS[i]); - put.addColumn(famName, VALUE, VALUE); - t1.put(put); - } - long start = EnvironmentEdgeManager.currentTime(); - while (EnvironmentEdgeManager.currentTime() - start < 180000) { - Scan scan = new Scan(); - scan.setCaching(50); - int count = 0; - try (ResultScanner results = t2.getScanner(scan)) { - for (Result result : results) { - count++; - } - } - if (count < 50) { - LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count); - Threads.sleep(200); - continue; - } - break; - } - } - - watcher.interrupt(); - Assert.assertTrue(testQuotaPass); - Assert.assertTrue(testQuotaNonZero); - } - - private List getRowNumbers(List cells) { - List listOfRowNumbers = new ArrayList<>(); - for (Cell c : cells) { - listOfRowNumbers.add(Integer.parseInt(Bytes - .toString(c.getRowArray(), c.getRowOffset() + ROW.length, - c.getRowLength() - ROW.length))); - } - return listOfRowNumbers; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 005e2a17a9..04c3b818db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -33,7 +32,6 @@ import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -347,7 +345,6 @@ public class TestWALEntryStream { // start up a batcher ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1")); Path walPath = walQueue.peek(); -- 2.13.4