From 5c7e63d7f84fbc2aa405ed4102e045aff1ca75f0 Mon Sep 17 00:00:00 2001 From: xcang Date: Sat, 20 Oct 2018 18:36:04 -0700 Subject: [PATCH] Deduplicate compaction requests, only for smallCompaction queue --- .../hadoop/hbase/regionserver/CompactSplit.java | 71 ++++++++++++++++++++-- .../hadoop/hbase/regionserver/TestCompaction.java | 31 +++++++++- 2 files changed, 96 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index fbf73f36ee..9c0a85c597 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -364,19 +364,32 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } ThreadPoolExecutor pool; + boolean isShortCompaction = false; if (selectNow) { // compaction.get is safe as we will just return if selectNow is true but no compaction is // selected - pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions - : shortCompactions; + if (store.throttleCompaction(compaction.getRequest().getSize())) { + LOG.info("into longCompactions: " + store.toString()); + pool = longCompactions; + } else { + pool = shortCompactions; + isShortCompaction = true; + } } else { // We assume that most compactions are small. So, put system compactions into small // pool; we will do selection there, and move to large pool if necessary. 
pool = shortCompactions; + isShortCompaction = true; + } + boolean skip = false; + if(isShortCompaction) { + skip = handleShortCompactionDuplication(store, region, priority); + } + if (!skip) { + pool.execute( + new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); + region.incrementCompactionsQueuedCount(); } - pool.execute( - new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); - region.incrementCompactionsQueuedCount(); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") @@ -384,6 +397,42 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } + /** + * return true if this compaction request can be skipped. + */ + private synchronized boolean handleShortCompactionDuplication(HStore store, HRegion region, + int priority) { + BlockingQueue queue = shortCompactions.getQueue(); + for(Runnable r : queue){ + HStore storeFromQueue = ((CompactionRunner)r).getStore(); + HRegion regionFromQueue = ((CompactionRunner)r).getRegion(); + //Don't do anything if one priority is negative and one is positive. + // Because we don't want to replace + // user compact with system compact or vice versa. + if(((CompactionRunner)r).getQueuedPriority() * priority < 0) { + return false; + } + + if(storeFromQueue.toString().equals(store.toString()) && + regionFromQueue.toString().equals(region.toString())) { + if(((CompactionRunner)r).getQueuedPriority() <= priority) { + //skip enqueuing this since it has lower priority + return true; + } else { + //try to remove duplicated compact from queue + if(!shortCompactions.remove(r)) { + return true; + } else { + //successfully removed from shortCompactionQueue, decreasing queue count. 
+ region.decrementCompactionsQueuedCount(); + return false; + } + } + } + } + return false; + } + public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); @@ -660,6 +709,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } + public HStore getStore() { + return store; + } + + public HRegion getRegion() { + return region; + } + + public int getQueuedPriority() { + return queuedPriority; + } + @Override public void run() { Preconditions.checkNotNull(server); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index a1d76fba3f..dd6a9fa58a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -101,6 +101,7 @@ public class TestCompaction { private final byte [] STARTROW = Bytes.toBytes(START_KEY); private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; private int compactionThreshold; + private long compactionThrottleThreshold; private byte[] secondRowBytes, thirdRowBytes; private static final long MAX_FILES_TO_COMPACT = 10; private final byte[] FAMILY = Bytes.toBytes("cf"); @@ -115,7 +116,6 @@ public class TestCompaction { conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, NoLimitThroughputController.class.getName()); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); - secondRowBytes = START_KEY_BYTES.clone(); // Increment the least significant character so we get to next row. 
secondRowBytes[START_KEY_BYTES.length - 1]++; @@ -764,6 +764,35 @@ public class TestCompaction { return sf; } + + @Test + public void testCompactionQueueDedup() throws Exception { + // setup a compact/split thread on a mock server + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Configuration baseConfig = r.getBaseConf(); + //set throttle to max, avoid long compactions + baseConfig.set("hbase.regionserver.thread.compaction.throttle", String.valueOf(Long.MAX_VALUE)); + Mockito.when(mockServer.getConfiguration()).thenReturn(baseConfig); + CompactSplit thread = new CompactSplit(mockServer); + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + + // setup a region/store with some files + HStore store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { + createStoreFile(r); + } + assertEquals(true, thread.isCompactionsEnabled()); + thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, + CompactionLifeCycleTracker.DUMMY, null); + // make 100 more compaction requests with same region and store. They should all be deduped. + for(int i = 0; i < 100; i++) { + thread.requestCompaction(r, store, "test", Store.PRIORITY_USER + i, + CompactionLifeCycleTracker.DUMMY, null); + } + assertTrue(thread.getShortCompactions().getQueue().size() <= 1); + } + /** * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction * finishes. -- 2.15.1 (Apple Git-101)