From 8278f6c0d846cebeaae6e544aba76409e5e4e6d8 Mon Sep 17 00:00:00 2001 From: "chancelq" Date: Sat, 2 Dec 2017 23:36:00 +0800 Subject: [PATCH] HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write hander exhausted --- .../java/org/apache/hadoop/hbase/HConstants.java | 1 + .../apache/hadoop/hbase/regionserver/HRegion.java | 39 +++- .../apache/hadoop/hbase/regionserver/HStore.java | 38 +++- .../hadoop/hbase/regionserver/RSRpcServices.java | 6 + .../apache/hadoop/hbase/regionserver/Store.java | 2 + .../throttle/StoreHotnessProtector.java | 205 +++++++++++++++++++++ .../org/apache/hadoop/hbase/io/TestHeapSize.java | 9 + .../throttle/TestStoreHotnessProtector.java | 119 ++++++++++++ 8 files changed, 412 insertions(+), 7 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 594a895c42..a93f433d45 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -89,6 +89,7 @@ public final class HConstants { NOT_RUN, SUCCESS, BAD_FAMILY, + STORE_TOO_BUSY, SANITY_CHECK_FAILURE, FAILURE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e66ad59e05..5b45327866 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; @@ -663,6 +664,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final NavigableMap replicationScope = new TreeMap<>( Bytes.BYTES_COMPARATOR); + private final StoreHotnessProtector storeHotnessProtector; + /** * HRegion constructor. This constructor should only be used for testing and * extensions. Instances of HRegion should be instantiated with the @@ -783,6 +786,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? DEFAULT_DURABILITY : htd.getDurability(); + + this.storeHotnessProtector = new StoreHotnessProtector(this, conf); + if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -796,7 +802,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (LOG.isDebugEnabled()) { // Write out region name as string and its encoded name. - LOG.debug("Instantiated " + this); + LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString()); } configurationManager = Optional.empty(); @@ -3141,6 +3147,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!isOperationPending(lastIndexExclusive)) { continue; } + + // Must be after #isOperationPending, skip BAD_FAMILY/SANITY_CHECK_FAILURE + // TODO ut for cover it. + // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to + // prevent write handler exhausted + // ugly, it can be override in MutationBatchOperation, let's put it here now, + // just for performance. + if (this instanceof MutationBatchOperation) { + try { + region.storeHotnessProtector + .start(((Mutation) operations[lastIndexExclusive]).getFamilyCellMap()); + } catch (RegionTooBusyException rtbe) { + retCodeDetails[lastIndexExclusive] = + new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage()); + continue; + } + } + Mutation mutation = getMutation(lastIndexExclusive); // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; @@ -3875,6 +3899,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; }); + if (batchOp instanceof MutationBatchOperation && miniBatchOp != null) { + this.storeHotnessProtector + .finish(batchOp.nextIndexToProcess, finalLastIndexExclusive, batchOp.retCodeDetails, + batchOp.getMutationsForCoprocs()); + } + batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess); batchOp.nextIndexToProcess = finalLastIndexExclusive; @@ -7776,7 +7806,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ClassSize.ARRAY + 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + - 3 * Bytes.SIZEOF_BOOLEAN); + 3 * Bytes.SIZEOF_BOOLEAN + + /*storeHotnessProtector*/ + 1 * ClassSize.REFERENCE); // woefully out of date - currently missing: // 1 x HashMap - coprocessorServiceHandlers @@ -7801,6 +7833,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + ClassSize.STORE_SERVICES // store services + + StoreHotnessProtector.FIXED_SIZE ; @Override @@ -8267,7 +8300,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @Override public void onConfigurationChange(Configuration conf) { - // Do nothing for now. + this.storeHotnessProtector.update(conf); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 80f91c8b68..75a4904d8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; @@ -174,6 +175,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat private final boolean verifyBulkLoads; + private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); + private final int parallelPutCountPrintThreshold; + private ScanInfo scanInfo; // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it. @@ -327,6 +331,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat + flushRetriesNumber); } cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); + + int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); + if (confPrintThreshold < 10) { + confPrintThreshold = 10; + } + this.parallelPutCountPrintThreshold = confPrintThreshold; } /** @@ -688,8 +698,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Cell cell, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - this.memstore.add(cell, memstoreSizing); + if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { + if (LOG.isTraceEnabled()) { + LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this + .getColumnFamilyName() + " too Busy!"); + } + } + this.memstore.add(cell, memstoreSizing); } finally { + currentParallelPutCount.decrementAndGet(); lock.readLock().unlock(); } } @@ -700,8 +717,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Iterable cells, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { + if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { + if (LOG.isTraceEnabled()) { + LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this + .getColumnFamilyName() + " too Busy!"); + } + } memstore.add(cells, memstoreSizing); } finally { + currentParallelPutCount.decrementAndGet(); lock.readLock().unlock(); } } @@ -2315,9 +2339,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat return this.cacheConf; } - public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG) - + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); + public static final long FIXED_OVERHEAD = ClassSize.align( + ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG) + (5 + * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN) + + Bytes.SIZEOF_INT/*parallelPutCountPrintThreshold*/ + + ClassSize.REFERENCE/*currentParallelPutCount*/); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK @@ -2577,4 +2603,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat lock.writeLock().unlock(); } } + + public int getCurrentParallelPutCount() { + return currentParallelPutCount.get(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 58e2970180..edcd59bbb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; @@ -995,6 +996,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException(getResultOrException( ClientProtos.Result.getDefaultInstance(), index)); break; + + case STORE_TOO_BUSY: + e = new RegionTooBusyException(codes[i].getExceptionMsg()); + builder.addResultOrException(getResultOrException(e, index)); + break; } } } catch (IOException ie) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index d60de6bc8c..11cfb1a153 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -280,4 +280,6 @@ public interface Store { * @return true if the memstore may need some extra memory space */ boolean isSloppyMemStore(); + + int getCurrentParallelPutCount(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java new file mode 100644 index 0000000000..5d201e719d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.OperationStatus; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * StoreHotnessProtector is designed to Limit concurrency of put with dense (hundreds) columns + * to prevent write handler exhausted. + * It's not some kind of Throttling. + * Throttling is user oriented, while StoreHotnessProtector is system oriented(RS-self-protected). + * For example, a large number of clients write dense (hundreds) columns to one region will + * lead to RS blocking(all handler exhausted) + * This can be dynamically disabled or enabled. + * disable it by setting hbase.region.store.parallel.put.limit to 0 or negative number + */ +@InterfaceAudience.Private +public class StoreHotnessProtector { + private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class); + private volatile int parallelPutToStoreThreadLimit; + /** + * StoreHotnessProtector is not using lock,so we need #parallelPreparePutToStoreThreadLimit + * to prevent handler exhausted for an instant. + * Btw, basically we only want to limit the concurrency of putting to memStore. + * that means if handler exhausted because MVCC or WAL, it's another thing, + * although #parallelPreparePutToStoreThreadLimit can prevent handler exhausted + * because MVCC/WAL slowly. + */ + private volatile int parallelPreparePutToStoreThreadLimit; + public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = + "hbase.region.store.parallel.put.limit"; + public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = + "hbase.region.store.parallel.prepare.put.multiplier"; + private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10; + private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount; + public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT = + "hbase.region.store.parallel.put.limit.min.column.count"; + private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; + private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; + + private final Map preparePutToStoreMap = + new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); + private final Region region; + + public StoreHotnessProtector(Region region, Configuration conf) { + init(conf); + this.region = region; + } + + public void init(Configuration conf) { + this.parallelPutToStoreThreadLimit = + conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); + this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, + DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; + this.parallelPutToStoreThreadLimitCheckMinColumnCount = + conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, + DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); + + } + + public void update(Configuration conf) { + init(conf); + preparePutToStoreMap.clear(); + LOG.debug("update config: " + toString()); + } + + public void start(Map> familyMaps) throws RegionTooBusyException { + // feature is enabled when parallelPutToStoreThreadLimit > 0 + if (this.parallelPutToStoreThreadLimit > 0) { + + String tooBusyStore = null; + + for (Map.Entry> e : familyMaps.entrySet()) { + Store store = this.region.getStore(e.getKey()); + if (store == null || e.getValue() == null) { + continue; + } + + if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { + + //need to run #finish, so we add #preparePutCount first + //update will clear preparePutToStoreMap. no-lock. not really accurate when update conf. + preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger()); + AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey()); + if (preparePutCounter == null) { + preparePutCounter = new AtomicInteger(); + preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter); + } + int preparePutCount = preparePutCounter.incrementAndGet(); + //The number of threads writing to store exceeds the parallel limit, + //or prepare parallel put exceed limit + // that means this region(store) is very hot now, + // and as we know that writing to this store is very slow at this time. + // so we block it this time. + // not accurate(no lock), It's better later than never. + if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit + || preparePutCount > this.parallelPreparePutToStoreThreadLimit) { + tooBusyStore = (tooBusyStore == null ? + store.getColumnFamilyName() : + tooBusyStore + "," + store.getColumnFamilyName()); + } + } + } + + if (tooBusyStore != null) { + String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + + tooBusyStore + " Above parallelPutToStoreThreadLimit(" + + this.parallelPutToStoreThreadLimit + ")"; + if (LOG.isTraceEnabled()) { + LOG.trace(msg); + } + //reach STORE_TOO_BUSY, this mutation will not run again. so finish it. + //and #finish() will skip OperationStatusCode==STORE_TOO_BUSY + doFinish(familyMaps); + throw new RegionTooBusyException(msg); + } + } + } + + private void doFinish(Map> familyMaps) { + if (this.parallelPutToStoreThreadLimit > 0) { + + for (Map.Entry> e : familyMaps.entrySet()) { + Store store = this.region.getStore(e.getKey()); + if (store == null || e.getValue() == null) { + continue; + } + if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { + AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); + //for change the configuration. not need very accuracy + if (counter != null && counter.decrementAndGet() < 0) { + counter.incrementAndGet(); + } + } + } + } + } + + public String toString() { + return "StoreHotnessProtector, parallelPutToStoreThreadLimit=" + + this.parallelPutToStoreThreadLimit + " ; minColumnNum=" + + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit=" + + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + ( + this.parallelPutToStoreThreadLimit > 0 ? "enable" : "disable"); + } + + public void finish(int indexToProcess, int lastIndexExclusive, OperationStatus[] retCodeDetails, + Mutation[] mutationsForCoprocs) { + // feature is enabled when parallelPutToStoreThreadLimit > 0 + if (this.parallelPutToStoreThreadLimit > 0) { + for (int i = indexToProcess; i < lastIndexExclusive; i++) { + switch (retCodeDetails[i].getOperationStatusCode()) { + case SUCCESS: + case FAILURE: + doFinish(mutationsForCoprocs[i].getFamilyCellMap()); + break; + //case STORE_TOO_BUSY, does not need to do anything here. + //Because when STORE_TOO_BUSY happens, #doFinish has been done . + default: + //do nothing + break; + } + }//end for + } + } + + @VisibleForTesting + Map getPreparePutToStoreMap() { + return preparePutToStoreMap; + } + + public static final long FIXED_SIZE = + ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index da45fdadb7..5b02ff054a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment; import org.apache.hadoop.hbase.regionserver.Segment; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; @@ -472,6 +473,14 @@ public class TestHeapSize { assertEquals(expected, actual); } + cl = StoreHotnessProtector.class; + actual = StoreHotnessProtector.FIXED_SIZE; + expected = ClassSize.estimateBase(cl, false); + if (expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + // Block cache key overhead. Only tests fixed overhead as estimating heap // size of strings is hard. cl = BlockCacheKey.class; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java new file mode 100644 index 0000000000..e2459c2bcc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER; +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT; +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestStoreHotnessProtector { + + @Test(timeout = 60000) + public void testPreparePutCounter() throws Exception { + + ExecutorService executorService = Executors.newFixedThreadPool(10); + + Configuration conf = new Configuration(); + conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0); + conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10); + conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); + Region mockRegion = mock(Region.class); + StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf); + + Store mockStore1 = mock(Store.class); + RegionInfo mockRegionInfo = mock(RegionInfo.class); + byte[] family = "testF1".getBytes(); + + when(mockRegion.getStore(family)).thenReturn(mockStore1); + when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); + when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1"); + + when(mockStore1.getCurrentParallelPutCount()).thenReturn(1); + when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1"); + + final Map> familyMaps = new HashMap<>(); + familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class))); + + final AtomicReference exception = new AtomicReference<>(); + + // PreparePutCounter not access limit + + int threadCount = 30; + CountDownLatch countDownLatch = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + executorService.execute(() -> { + try { + storeHotnessProtector.start(familyMaps); + } catch (RegionTooBusyException e) { + e.printStackTrace(); + exception.set(e); + } finally { + countDownLatch.countDown(); + } + }); + } + + countDownLatch.await(60, TimeUnit.SECONDS); + //no exception + Assert.assertEquals(exception.get(), null); + Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); + Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), + threadCount); + + // access limit + + try { + storeHotnessProtector.start(familyMaps); + } catch (RegionTooBusyException e) { + e.printStackTrace(); + exception.set(e); + } + + Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class); + + Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); + // when access limit, counter will not changed. + Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), + threadCount); + } + +} \ No newline at end of file -- 2.13.6 (Apple Git-96)