diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 54e0eb8..8764cd7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1120,6 +1120,7 @@
   public static final int QOS_THRESHOLD = 10;
   public static final int ADMIN_QOS = 100;
   public static final int HIGH_QOS = 200;
+  public static final int LARGEQUERY_QOS = 4;
   public static final int SYSTEMTABLE_QOS = HIGH_QOS;
 
   /** Directory under /hbase where archived hfiles are stored */
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/AggregateServiceJudger.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/AggregateServiceJudger.java
new file mode 100644
index 0000000..34d2554
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/AggregateServiceJudger.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorJudger;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Coprocessor judger to decide if a client call to AggregateService
+ * involves a large query.
+ */
+@InterfaceAudience.Private
+public class AggregateServiceJudger implements CoprocessorJudger {
+  private static final Log LOG = LogFactory.getLog(AggregateServiceJudger.class.getName());
+
+  @Override
+  public boolean isLargeQuery(ByteString msg) {
+    try {
+      AggregateRequest request = AggregateRequest.parseFrom(msg.toByteArray());
+      Scan scan = request.getScan();
+      String startRow = scan.getStartRow().toStringUtf8();
+      String stopRow = scan.getStopRow().toStringUtf8();
+      if ("".equals(startRow) || "".equals(stopRow)) {
+        return true;
+      }
+    } catch (InvalidProtocolBufferException e) {
+      LOG.info(e.getMessage(), e);
+    }
+    return false;
+  }
+
+}
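The example judger above only takes effect once it is registered with the region server. Below is a minimal sketch of that wiring, assuming the configuration keys introduced later in this patch (hbase.regionserver.largequery.judger.class and hbase.ipc.server.largequery.isolate); in a real deployment these would normally be set in the region server's hbase-site.xml rather than in code:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LargeQueryJudgerSetup {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Let AnnotationReadingPriorityFunction consult the example judger for
    // coprocessor service calls (key introduced later in this patch).
    conf.set("hbase.regionserver.largequery.judger.class",
        "org.apache.hadoop.hbase.coprocessor.example.AggregateServiceJudger");
    // Calls tagged LARGEQUERY_QOS are only routed to the dedicated executor when
    // isolation is switched on (see the SimpleRpcScheduler changes below).
    conf.setBoolean("hbase.ipc.server.largequery.isolate", true);
  }
}
```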
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 534331a..6c225b8 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -61,6 +61,9 @@
   String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
   String REPLICATION_QUEUE_DESC =
       "Number of calls in the replication call queue waiting to be run";
+  String LARGEQUERY_QUEUE_NAME = "numCallsInLargeQueryQueue";
+  String LARGEQUERY_QUEUE_DESC =
+      "Number of calls in the large query call queue waiting to be run";
   String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
   String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
   String WRITE_QUEUE_DESC = "Number of calls in the write call queue; " +
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index b272cd0..68667e2 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -26,6 +26,8 @@
 
   int getReplicationQueueLength();
 
+  int getLargeQueryQueueLength();
+
   int getPriorityQueueLength();
 
   int getNumOpenConnections();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index eee641a..34a9784 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -149,6 +149,8 @@
               wrapper.getGeneralQueueLength())
           .addGauge(Interns.info(REPLICATION_QUEUE_NAME, REPLICATION_QUEUE_DESC),
               wrapper.getReplicationQueueLength())
+          .addGauge(Interns.info(LARGEQUERY_QUEUE_NAME,
+              LARGEQUERY_QUEUE_DESC), wrapper.getLargeQueryQueueLength())
           .addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
               wrapper.getPriorityQueueLength())
           .addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorJudger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorJudger.java
new file mode 100644
index 0000000..c7a0851
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorJudger.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+
+/**
+ * Coprocessor judger to decide if a client call involves a large query.
+ */
+@InterfaceAudience.Private
+public interface CoprocessorJudger {
+  /**
+   * Judge whether the coprocessor call is a large query.
+   */
+  public boolean isLargeQuery(ByteString request);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 4ebfcd9..2c9392a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -105,6 +105,11 @@
   }
 
   @Override
+  public int getLargeQueryQueueLength() {
+    return 0;
+  }
+
+  @Override
   public int getActiveRpcHandlerCount() {
     return executor.getActiveCount();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 4afcc33..ac227be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -56,6 +56,14 @@
   }
 
   @Override
+  public int getLargeQueryQueueLength() {
+    if (!isServerStarted() || this.server.getScheduler() == null) {
+      return 0;
+    }
+    return server.getScheduler().getLargeQueryQueueLength();
+  }
+
+  @Override
   public int getPriorityQueueLength() {
     if (!isServerStarted() || this.server.getScheduler() == null) {
       return 0;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index fff8373..d741dca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -74,6 +74,9 @@
   /** Retrieves length of the replication queue for metrics. */
   public abstract int getReplicationQueueLength();
 
+  /** Retrieves length of the large query queue for metrics. */
+  public abstract int getLargeQueryQueueLength();
+
   /** Retrieves the number of active handler. */
   public abstract int getActiveRpcHandlerCount();
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 900861b..75951cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -35,11 +37,14 @@
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
 public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
+  private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
   private int port;
+  private boolean isolateLargeQuery = false;
   private final PriorityFunction priority;
   private final RpcExecutor callExecutor;
   private final RpcExecutor priorityExecutor;
   private final RpcExecutor replicationExecutor;
+  private final RpcExecutor largeQueryExecutor;
 
   /** What level a high priority call is at. */
   private final int highPriorityLevel;
@@ -68,6 +73,8 @@
     int maxPriorityQueueLength =
         conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
 
+    this.isolateLargeQuery = conf.getBoolean("hbase.ipc.server.largequery.isolate", false);
+    int largeQueryHandlerCount = conf.getInt("hbase.regionserver.largequery.handler.count", 10);
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
     this.abortable = server;
@@ -97,6 +104,9 @@
 
     this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
         "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
         maxQueueLength, priority, conf, abortable) : null;
+    this.largeQueryExecutor = largeQueryHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
+        "LargeQuery.FPBQ", largeQueryHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
+        maxQueueLength, priority, conf, abortable) : null;
   }
 
@@ -117,6 +127,7 @@
    */
   @Override
   public void onConfigurationChange(Configuration conf) {
+    this.isolateLargeQuery = conf.getBoolean("hbase.ipc.server.largequery.isolate", false);
     callExecutor.resizeQueues(conf);
     if (priorityExecutor != null) {
       priorityExecutor.resizeQueues(conf);
@@ -124,7 +135,9 @@
     if (replicationExecutor != null) {
       replicationExecutor.resizeQueues(conf);
     }
-
+    if (largeQueryExecutor != null) {
+      largeQueryExecutor.resizeQueues(conf);
+    }
     String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
         RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
     if (RpcExecutor.isCodelQueueType(callQueueType)) {
@@ -142,6 +155,7 @@
     callExecutor.start(port);
     if (priorityExecutor != null) priorityExecutor.start(port);
     if (replicationExecutor != null) replicationExecutor.start(port);
+    if (largeQueryExecutor != null) largeQueryExecutor.start(port);
   }
 
@@ -149,6 +163,7 @@
     callExecutor.stop();
     if (priorityExecutor != null) priorityExecutor.stop();
     if (replicationExecutor != null) replicationExecutor.stop();
+    if (largeQueryExecutor != null) largeQueryExecutor.stop();
   }
 
@@ -162,6 +177,12 @@
       return priorityExecutor.dispatch(callTask);
     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
       return replicationExecutor.dispatch(callTask);
+    } else if (isolateLargeQuery && largeQueryExecutor != null
+        && level == HConstants.LARGEQUERY_QOS) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(call.toString() + " treated as a large query");
+      }
+      return largeQueryExecutor.dispatch(callTask);
     } else {
       return callExecutor.dispatch(callTask);
     }
@@ -183,10 +204,16 @@
   }
 
   @Override
+  public int getLargeQueryQueueLength() {
+    return largeQueryExecutor == null ? 0 : largeQueryExecutor.getQueueLength();
+  }
+
+  @Override
   public int getActiveRpcHandlerCount() {
     return callExecutor.getActiveHandlerCount() +
         (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
-        (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
+        (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()) +
+        (largeQueryExecutor == null ? 0 : largeQueryExecutor.getActiveHandlerCount());
   }
 
   @Override
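A rough sketch of how the new scheduler settings fit together, using the key names from the hunks above (isolation is off by default and the dedicated pool defaults to 10 handlers):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LargeQueryIsolationConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Route calls tagged with LARGEQUERY_QOS to their own FastPath executor.
    conf.setBoolean("hbase.ipc.server.largequery.isolate", true);
    // Size of the dedicated handler pool; 0 disables the extra executor entirely.
    conf.setInt("hbase.regionserver.largequery.handler.count", 10);
  }
}
```

Note that only the isolation flag is re-read by onConfigurationChange; the handler count is fixed when the scheduler is constructed, so changing it requires a region server restart.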
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
index 086cd1e..37a329e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
@@ -20,14 +20,15 @@
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorJudger;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
@@ -36,9 +37,12 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
@@ -93,7 +97,11 @@
 
   private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
   private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();
 
-  private final float scanVirtualTimeWeight;
+  private float scanVirtualTimeWeight;
+  // if the client calls ResultScanner.next() more than maxSeq times, treat it as a large query
+  private long maxSeq;
+  private OffPeakHours offpeak;
+  private CoprocessorJudger judger;
 
   /**
    * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
@@ -148,6 +156,14 @@
 
     Configuration conf = rpcServices.getConfiguration();
     scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
+    // if the client calls ResultScanner.next() more than maxSeq times, treat it as a large query
+    maxSeq = conf.getLong("hbase.ipc.server.largequery.maxseq", 20);
+    offpeak = OffPeakHours.getInstance(conf);
+    Class<? extends CoprocessorJudger> judgerClass = conf.getClass(
+        "hbase.regionserver.largequery.judger.class", null, CoprocessorJudger.class);
+    if (judgerClass != null) {
+      judger = ReflectionUtils.newInstance(judgerClass);
+    }
   }
 
   private String capitalize(final String s) {
@@ -233,11 +249,28 @@
       return HConstants.NORMAL_QOS;
     }
 
+    boolean isOffpeakHour = offpeak.isOffPeakHour();
     if (param instanceof ScanRequest) { // scanner methods...
       ScanRequest request = (ScanRequest)param;
+      long seqId = request.getNextCallSeq();
+      if (!isOffpeakHour && seqId > maxSeq) {
+        return HConstants.LARGEQUERY_QOS;
+      }
+
       if (!request.hasScannerId()) {
+        // requests during off-peak hours keep NORMAL_QOS
+        if (isOffpeakHour) {
+          return HConstants.NORMAL_QOS;
+        }
+        Scan scan = request.getScan();
+        String startRow = scan.getStartRow().toStringUtf8();
+        String stopRow = scan.getStopRow().toStringUtf8();
+        if ("".equals(startRow) || "".equals(stopRow)) {
+          return HConstants.LARGEQUERY_QOS;
+        }
         return HConstants.NORMAL_QOS;
       }
+
       RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
       if (scanner != null && scanner.getRegionInfo().isSystemTable()) {
         if (LOG.isTraceEnabled()) {
@@ -246,8 +279,20 @@
         }
         return HConstants.SYSTEMTABLE_QOS;
       }
+
+      if (!isOffpeakHour && scanner != null && scanner.isLargeQuery()) {
+        return HConstants.LARGEQUERY_QOS;
+      }
     }
 
+    if (judger != null && !isOffpeakHour
+        && param instanceof CoprocessorServiceRequest) {
+      CoprocessorServiceRequest request = (CoprocessorServiceRequest)param;
+      CoprocessorServiceCall call = request.getCall();
+      if (judger.isLargeQuery(call.getRequest())) {
+        return HConstants.LARGEQUERY_QOS;
+      }
+    }
     return HConstants.NORMAL_QOS;
   }
 
@@ -279,4 +324,11 @@
   void setRegionServer(final HRegionServer hrs) {
     this.rpcServices = hrs.getRSRpcServices();
   }
+
+  public void setConf(Configuration conf) {
+    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
+    maxSeq = conf.getLong("hbase.ipc.server.largequery.maxseq", 20);
+    LOG.info("hbase.ipc.server.largequery.maxseq changed to " + maxSeq);
+    offpeak = OffPeakHours.getInstance(conf);
+  }
 }
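Summarizing the classification rules above from the client side, here is a hedged sketch (the table name and row keys are placeholders; the QOS values in the comments assume peak hours and the default maxseq of 20):

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LargeQueryClassificationExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
         Table table = conn.getTable(TableName.valueOf("t1"))) {

      // No start row and no stop row: the open-scanner RPC carries LARGEQUERY_QOS.
      Scan fullScan = new Scan();
      try (ResultScanner scanner = table.getScanner(fullScan)) {
        scanner.next();
      }

      // Bounded on both sides: the open-scanner RPC keeps NORMAL_QOS, but once the
      // client has issued more than hbase.ipc.server.largequery.maxseq scan RPCs
      // (nextCallSeq), later requests for the same scanner get LARGEQUERY_QOS.
      Scan boundedScan = new Scan()
          .withStartRow(Bytes.toBytes("row-0001"))
          .withStopRow(Bytes.toBytes("row-0100"));
      try (ResultScanner scanner = table.getScanner(boundedScan)) {
        scanner.next();
      }
    }
  }
}
```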
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 50a6cd8..c39879f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -626,6 +626,7 @@
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private long flushCheckInterval;
+  private boolean largeQueryEnableCache = true;
   // flushPerChanges is to prevent too many changes in memstore
   private long flushPerChanges;
   private long blockingMemStoreSize;
@@ -5844,7 +5845,9 @@
     protected Cell joinedContinuationRow = null;
     private boolean filterClosed = false;
 
+    protected final byte[] startRow;
     protected final byte[] stopRow;
+    private boolean largeQuery = false;
     protected final boolean includeStopRow;
     protected final HRegion region;
     protected final CellComparator comparator;
@@ -5881,9 +5884,19 @@
        */
       defaultScannerContext = ScannerContext.newBuilder()
           .setBatchLimit(scan.getBatch()).build();
+      this.startRow = scan.getStartRow();
       this.stopRow = scan.getStopRow();
       this.includeStopRow = scan.includeStopRow();
-
+      boolean hasStartRow = startRow != null &&
+          !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW);
+      boolean hasStopRow = stopRow != null &&
+          !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW);
+      if (!scan.isGetScan() && (!hasStartRow || !hasStopRow)) {
+        this.largeQuery = true;
+        if (!largeQueryEnableCache) {
+          scan.setCacheBlocks(false);
+        }
+      }
       // synchronize on scannerReadPoints so that nobody calculates
       // getSmallestReadPoint, before scannerReadPoints is updated.
       IsolationLevel isolationLevel = scan.getIsolationLevel();
@@ -6504,6 +6517,11 @@
       // This is the RPC callback method executed. We do the close in of the scanner in this
       // callback
       this.close();
+    }
+
+    @Override
+    public boolean isLargeQuery() {
+      return largeQuery;
     }
   }
 
@@ -8139,7 +8157,7 @@
    */
   @Override
   public void onConfigurationChange(Configuration conf) {
-    // Do nothing for now.
+    this.largeQueryEnableCache = conf.getBoolean("hbase.regionserver.largequery.cacheblock", true);
   }
 
   /**
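The constructor change above can also keep large queries out of the block cache. A small sketch of that knob, using the key from the onConfigurationChange hunk; since largeQueryEnableCache defaults to true and is only assigned in onConfigurationChange, the setting takes effect through a live configuration update on the region server:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LargeQueryCacheBlockConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // When false, scans classified as large queries are opened with
    // Scan.setCacheBlocks(false) so they do not churn the block cache.
    conf.setBoolean("hbase.regionserver.largequery.cacheblock", false);
  }
}
```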
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 707321d..1ee596b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1237,6 +1237,9 @@
     if (rpcServer instanceof ConfigurationObserver) {
       ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
     }
+    if (priority instanceof AnnotationReadingPriorityFunction) {
+      ((AnnotationReadingPriorityFunction)priority).setConf(newConf);
+    }
   }
 
   protected PriorityFunction createPriority() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 8d8c051..9c9199d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -117,6 +117,11 @@
       throws IOException;
 
   /**
+   * If the Scan has no start key or end key, consider it a large query.
+   */
+  boolean isLargeQuery();
+
+  /**
    * Empty implementation to provide compatibility for user migrating from 1.X
    * @see HBASE-16626
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index d5f3358..d706c1b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -1551,6 +1551,11 @@
     public void shipped() throws IOException {
       this.delegate.shipped();
     }
+
+    @Override
+    public boolean isLargeQuery() {
+      return false;
+    }
   }
 
   public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 7013c8c..e9acc89 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -142,6 +142,11 @@
     }
 
     @Override
+    public boolean isLargeQuery() {
+      return false;
+    }
+
+    @Override
     public void shipped() throws IOException {
       this.delegate.shipped();
     }
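Because RegionScanner is exposed to coprocessors, scanner wrappers that delegate to an underlying scanner need to provide the new method as well; the test scanners above simply return false. A hedged sketch of a delegating wrapper (the DelegatingScanner name is hypothetical):

```java
import org.apache.hadoop.hbase.regionserver.RegionScanner;

// Hypothetical delegating wrapper; only the new method is shown, the rest of the
// RegionScanner contract would forward to the delegate in the same way.
public abstract class DelegatingScanner implements RegionScanner {
  protected final RegionScanner delegate;

  protected DelegatingScanner(RegionScanner delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean isLargeQuery() {
    // Preserve the wrapped scanner's classification instead of hard-coding false.
    return delegate.isLargeQuery();
  }
}
```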
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
index 3c8f114..1872368 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
@@ -45,6 +45,11 @@
   }
 
   @Override
+  public int getLargeQueryQueueLength() {
+    return delegate.getLargeQueryQueueLength();
+  }
+
+  @Override
   public int getPriorityQueueLength() {
     return delegate.getPriorityQueueLength();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index 3b787a9..03942ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -35,6 +35,11 @@
   }
 
   @Override
+  public int getLargeQueryQueueLength() {
+    return 100;
+  }
+
+  @Override
   public int getPriorityQueueLength() {
     return 104;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
index 4c4bae4..ff0d4c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
@@ -139,7 +139,7 @@
     ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
     int qos = priority.getPriority(header, scanRequest,
       User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}));
-    assertTrue ("" + qos, qos == HConstants.NORMAL_QOS);
+    assertTrue ("" + qos, qos == HConstants.LARGEQUERY_QOS);
 
     //build a scan request with scannerID
     scanBuilder = ScanRequest.newBuilder();
@@ -163,5 +163,12 @@
     Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false);
     assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest,
       User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
+
+    scanBuilder = ScanRequest.newBuilder();
+    scanBuilder.setScannerId(12345);
+    scanBuilder.setNextCallSeq(30);
+    scanRequest = scanBuilder.build();
+    assertEquals(HConstants.LARGEQUERY_QOS, priority.getPriority(header, scanRequest,
+      User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
   }
 }
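Taken together, the feature is driven by a handful of region server settings. Below is a consolidated sketch with the defaults as they appear in this patch; the off-peak window keys belong to the pre-existing OffPeakHours support and are shown only for completeness:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LargeQueryFeatureConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Dispatch LARGEQUERY_QOS calls to a dedicated executor (default: false).
    conf.setBoolean("hbase.ipc.server.largequery.isolate", true);
    // Handlers reserved for large queries (default: 10).
    conf.setInt("hbase.regionserver.largequery.handler.count", 10);
    // Scan RPC count after which a scanner is treated as a large query (default: 20).
    conf.setLong("hbase.ipc.server.largequery.maxseq", 20);
    // Keep block caching for large queries (default: true).
    conf.setBoolean("hbase.regionserver.largequery.cacheblock", true);
    // Optional judger for coprocessor service calls (default: none).
    conf.set("hbase.regionserver.largequery.judger.class",
        "org.apache.hadoop.hbase.coprocessor.example.AggregateServiceJudger");
    // Off-peak window during which classification is relaxed (existing OffPeakHours keys).
    conf.setInt("hbase.offpeak.start.hour", 0);
    conf.setInt("hbase.offpeak.end.hour", 6);
  }
}
```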