diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index eba984a..cddfccb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -72,10 +72,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi // Configuration key for split threads public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; public final static int SPLIT_THREADS_DEFAULT = 1; - - // Configuration keys for merge threads - public final static String MERGE_THREADS = "hbase.regionserver.thread.merge"; - public final static int MERGE_THREADS_DEFAULT = 1; public static final String REGION_SERVER_REGION_SPLIT_LIMIT = "hbase.regionserver.regionSplitLimit"; @@ -87,7 +83,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi private final ThreadPoolExecutor longCompactions; private final ThreadPoolExecutor shortCompactions; private final ThreadPoolExecutor splits; - private final ThreadPoolExecutor mergePool; private volatile ThroughputController compactionThroughputController; @@ -150,15 +145,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return new Thread(r, name); } }); - int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT); - this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool( - mergeThreads, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-merges-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); // compaction throughput controller this.compactionThroughputController = @@ -170,8 +156,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return "compaction_queue=(" + longCompactions.getQueue().size() + ":" + shortCompactions.getQueue().size() + ")" - + ", split_queue=" + splits.getQueue().size() - + ", merge_queue=" + mergePool.getQueue().size(); + + ", split_queue=" + splits.getQueue().size(); } public String dumpQueue() { @@ -205,15 +190,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi queueLists.append("\n"); } - queueLists.append("\n"); - queueLists.append(" Region Merge Queue:\n"); - lq = mergePool.getQueue(); - it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - return queueLists.toString(); } @@ -372,7 +348,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi */ void interruptIfNecessary() { splits.shutdown(); - mergePool.shutdown(); longCompactions.shutdown(); shortCompactions.shutdown(); } @@ -394,7 +369,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi void join() { waitFor(splits, "Split Thread"); - waitFor(mergePool, "Merge Thread"); waitFor(longCompactions, "Large Compaction Thread"); waitFor(shortCompactions, "Small Compaction Thread"); } @@ -641,21 +615,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } } - int mergeThreads = newConf.getInt(MERGE_THREADS, - MERGE_THREADS_DEFAULT); - if (this.mergePool.getCorePoolSize() != mergeThreads) { - LOG.info("Changing the value of " + MERGE_THREADS + - " from " + this.mergePool.getCorePoolSize() + " to " + - mergeThreads); - if(this.mergePool.getCorePoolSize() < mergeThreads) { - this.mergePool.setMaximumPoolSize(mergeThreads); - this.mergePool.setCorePoolSize(mergeThreads); - } else { - this.mergePool.setCorePoolSize(mergeThreads); - this.mergePool.setMaximumPoolSize(mergeThreads); - } - } - ThroughputController old = this.compactionThroughputController; if (old != null) { old.stop("configuration change"); @@ -680,10 +639,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return this.splits.getCorePoolSize(); } - protected int getMergeThreadNum() { - return this.mergePool.getCorePoolSize(); - } - /** * {@inheritDoc} */ @@ -706,11 +661,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } @VisibleForTesting - public long getCompletedMergeTaskCount() { - return mergePool.getCompletedTaskCount(); - } - - @VisibleForTesting /** * Shutdown the long compaction thread pool. * Should only be used in unit test to prevent long compaction thread pool from stealing job diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java index c7b6c7c..f6dc8c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java @@ -80,7 +80,6 @@ public class TestCompactSplitThread { conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3); conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4); conf.setInt(CompactSplitThread.SPLIT_THREADS, 5); - conf.setInt(CompactSplitThread.MERGE_THREADS, 6); } @After @@ -113,13 +112,11 @@ public class TestCompactSplitThread { assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum()); - assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum()); // change bigger configurations and do online update conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4); conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5); conf.setInt(CompactSplitThread.SPLIT_THREADS, 6); - conf.setInt(CompactSplitThread.MERGE_THREADS, 7); try { regionServer.compactSplitThread.onConfigurationChange(conf); } catch (IllegalArgumentException iae) { @@ -130,13 +127,11 @@ public class TestCompactSplitThread { assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum()); - assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum()); // change smaller configurations and do online update conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2); conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3); conf.setInt(CompactSplitThread.SPLIT_THREADS, 4); - conf.setInt(CompactSplitThread.MERGE_THREADS, 5); try { regionServer.compactSplitThread.onConfigurationChange(conf); } catch (IllegalArgumentException iae) { @@ -147,7 +142,6 @@ public class TestCompactSplitThread { assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum()); - assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum()); } finally { conn.close(); }