diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e419dc5eb3b..55481fb2a87 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2416,6 +2416,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."), HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN value allowed by top n key optimizer.\n" + "If the LIMIT is greater than this value then top n key optimization won't be used."), + HIVE_TOPN_EFFICIENCY_THRESHOLD("hive.optimize.topnkey.efficiency.threshold", 0.6f, "Disable topN key filter if the ratio between forwarded and total rows reaches this limit."), + HIVE_TOPN_EFFICIENCY_CHECK_BATCHES("hive.optimize.topnkey.efficiency.check.nbatches", 8, "Check topN key filter efficiency after a specific number of batches."), + HIVE_TOPN_MAX_NUMBER_OF_PARTITIONS("hive.optimize.topnkey.partitions.max", 64, "Limit the maximum number of partitions used by the top N key operator."), HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true, "Whether to enable shared work optimizer. 
The optimizer finds scan operator over the same table\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java index 38d2e08b760..361dbc32094 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java @@ -73,11 +73,30 @@ public void clear() { @Override public String toString() { final StringBuilder sb = new StringBuilder("TopNKeyFilter{"); - sb.append("topN=").append(topN); + sb.append("id=").append(super.toString()); + sb.append(", topN=").append(topN); sb.append(", repeated=").append(repeated); sb.append(", added=").append(added); sb.append(", total=").append(total); + sb.append(", forwardingRatio=").append(forwardingRatio()); sb.append('}'); return sb.toString(); } + + /** + * Ratio between the forwarded rows and the total incoming rows. + * The higher the number is, the less is the efficiency of the filter. + * 1 means all rows should be forwarded. 
+ * @return ratio of forwarded (repeated + added) rows to total incoming rows, in [0, 1]; 0 if no rows have been seen yet + */ + public float forwardingRatio() { + if (total == 0) { + return 0; + } + return ((repeated + added) / (float)total); + } + + public long getTotal() { + return total; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java index dd66dfcd72e..f09867bb4e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hive.ql.exec; +import static org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator.checkTopNFilterEfficiency; import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY; import java.io.Serializable; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -47,6 +50,7 @@ private transient KeyWrapper keyWrapper; private transient KeyWrapperComparator keyWrapperComparator; + private transient Set disabledPartitions; /** Kryo ctor. 
*/ public TopNKeyOperator() { @@ -82,6 +86,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { keyObjectInspectors, currentKeyObjectInspectors, columnSortOrder, nullSortOrder); this.topNKeyFilters = new HashMap<>(); + this.disabledPartitions = new HashSet<>(); } private KeyWrapper initObjectInspectors(Configuration hconf, @@ -105,29 +110,54 @@ private KeyWrapper initObjectInspectors(Configuration hconf, @Override public void process(Object row, int tag) throws HiveException { + if (!disabledPartitions.isEmpty() && disabledPartitions.size() == topNKeyFilters.size()) { // all filters are disabled due to efficiency check + forward(row, outputObjInspector); + return; + } + partitionKeyWrapper.getNewKey(row, inputObjInspectors[tag]); partitionKeyWrapper.setHashKey(); + if (disabledPartitions.contains(partitionKeyWrapper)) { // filter for this partition is disabled + forward(row, outputObjInspector); + return; + } + TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrapper); - if (topNKeyFilter == null) { + if (topNKeyFilter == null && topNKeyFilters.size() < conf.getMaxNumberOfPartitions()) { topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator); topNKeyFilters.put(partitionKeyWrapper.copyKey(), topNKeyFilter); } - - keyWrapper.getNewKey(row, inputObjInspectors[tag]); - keyWrapper.setHashKey(); - - if (topNKeyFilter.canForward(keyWrapper)) { + if (topNKeyFilter == null) { forward(row, outputObjInspector); + } else { + keyWrapper.getNewKey(row, inputObjInspectors[tag]); + keyWrapper.setHashKey(); + if (topNKeyFilter.canForward(keyWrapper)) { + forward(row, outputObjInspector); + } + } + + if (runTimeNumRows % conf.getCheckEfficiencyNumRows() == 0) { // check the efficiency at every nth rows + checkTopNFilterEfficiency(topNKeyFilters, disabledPartitions, conf.getEfficiencyThreshold(), LOG); } } @Override protected final void closeOp(boolean abort) throws HiveException { - for (TopNKeyFilter each : 
topNKeyFilters.values()) { - each.clear(); + if (topNKeyFilters.size() == 1) { + TopNKeyFilter filter = topNKeyFilters.values().iterator().next(); + LOG.info("Closing TopNKeyFilter: {}", filter); + filter.clear(); + } else { + LOG.info("Closing {} TopNKeyFilters", topNKeyFilters.size()); + for (TopNKeyFilter each : topNKeyFilters.values()) { + LOG.debug("Closing TopNKeyFilter: {}", each); + each.clear(); + } } topNKeyFilters.clear(); + disabledPartitions.clear(); super.closeOp(abort); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java index 7feadd3137d..0f8eb173c66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java @@ -19,13 +19,14 @@ import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Set; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.KeyWrapper; -import org.apache.hadoop.hive.ql.exec.KeyWrapperComparator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TopNKeyFilter; import org.apache.hadoop.hive.ql.exec.TopNKeyOperator; @@ -38,6 +39,9 @@ import org.apache.hadoop.hive.ql.plan.VectorDesc; import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.slf4j.Logger; + +import com.google.common.annotations.VisibleForTesting; /** * VectorTopNKeyOperator passes rows that contains top N keys only. 
@@ -54,8 +58,9 @@ private transient VectorHashKeyWrapperBatch partitionKeyWrapperBatch; private transient VectorHashKeyWrapperBatch keyWrappersBatch; private transient Map topNKeyFilters; + private transient Set disabledPartitions; private transient Comparator keyWrapperComparator; - + private transient long incomingBatches; public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf, VectorizationContext vContext, VectorDesc vectorDesc) { @@ -92,6 +97,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { keyWrapperComparator = keyWrappersBatch.getComparator(conf.getColumnSortOrder(), conf.getNullOrder()); partitionKeyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(partitionKeyExpressions); topNKeyFilters = new HashMap<>(); + disabledPartitions = new HashSet<>(); + incomingBatches = 0; } private void initKeyExpressions(Configuration hconf, VectorExpression[] keyExpressions) throws HiveException { @@ -104,7 +111,11 @@ private void initKeyExpressions(Configuration hconf, VectorExpression[] keyExpre @Override public void process(Object data, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) data; - + if (!disabledPartitions.isEmpty() && disabledPartitions.size() == topNKeyFilters.size()) { // all filters are disabled due to efficiency check + vectorForward(batch); + return; + } + incomingBatches++; // The selected vector represents selected rows. 
// Clone the selected vector System.arraycopy(batch.selected, 0, temporarySelected, 0, batch.size); @@ -134,15 +145,18 @@ public void process(Object data, int tag) throws HiveException { j = i; } - // Select a row in the priority queue - TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrappers[i]); - if (topNKeyFilter == null) { - topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator); - topNKeyFilters.put(partitionKeyWrappers[i].copyKey(), topNKeyFilter); - } - - if (topNKeyFilter.canForward(keyWrappers[i])) { + VectorHashKeyWrapperBase partitionKey = partitionKeyWrappers[i]; + if (disabledPartitions.contains(partitionKey)) { // filter for this partition is disabled selected[size++] = j; + } else { + TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKey); + if (topNKeyFilter == null && topNKeyFilters.size() < conf.getMaxNumberOfPartitions()) { + topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator); + topNKeyFilters.put(partitionKey.copyKey(), topNKeyFilter); + } + if (topNKeyFilter == null || topNKeyFilter.canForward(keyWrappers[i])) { + selected[size++] = j; + } } } @@ -162,6 +176,28 @@ public void process(Object data, int tag) throws HiveException { batch.selected = selectedBackup; batch.size = sizeBackup; batch.selectedInUse = selectedInUseBackup; + + if (incomingBatches % conf.getCheckEfficiencyNumBatches() == 0) { + checkTopNFilterEfficiency(topNKeyFilters, disabledPartitions, conf.getEfficiencyThreshold(), LOG); + } + } + + public static void checkTopNFilterEfficiency(Map filters, + Set disabledPartitions, + float efficiencyThreshold, + Logger log) + { + Iterator> iterator = filters.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry each = iterator.next(); + KeyWrapper partitionKey = each.getKey(); + TopNKeyFilter filter = each.getValue(); + log.debug("Checking TopN Filter efficiency {}, threshold: {}", filter, efficiencyThreshold); + if (filter.forwardingRatio() >= efficiencyThreshold) 
{ + log.info("Disabling TopN Filter {}", filter); + disabledPartitions.add(partitionKey); + } + } } @Override @@ -195,9 +231,19 @@ public OperatorType getType() { @Override protected void closeOp(boolean abort) throws HiveException { // LOG.info("Closing TopNKeyFilter: {}.", topNKeyFilter); - for (TopNKeyFilter topNKeyFilter : topNKeyFilters.values()) { - topNKeyFilter.clear(); + if (topNKeyFilters.size() == 1) { + TopNKeyFilter filter = topNKeyFilters.values().iterator().next(); + LOG.info("Closing TopNKeyFilter: {}", filter); + filter.clear(); + } else { + LOG.info("Closing {} TopNKeyFilters", topNKeyFilters.size()); + for (TopNKeyFilter each : topNKeyFilters.values()) { + LOG.debug("Closing TopNKeyFilter: {}", each); + each.clear(); + } } + topNKeyFilters.clear(); + disabledPartitions.clear(); super.closeOp(abort); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java index 3869ffa2b83..bf5083f0428 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hive.ql.optimizer.topnkey; +import java.util.Collections; +import java.util.List; +import java.util.Stack; + import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -33,20 +37,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.List; -import java.util.Stack; - /** * TopNKeyProcessor is a processor for TopNKeyOperator. * A TopNKeyOperator will be placed before any ReduceSinkOperator which has a topN property >= 0. 
*/ public class TopNKeyProcessor implements SemanticNodeProcessor { private static final Logger LOG = LoggerFactory.getLogger(TopNKeyProcessor.class); - private final int maxTopNAllowed; + private float efficiencyThreshold; + private long checkEfficiencyNumBatches; + private int maxTopNAllowed; + private int maxNumberOfPartitions; - public TopNKeyProcessor(int maxTopNAllowed) { + public TopNKeyProcessor() { + } + + public TopNKeyProcessor(int maxTopNAllowed, float efficiencyThreshold, long checkEfficiencyNumBatches, int maxNumberOfPartitions) { this.maxTopNAllowed = maxTopNAllowed; + this.efficiencyThreshold = efficiencyThreshold; + this.checkEfficiencyNumBatches = checkEfficiencyNumBatches; + this.maxNumberOfPartitions = maxNumberOfPartitions; } @Override @@ -84,7 +93,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } TopNKeyDesc topNKeyDesc = new TopNKeyDesc(reduceSinkDesc.getTopN(), reduceSinkDesc.getOrder(), - reduceSinkDesc.getNullOrder(), reduceSinkDesc.getKeyCols(), partitionCols); + reduceSinkDesc.getNullOrder(), reduceSinkDesc.getKeyCols(), partitionCols, + efficiencyThreshold, checkEfficiencyNumBatches, maxNumberOfPartitions); + copyDown(reduceSinkOperator, topNKeyDesc); return null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 31735c9ea3d..caab0564f2f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -1289,7 +1289,11 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx) Map opRules = new LinkedHashMap(); opRules.put( new RuleRegExp("Top n key optimization", ReduceSinkOperator.getOperatorName() + "%"), - new TopNKeyProcessor(HiveConf.getIntVar(procCtx.conf, HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED))); + new TopNKeyProcessor( + HiveConf.getIntVar(procCtx.conf, HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED), + 
HiveConf.getFloatVar(procCtx.conf, ConfVars.HIVE_TOPN_EFFICIENCY_THRESHOLD), + HiveConf.getIntVar(procCtx.conf, ConfVars.HIVE_TOPN_EFFICIENCY_CHECK_BATCHES), + HiveConf.getIntVar(procCtx.conf, ConfVars.HIVE_TOPN_MAX_NUMBER_OF_PARTITIONS))); opRules.put( new RuleRegExp("Top n key pushdown", TopNKeyOperator.getOperatorName() + "%"), new TopNKeyPushdownProcessor()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java index 19910a341e0..ddd657e5552 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hive.ql.plan; -import org.apache.hadoop.hive.ql.optimizer.topnkey.CommonKeyPrefix; -import org.apache.hadoop.hive.ql.plan.Explain.Level; - import java.util.ArrayList; import java.util.List; import java.util.Objects; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.optimizer.topnkey.CommonKeyPrefix; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + /** * TopNKeyDesc. 
* @@ -38,19 +39,30 @@ private String nullOrder; private List keyColumns; private List partitionKeyColumns; + private float efficiencyThreshold; + private long checkEfficiencyNumBatches; + private long checkEfficiencyNumRows; + private int maxNumberOfPartitions; public TopNKeyDesc() { } public TopNKeyDesc( - final int topN, - final String columnSortOrder, - final String nullOrder, - final List keyColumns, - final List partitionKeyColumns) { + final int topN, + final String columnSortOrder, + final String nullOrder, + final List keyColumns, + final List partitionKeyColumns, + float efficiencyThreshold, + long checkEfficiencyNumBatches, + int maxNumberOfPartitions) { this.topN = topN; this.keyColumns = new ArrayList<>(keyColumns.size()); + this.efficiencyThreshold = efficiencyThreshold; + this.checkEfficiencyNumBatches = checkEfficiencyNumBatches; + this.checkEfficiencyNumRows = checkEfficiencyNumBatches * VectorizedRowBatch.DEFAULT_SIZE; + this.maxNumberOfPartitions = maxNumberOfPartitions; StringBuilder sortOrder = new StringBuilder(columnSortOrder.length()); StringBuilder nullSortOrder = new StringBuilder(nullOrder.length()); this.partitionKeyColumns = new ArrayList<>(partitionKeyColumns.size()); @@ -81,6 +93,22 @@ public int getTopN() { return topN; } + public float getEfficiencyThreshold() { + return efficiencyThreshold; + } + + public long getCheckEfficiencyNumBatches() { + return checkEfficiencyNumBatches; + } + + public long getCheckEfficiencyNumRows() { + return checkEfficiencyNumRows; + } + + public int getMaxNumberOfPartitions() { + return maxNumberOfPartitions; + } + public void setTopN(int topN) { this.topN = topN; } @@ -198,7 +226,8 @@ public TopNKeyDescExplainVectorization getTopNKeyVectorization() { public TopNKeyDesc combine(CommonKeyPrefix commonKeyPrefix) { return new TopNKeyDesc(topN, commonKeyPrefix.getMappedOrder(), commonKeyPrefix.getMappedNullOrder(), commonKeyPrefix.getMappedColumns(), - commonKeyPrefix.getMappedColumns().subList(0, 
partitionKeyColumns.size())); + commonKeyPrefix.getMappedColumns().subList(0, partitionKeyColumns.size()), + efficiencyThreshold, checkEfficiencyNumBatches, maxNumberOfPartitions); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java index 95cd45978a8..a91bc7354a7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java @@ -17,21 +17,30 @@ */ package org.apache.hadoop.hive.ql.exec; +import static org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator.checkTopNFilterEfficiency; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit test of TopNKeyFilter. 
*/ public class TestTopNKeyFilter { - + private static final Logger LOG = LoggerFactory.getLogger(TestTopNKeyFilter.class.getName()); public static final Comparator TEST_KEY_WRAPPER_COMPARATOR = Comparator.comparingInt(o -> o.keyValue); @Test @@ -69,6 +78,51 @@ public void testMembersOfTopNKeysStillCanBeForwardedAfterNonTopNKeysTried() { assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true)); } + @Test + public void testEfficiencyWhenEverythingIsForwarded() { + TopNKeyFilter topNKeyFilter = new TopNKeyFilter(2, TEST_KEY_WRAPPER_COMPARATOR); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(true)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(4)), is(true)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(true)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(true)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true)); + assertThat(topNKeyFilter.forwardingRatio(), is(1.0f)); + } + + @Test + public void testEfficiencyWhenOnlyOneIsForwarded() { + TopNKeyFilter topNKeyFilter = new TopNKeyFilter(1, TEST_KEY_WRAPPER_COMPARATOR); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(false)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(false)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(4)), is(false)); + assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(false)); + assertThat(topNKeyFilter.forwardingRatio(), is(1/5f)); + } + + @Test + public void testDisabling() { + TopNKeyFilter efficientFilter = new TopNKeyFilter(1, TEST_KEY_WRAPPER_COMPARATOR); + efficientFilter.canForward(new TestKeyWrapper(1)); + efficientFilter.canForward(new TestKeyWrapper(2)); + efficientFilter.canForward(new TestKeyWrapper(3)); + + TopNKeyFilter inefficientFilter = new TopNKeyFilter(1, TEST_KEY_WRAPPER_COMPARATOR); + inefficientFilter.canForward(new TestKeyWrapper(3)); + 
inefficientFilter.canForward(new TestKeyWrapper(2)); + inefficientFilter.canForward(new TestKeyWrapper(1)); + + Map filters = new HashMap() {{ + put(new TestKeyWrapper(100), efficientFilter); + put(new TestKeyWrapper(200), inefficientFilter); + }}; + + Set disabled = new HashSet<>(); + checkTopNFilterEfficiency(filters, disabled, 0.6f, LOG); + assertThat(disabled, hasSize(1)); + assertThat(disabled, hasItem(new TestKeyWrapper(200))); + } + /** * Test implementation of KeyWrapper. */