From e6561848331ba746d4aa60f7b29e37a72eb9bfe4 Mon Sep 17 00:00:00 2001 From: "chancelq" Date: Sat, 2 Dec 2017 23:36:00 +0800 Subject: [PATCH] HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write hander exhausted --- .../java/org/apache/hadoop/hbase/HConstants.java | 1 + .../apache/hadoop/hbase/regionserver/HRegion.java | 35 +++- .../apache/hadoop/hbase/regionserver/HStore.java | 38 ++++- .../hadoop/hbase/regionserver/RSRpcServices.java | 6 + .../apache/hadoop/hbase/regionserver/Store.java | 2 + .../regionserver/throttle/StoreHotProtector.java | 179 +++++++++++++++++++++ .../org/apache/hadoop/hbase/io/TestHeapSize.java | 9 ++ .../throttle/StoreHotProtectorTest.java | 117 ++++++++++++++ 8 files changed, 380 insertions(+), 7 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtector.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtectorTest.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 594a895c42..a93f433d45 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -89,6 +89,7 @@ public final class HConstants { NOT_RUN, SUCCESS, BAD_FAMILY, + STORE_TOO_BUSY, SANITY_CHECK_FAILURE, FAILURE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e66ad59e05..d9964be31a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; +import org.apache.hadoop.hbase.regionserver.throttle.StoreHotProtector; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import java.io.EOFException; @@ -663,6 +664,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final NavigableMap replicationScope = new TreeMap<>( Bytes.BYTES_COMPARATOR); + private final StoreHotProtector storeHotProtector; + /** * HRegion constructor. This constructor should only be used for testing and * extensions. Instances of HRegion should be instantiated with the @@ -783,6 +786,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? DEFAULT_DURABILITY : htd.getDurability(); + + this.storeHotProtector = new StoreHotProtector(this, conf); + if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -796,7 +802,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (LOG.isDebugEnabled()) { // Write out region name as string and its encoded name. - LOG.debug("Instantiated " + this); + LOG.debug("Instantiated " + this +"; "+ storeHotProtector.toString()); } configurationManager = Optional.empty(); @@ -3138,6 +3144,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int readyToWriteCount = 0; int lastIndexExclusive = 0; for (; lastIndexExclusive < size(); lastIndexExclusive++) { + + // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to + // prevent write hander exhausted + // ugly, but no choice because next maybe stop acquiring more rows for this batch + if (this instanceof MutationBatchOperation) { + try { + region.storeHotProtector + .start(((Mutation) operations[lastIndexExclusive]).getFamilyCellMap()); + } catch (RegionTooBusyException rtbe) { + retCodeDetails[lastIndexExclusive] = + new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage()); + continue; + } + } if (!isOperationPending(lastIndexExclusive)) { continue; } @@ -3875,6 +3895,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; }); + if (batchOp instanceof MutationBatchOperation && miniBatchOp != null) { + this.storeHotProtector + .finish(batchOp.nextIndexToProcess, finalLastIndexExclusive, batchOp.retCodeDetails, + batchOp.getMutationsForCoprocs()); + } + batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess); batchOp.nextIndexToProcess = finalLastIndexExclusive; @@ -7776,7 +7802,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ClassSize.ARRAY + 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + - 3 * Bytes.SIZEOF_BOOLEAN); + 3 * Bytes.SIZEOF_BOOLEAN + + /*storeHotProtector*/ + 1 * ClassSize.REFERENCE); // woefully out of date - currently missing: // 1 x HashMap - coprocessorServiceHandlers @@ -7801,6 +7829,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + ClassSize.STORE_SERVICES // store services + + StoreHotProtector.FIXED_SIZE ; @Override @@ -8267,7 +8296,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @Override public void onConfigurationChange(Configuration conf) { - // Do nothing for now. + this.storeHotProtector.update(conf); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 80f91c8b68..75a4904d8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; @@ -174,6 +175,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat private final boolean verifyBulkLoads; + private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); + private final int parallelPutCountPrintThreshold; + private ScanInfo scanInfo; // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it. @@ -327,6 +331,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat + flushRetriesNumber); } cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); + + int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); + if (confPrintThreshold < 10) { + confPrintThreshold = 10; + } + this.parallelPutCountPrintThreshold = confPrintThreshold; } /** @@ -688,8 +698,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Cell cell, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - this.memstore.add(cell, memstoreSizing); + if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { + if (LOG.isTraceEnabled()) { + LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this + .getColumnFamilyName() + " too Busy!"); + } + } + this.memstore.add(cell, memstoreSizing); } finally { + currentParallelPutCount.decrementAndGet(); lock.readLock().unlock(); } } @@ -700,8 +717,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Iterable cells, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { + if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { + if (LOG.isTraceEnabled()) { + LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this + .getColumnFamilyName() + " too Busy!"); + } + } memstore.add(cells, memstoreSizing); } finally { + currentParallelPutCount.decrementAndGet(); lock.readLock().unlock(); } } @@ -2315,9 +2339,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat return this.cacheConf; } - public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG) - + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); + public static final long FIXED_OVERHEAD = ClassSize.align( + ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG) + (5 + * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN) + + Bytes.SIZEOF_INT/*parallelPutCountPrintThreshold*/ + + ClassSize.REFERENCE/*currentParallelPutCount*/); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK @@ -2577,4 +2603,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat lock.writeLock().unlock(); } } + + public int getCurrentParallelPutCount() { + return currentParallelPutCount.get(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 58e2970180..edcd59bbb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; @@ -995,6 +996,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException(getResultOrException( ClientProtos.Result.getDefaultInstance(), index)); break; + + case STORE_TOO_BUSY: + e = new RegionTooBusyException(codes[i].getExceptionMsg()); + builder.addResultOrException(getResultOrException(e, index)); + break; } } } catch (IOException ie) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index d60de6bc8c..11cfb1a153 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -280,4 +280,6 @@ public interface Store { * @return true if the memstore may need some extra memory space */ boolean isSloppyMemStore(); + + int getCurrentParallelPutCount(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtector.java new file mode 100644 index 0000000000..5a06d813df --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtector.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.OperationStatus; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class StoreHotProtector { + private static final Log LOG = LogFactory.getLog(StoreHotProtector.class); + private volatile int parallelPutToStoreThreadLimit; + private volatile int parallelPreparePutToStoreThreadLimit; + public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = + "hbase.region.store.parallel.put.limit"; + public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = + "hbase.region.store.parallel.prepare.put.multiplier"; + private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10; + private volatile int parallelPutToStoreThreadLimitCheckMinColumnNum; + public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = + "hbase.region.store.parallel.put.limit.min.column.num"; + private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; + private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; + + private final Map preparePutToStoreMap = + new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); + private final Region region; + + public StoreHotProtector(Region region, Configuration conf) { + init(conf); + this.region = region; + } + + public void init(Configuration conf) { + this.parallelPutToStoreThreadLimit = + conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); + this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, + DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; + this.parallelPutToStoreThreadLimitCheckMinColumnNum = + conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM, + DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); + + } + + public void update(Configuration conf) { + init(conf); + preparePutToStoreMap.clear(); + LOG.debug("update config: " + toString()); + } + + public void start(Map> familyMaps) throws RegionTooBusyException { + if (this.parallelPutToStoreThreadLimit > 0) { + + String tooBusyStore = null; + + for (Map.Entry> e : familyMaps.entrySet()) { + Store store = this.region.getStore(e.getKey()); + if (store == null || e.getValue() == null) { + continue; + } + + if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnNum) { + + //need to run #finish, so we add #preparePutCount first + //update will clear preparePutToStoreMap. no-lock. not really accurate when update conf. + preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger()); + AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey()); + if (preparePutCounter == null) { + preparePutCounter = new AtomicInteger(); + preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter); + } + int preparePutCount = preparePutCounter.incrementAndGet(); + //The number of threads writing to store exceeds the parallel limit, + //or prepare parallel put exceed limit + // that means this region(store) is very hot now, + // and as we know that writing to this store is very slow at this time. + // so we block it this time. + // not accurate(no lock), It's better later than never. + if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit + || preparePutCount > this.parallelPreparePutToStoreThreadLimit) { + tooBusyStore = (tooBusyStore == null ? + store.getColumnFamilyName() : + tooBusyStore + "," + store.getColumnFamilyName()); + } + } + } + + if (tooBusyStore != null) { + String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + + tooBusyStore + " Above parallelPutToStoreThreadLimit(" + + this.parallelPutToStoreThreadLimit + ")"; + if (LOG.isTraceEnabled()) { + LOG.trace(msg); + } + //will skip this mutation. so finish it. + //and normally, #finish will skip OperationStatusCode!=STORE_TOO_BUSY + doFinish(familyMaps); + throw new RegionTooBusyException(msg); + } + } + } + + private void doFinish(Map> familyMaps) { + if (this.parallelPutToStoreThreadLimit > 0) { + + for (Map.Entry> e : familyMaps.entrySet()) { + Store store = this.region.getStore(e.getKey()); + if (store == null || e.getValue() == null) { + continue; + } + if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnNum) { + AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); + //for change the configuration. not need very accuracy + if (counter != null && counter.decrementAndGet() < 0) { + counter.incrementAndGet(); + } + } + } + } + } + + public String toString() { + return "StoreHotProtector, parallelPutToStoreThreadLimit=" + this.parallelPutToStoreThreadLimit + + " ; minColumnNum=" + this.parallelPutToStoreThreadLimitCheckMinColumnNum + + " ; preparePutThreadLimit=" + this.parallelPreparePutToStoreThreadLimit + + " ; hotProtect now " + (this.parallelPutToStoreThreadLimit > 0 ? "enable" : "disable"); + } + + public void finish(int indexToProcess, int finalLastIndexExclusive, + OperationStatus[] retCodeDetails, Mutation[] mutationsForCoprocs) { + if (this.parallelPutToStoreThreadLimit > 0) { + for (int i = indexToProcess; i < finalLastIndexExclusive; i++) { + switch (retCodeDetails[i].getOperationStatusCode()) { + case SUCCESS: + case FAILURE: + doFinish(mutationsForCoprocs[i].getFamilyCellMap()); + break; + } + }//end for + } + } + + @VisibleForTesting + Map getPreparePutToStoreMap() { + return preparePutToStoreMap; + } + + public static final long FIXED_SIZE = + ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index da45fdadb7..bf32b1fe28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io; +import org.apache.hadoop.hbase.regionserver.throttle.StoreHotProtector; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -472,6 +473,14 @@ public class TestHeapSize { assertEquals(expected, actual); } + cl = StoreHotProtector.class; + actual = StoreHotProtector.FIXED_SIZE; + expected = ClassSize.estimateBase(cl, false); + if (expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + // Block cache key overhead. Only tests fixed overhead as estimating heap // size of strings is hard. cl = BlockCacheKey.class; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtectorTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtectorTest.java new file mode 100644 index 0000000000..7e58b16c6f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotProtectorTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import com.google.common.collect.Lists; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER; +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotProtector.PARALLEL_PUT_STORE_THREADS_LIMIT; +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(SmallTests.class) +public class StoreHotProtectorTest { + + @Test(timeout = 60000) + public void testPreparePutCounter() throws Exception { + + ExecutorService executorService = Executors.newFixedThreadPool(10); + + Configuration conf = new Configuration(); + conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM, 0); + conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10); + conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); + Region mockRegion = mock(Region.class); + StoreHotProtector storeHotProtector = new StoreHotProtector(mockRegion, conf); + + Store mockStore1 = mock(Store.class); + RegionInfo mockRegionInfo = mock(RegionInfo.class); + byte[] family = "testF1".getBytes(); + + when(mockRegion.getStore(family)).thenReturn(mockStore1); + when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); + when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1"); + + when(mockStore1.getCurrentParallelPutCount()).thenReturn(1); + when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1"); + + final Map> familyMaps = new HashMap<>(); + familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class))); + + final AtomicReference exception = new AtomicReference<>(); + + // PreparePutCounter not access limit + + int threadCount = 30; + CountDownLatch countDownLatch = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + executorService.execute(() -> { + try { + storeHotProtector.start(familyMaps); + } catch (RegionTooBusyException e) { + e.printStackTrace(); + exception.set(e); + } finally { + countDownLatch.countDown(); + } + }); + } + + countDownLatch.await(60, TimeUnit.SECONDS); + //no exception + Assert.assertEquals(exception.get(), null); + Assert.assertEquals(storeHotProtector.getPreparePutToStoreMap().size(), 1); + Assert.assertEquals(storeHotProtector.getPreparePutToStoreMap().get(family).get(), threadCount); + + // access limit + + try { + storeHotProtector.start(familyMaps); + } catch (RegionTooBusyException e) { + e.printStackTrace(); + exception.set(e); + } + + Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class); + + System.out.println(storeHotProtector.getPreparePutToStoreMap().get(family).get()); + Assert.assertEquals(storeHotProtector.getPreparePutToStoreMap().size(), 1); + // when access limit, counter will not changed. + Assert.assertEquals(storeHotProtector.getPreparePutToStoreMap().get(family).get(), threadCount); + } + +} \ No newline at end of file -- 2.13.6 (Apple Git-96)