From 9cf2af037fa66ecc96ee82c6d7e1fb52bf9d3caf Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Thu, 16 Apr 2015 13:04:39 -0700 Subject: [PATCH] HBASE-5980 Scanner responses from RS should include metrics on rows/KVs filtered --- .../hadoop/hbase/client/ScannerCallable.java | 24 +- .../hbase/client/metrics/AbstractScanMetrics.java | 103 ++++ .../hadoop/hbase/client/metrics/ScanMetrics.java | 53 +- .../client/metrics/ServerSideScanMetrics.java | 42 ++ .../hadoop/hbase/protobuf/RequestConverter.java | 16 +- .../hadoop/hbase/protobuf/ResponseConverter.java | 26 + .../hbase/protobuf/generated/ClientProtos.java | 588 +++++++++++++++++---- hbase-protocol/src/main/protobuf/Client.proto | 7 + .../apache/hadoop/hbase/regionserver/HRegion.java | 29 +- .../hbase/regionserver/NoLimitScannerContext.java | 2 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 23 + .../regionserver/ReversedRegionScannerImpl.java | 8 +- .../hadoop/hbase/regionserver/ScannerContext.java | 31 +- .../TestServerSideScanMetricsFromClientSide.java | 208 ++++++++ hbase-shell/src/main/ruby/hbase.rb | 2 + hbase-shell/src/main/ruby/hbase/table.rb | 13 +- hbase-shell/src/main/ruby/shell/commands/scan.rb | 8 +- hbase-shell/src/main/ruby/shell/formatter.rb | 29 + 18 files changed, 1024 insertions(+), 188 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/AbstractScanMetrics.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 714c9fe77836363a324420105929e41a34f1b009..0d1ed781c50135751b46ad6806d08ed180994a30 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.io.InterruptedIOException; import java.net.UnknownHostException; +import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -193,7 +195,9 @@ public class ScannerCallable extends RegionServerCallable { ScanRequest request = null; try { incRPCcallsMetrics(); - request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); + request = + RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, + this.scanMetrics != null); ScanResponse response = null; controller = controllerFactory.newController(); controller.setPriority(getTableName()); @@ -222,6 +226,7 @@ public class ScannerCallable extends RegionServerCallable { + rows + " rows from scanner=" + scannerId); } } + updateServerSideMetrics(response); // moreResults is only used for the case where a filter exhausts all elements if (response.hasMoreResults() && !response.getMoreResults()) { scannerId = -1L; @@ -317,6 +322,21 @@ public class ScannerCallable extends RegionServerCallable { } } + /** + * Use the scan metrics returned by the server to add to the identically named counters in the + * client side metrics. If a counter does not exist with the same name as the server side metric, + * the attempt to increase the counter will fail. 
+ * @param response + */ + private void updateServerSideMetrics(ScanResponse response) { + if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return; + + Map serverMetrics = ResponseConverter.getScanMetrics(response); + for (Entry entry : serverMetrics.entrySet()) { + this.scanMetrics.addToCounter(entry.getKey(), entry.getValue()); + } + } + private void close() { if (this.scannerId == -1L) { return; @@ -324,7 +344,7 @@ public class ScannerCallable extends RegionServerCallable { try { incRPCcallsMetrics(); ScanRequest request = - RequestConverter.buildScanRequest(this.scannerId, 0, true); + RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { getStub().scan(null, request); } catch (ServiceException se) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/AbstractScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/AbstractScanMetrics.java new file mode 100644 index 0000000000000000000000000000000000000000..3ed2f00cac745d4be8bc48c234a03d108de250fd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/AbstractScanMetrics.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.metrics; + + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.collect.ImmutableMap; + +/** + * Helper class for scan metrics + */ +@InterfaceAudience.Private +public abstract class AbstractScanMetrics { + /** + * Hash to hold the String -> Atomic Long mappings for each metric + */ + private final Map counters = new HashMap(); + + /** + * Create a new counter with the specified name + * @param counterName + * @return {@link AtomicLong} instance for the counter with counterName + */ + protected AtomicLong createCounter(String counterName) { + AtomicLong c = new AtomicLong(0); + counters.put(counterName, c); + return c; + } + + /** + * @param counterName + * @param value + */ + public void setCounter(String counterName, long value) { + AtomicLong c = this.counters.get(counterName); + if (c != null) { + c.set(value); + } + } + + /** + * @param counterName + * @return true if a counter exists with the counterName + */ + public boolean hasCounter(String counterName) { + return this.counters.containsKey(counterName); + } + + /** + * @param counterName + * @return {@link AtomicLong} instance for this counter name, null if counter does not exist. 
+ */ + public AtomicLong getCounter(String counterName) { + return this.counters.get(counterName); + } + + /** + * @param counterName + * @param delta + */ + public void addToCounter(String counterName, long delta) { + AtomicLong c = this.counters.get(counterName); + if (c != null) { + c.addAndGet(delta); + } + } + + /** + * Get all of the values since the last time this function was called. Calling this function will + * reset all AtomicLongs in the instance back to 0. + * @return A Map of String -> Long for metrics + */ + public Map getMetricsMap() { + // Create a builder + ImmutableMap.Builder builder = ImmutableMap.builder(); + // For every entry add the value and reset the AtomicLong back to zero + for (Map.Entry e : this.counters.entrySet()) { + builder.put(e.getKey(), e.getValue().getAndSet(0)); + } + // Build the immutable map so that people can't mess around with it. + return builder.build(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java index 35c66678c938a9b0d18f203e97e84fef56a7b35f..ec2c937c834fd4935462b5d089ccfdcb8f47dfcd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java @@ -18,41 +18,32 @@ package org.apache.hadoop.hbase.client.metrics; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import com.google.common.collect.ImmutableMap; - /** - * Provides client-side metrics related to scan operations. + * Provides metrics related to scan operations (both server side and client side metrics). + *
+ * <p>
  * The data can be passed to mapreduce framework or other systems.
  * We use atomic longs so that one thread can increment,
  * while another atomically resets to zero after the values are reported
  * to hadoop's counters.
- *
+ * <p>
* Some of these metrics are general for any client operation such as put * However, there is no need for this. So they are defined under scan operation * for now. */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class ScanMetrics { - - /** - * Hash to hold the String -> Atomic Long mappings. - */ - private final Map counters = new HashMap(); +public class ScanMetrics extends ServerSideScanMetrics { - // AtomicLongs to hold the metrics values. These are all updated through ClientScanner and - // ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the + // AtomicLongs to hold the metrics values. These are all updated through ClientScanner and + // ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the // values after progress is passed to hadoop's counters. - /** * number of RPC calls */ @@ -103,36 +94,4 @@ public class ScanMetrics { */ public ScanMetrics() { } - - private AtomicLong createCounter(String counterName) { - AtomicLong c = new AtomicLong(0); - counters.put(counterName, c); - return c; - } - - public void setCounter(String counterName, long value) { - AtomicLong c = this.counters.get(counterName); - if (c != null) { - c.set(value); - } - } - - /** - * Get all of the values since the last time this function was called. - * - * Calling this function will reset all AtomicLongs in the instance back to 0. - * - * @return A Map of String -> Long for metrics - */ - public Map getMetricsMap() { - //Create a builder - ImmutableMap.Builder builder = ImmutableMap.builder(); - //For every entry add the value and reset the AtomicLong back to zero - for (Map.Entry e : this.counters.entrySet()) { - builder.put(e.getKey(), e.getValue().getAndSet(0)); - } - //Build the immutable map so that people can't mess around with it. - return builder.build(); - } - } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java new file mode 100644 index 0000000000000000000000000000000000000000..13165a492a930f36f363cf4700fc4267483a400b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.metrics; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Provides server side metrics related to scan operations. 
+ */ +@InterfaceAudience.Private +public class ServerSideScanMetrics extends AbstractScanMetrics { + public static final String COUNT_OF_ROWS_SCANNED_KEY = "ROWS_SCANNED"; + public static final String COUNT_OF_ROWS_FILTERED_KEY = "ROWS_FILTERED"; + + /** + * number of rows filtered during scan RPC + */ + public final AtomicLong countOfRowsFiltered = createCounter(COUNT_OF_ROWS_FILTERED_KEY); + + /** + * number of rows scanned during scan RPC. Not every row scanned will be returned to the client + * since rows may be filtered. + */ + public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 16c3dbfbc7bf3061bbb9931e3024f9d43076d1ae..2a59e1995d6276acba1f297320595830f4b8ef5d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -61,6 +60,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; @@ -478,9 +478,8 @@ public final class RequestConverter { * @return a scan request * @throws IOException */ - public static ScanRequest buildScanRequest(final byte[] regionName, - final Scan scan, final int numberOfRows, - final boolean closeScanner) throws IOException { + public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan, + final int numberOfRows, final boolean closeScanner) throws IOException { ScanRequest.Builder builder = ScanRequest.newBuilder(); RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); @@ -489,6 +488,7 @@ public final class RequestConverter { builder.setRegion(region); builder.setScan(ProtobufUtil.toScan(scan)); builder.setClientHandlesPartials(true); + builder.setTrackScanMetrics(scan != null && scan.isScanMetricsEnabled()); return builder.build(); } @@ -500,13 +500,14 @@ public final class RequestConverter { * @param closeScanner * @return a scan request */ - public static ScanRequest buildScanRequest(final long scannerId, - final int numberOfRows, final boolean closeScanner) { + public static ScanRequest 
buildScanRequest(final long scannerId, final int numberOfRows, + final boolean closeScanner, final boolean trackMetrics) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); builder.setScannerId(scannerId); builder.setClientHandlesPartials(true); + builder.setTrackScanMetrics(trackMetrics); return builder.build(); } @@ -520,13 +521,14 @@ public final class RequestConverter { * @return a scan request */ public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final long nextCallSeq) { + final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); builder.setScannerId(scannerId); builder.setNextCallSeq(nextCallSeq); builder.setClientHandlesPartials(true); + builder.setTrackScanMetrics(trackMetrics); return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 65eaddec8f8e8daa16e6eacf819bf540d75ee4af..cef71472d8c8ebba065ec97e28428565584e47ae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.protobuf; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair; +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; @@ -375,4 +379,26 @@ public final class ResponseConverter { } return results; } + + public static Map getScanMetrics(ScanResponse response) { + Map metricMap = new HashMap(); + if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) { + return metricMap; + } + + ScanMetrics metrics = response.getScanMetrics(); + int numberOfMetrics = metrics.getMetricsCount(); + for (int i = 0; i < numberOfMetrics; i++) { + NameInt64Pair metricPair = metrics.getMetrics(i); + if (metricPair != null) { + String name = metricPair.getName(); + Long value = metricPair.getValue(); + if (name != null && value != null) { + metricMap.put(name, value); + } + } + } + + return metricMap; + } } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 60ab6515d8de46190146469b2e8a65e9e0bc1826..000382acd2c9d16d1b8b4b16be416d9e3055478e 100644 --- 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -16433,6 +16433,16 @@ public final class ClientProtos { * optional bool client_handles_partials = 7; */ boolean getClientHandlesPartials(); + + // optional bool track_scan_metrics = 8; + /** + * optional bool track_scan_metrics = 8; + */ + boolean hasTrackScanMetrics(); + /** + * optional bool track_scan_metrics = 8; + */ + boolean getTrackScanMetrics(); } /** * Protobuf type {@code ScanRequest} @@ -16549,6 +16559,11 @@ public final class ClientProtos { clientHandlesPartials_ = input.readBool(); break; } + case 64: { + bitField0_ |= 0x00000080; + trackScanMetrics_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -16713,6 +16728,22 @@ public final class ClientProtos { return clientHandlesPartials_; } + // optional bool track_scan_metrics = 8; + public static final int TRACK_SCAN_METRICS_FIELD_NUMBER = 8; + private boolean trackScanMetrics_; + /** + * optional bool track_scan_metrics = 8; + */ + public boolean hasTrackScanMetrics() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool track_scan_metrics = 8; + */ + public boolean getTrackScanMetrics() { + return trackScanMetrics_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -16721,6 +16752,7 @@ public final class ClientProtos { closeScanner_ = false; nextCallSeq_ = 0L; clientHandlesPartials_ = false; + trackScanMetrics_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16767,6 +16799,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeBool(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, trackScanMetrics_); + } getUnknownFields().writeTo(output); } @@ -16804,6 +16839,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, trackScanMetrics_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16862,6 +16901,11 @@ public final class ClientProtos { result = result && (getClientHandlesPartials() == other.getClientHandlesPartials()); } + result = result && (hasTrackScanMetrics() == other.hasTrackScanMetrics()); + if (hasTrackScanMetrics()) { + result = result && (getTrackScanMetrics() + == other.getTrackScanMetrics()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -16903,6 +16947,10 @@ public final class ClientProtos { hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getClientHandlesPartials()); } + if (hasTrackScanMetrics()) { + hash = (37 * hash) + TRACK_SCAN_METRICS_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getTrackScanMetrics()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17049,6 +17097,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000020); clientHandlesPartials_ = false; bitField0_ = (bitField0_ & ~0x00000040); + 
trackScanMetrics_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -17113,6 +17163,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000040; } result.clientHandlesPartials_ = clientHandlesPartials_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.trackScanMetrics_ = trackScanMetrics_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -17150,6 +17204,9 @@ public final class ClientProtos { if (other.hasClientHandlesPartials()) { setClientHandlesPartials(other.getClientHandlesPartials()); } + if (other.hasTrackScanMetrics()) { + setTrackScanMetrics(other.getTrackScanMetrics()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17588,6 +17645,39 @@ public final class ClientProtos { return this; } + // optional bool track_scan_metrics = 8; + private boolean trackScanMetrics_ ; + /** + * optional bool track_scan_metrics = 8; + */ + public boolean hasTrackScanMetrics() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool track_scan_metrics = 8; + */ + public boolean getTrackScanMetrics() { + return trackScanMetrics_; + } + /** + * optional bool track_scan_metrics = 8; + */ + public Builder setTrackScanMetrics(boolean value) { + bitField0_ |= 0x00000080; + trackScanMetrics_ = value; + onChanged(); + return this; + } + /** + * optional bool track_scan_metrics = 8; + */ + public Builder clearTrackScanMetrics() { + bitField0_ = (bitField0_ & ~0x00000080); + trackScanMetrics_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -17806,6 +17896,32 @@ public final class ClientProtos { * */ boolean getMoreResultsInRegion(); + + // optional .ScanMetrics scan_metrics = 9; + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+     * <pre>
+     * TODO: fill out docs
+     * </pre>
+ */ + boolean hasScanMetrics(); + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+     * <pre>
+     * TODO: fill out docs
+     * </pre>
+ */ + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics(); + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+     * <pre>
+     * TODO: fill out docs
+     * </pre>
+ */ + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder(); } /** * Protobuf type {@code ScanResponse} @@ -17939,6 +18055,19 @@ public final class ClientProtos { moreResultsInRegion_ = input.readBool(); break; } + case 74: { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder subBuilder = null; + if (((bitField0_ & 0x00000020) == 0x00000020)) { + subBuilder = scanMetrics_.toBuilder(); + } + scanMetrics_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(scanMetrics_); + scanMetrics_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000020; + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -18252,6 +18381,40 @@ public final class ClientProtos { return moreResultsInRegion_; } + // optional .ScanMetrics scan_metrics = 9; + public static final int SCAN_METRICS_FIELD_NUMBER = 9; + private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_; + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+     * <pre>
+     * TODO: fill out docs
+     * </pre>
+ */ + public boolean hasScanMetrics() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+     * <pre>
+     * TODO: fill out docs
+     * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() { + return scanMetrics_; + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+     * <pre>
+     * TODO: fill out docs
+     * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() { + return scanMetrics_; + } + private void initFields() { cellsPerResult_ = java.util.Collections.emptyList(); scannerId_ = 0L; @@ -18261,6 +18424,7 @@ public final class ClientProtos { stale_ = false; partialFlagPerResult_ = java.util.Collections.emptyList(); moreResultsInRegion_ = false; + scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -18298,6 +18462,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeBool(8, moreResultsInRegion_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeMessage(9, scanMetrics_); + } getUnknownFields().writeTo(output); } @@ -18346,6 +18513,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(8, moreResultsInRegion_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(9, scanMetrics_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18400,6 +18571,11 @@ public final class ClientProtos { result = result && (getMoreResultsInRegion() == other.getMoreResultsInRegion()); } + result = result && (hasScanMetrics() == other.hasScanMetrics()); + if (hasScanMetrics()) { + result = result && getScanMetrics() + .equals(other.getScanMetrics()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18445,6 +18621,10 @@ public final class ClientProtos { hash = (37 * hash) + MORE_RESULTS_IN_REGION_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getMoreResultsInRegion()); } + if (hasScanMetrics()) { + hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER; + hash = (53 * hash) + getScanMetrics().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -18553,6 +18733,7 @@ public final class ClientProtos { private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getResultsFieldBuilder(); + getScanMetricsFieldBuilder(); } } private static Builder create() { @@ -18581,6 +18762,12 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000040); moreResultsInRegion_ = false; bitField0_ = (bitField0_ & ~0x00000080); + if (scanMetricsBuilder_ == null) { + scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); + } else { + scanMetricsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -18648,6 +18835,14 @@ public final class ClientProtos { to_bitField0_ |= 0x00000010; } result.moreResultsInRegion_ = moreResultsInRegion_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000020; + } + if (scanMetricsBuilder_ == null) { + result.scanMetrics_ = scanMetrics_; + } else { + result.scanMetrics_ = scanMetricsBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18725,6 +18920,9 @@ public final class ClientProtos { if (other.hasMoreResultsInRegion()) { setMoreResultsInRegion(other.getMoreResultsInRegion()); } + if (other.hasScanMetrics()) { + mergeScanMetrics(other.getScanMetrics()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -19561,6 +19759,159 @@ public final class 
ClientProtos { return this; } + // optional .ScanMetrics scan_metrics = 9; + private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> scanMetricsBuilder_; + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public boolean hasScanMetrics() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() { + if (scanMetricsBuilder_ == null) { + return scanMetrics_; + } else { + return scanMetricsBuilder_.getMessage(); + } + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public Builder setScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) { + if (scanMetricsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + scanMetrics_ = value; + onChanged(); + } else { + scanMetricsBuilder_.setMessage(value); + } + bitField0_ |= 0x00000100; + return this; + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public Builder setScanMetrics( + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder builderForValue) { + if (scanMetricsBuilder_ == null) { + scanMetrics_ = builderForValue.build(); + onChanged(); + } else { + scanMetricsBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000100; + return this; + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public Builder mergeScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) { + if (scanMetricsBuilder_ == null) { + if (((bitField0_ & 0x00000100) == 0x00000100) && + scanMetrics_ != org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance()) { + scanMetrics_ = + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.newBuilder(scanMetrics_).mergeFrom(value).buildPartial(); + } else { + scanMetrics_ = value; + } + onChanged(); + } else { + scanMetricsBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000100; + return this; + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public Builder clearScanMetrics() { + if (scanMetricsBuilder_ == null) { + scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); + onChanged(); + } else { + scanMetricsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000100); + return this; + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder getScanMetricsBuilder() { + bitField0_ |= 0x00000100; + onChanged(); + return getScanMetricsFieldBuilder().getBuilder(); + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() { + if (scanMetricsBuilder_ != null) { + return scanMetricsBuilder_.getMessageOrBuilder(); + } else { + return scanMetrics_; + } + } + /** + * optional .ScanMetrics scan_metrics = 9; + * + *
+       * <pre>
+       * TODO: fill out docs
+       * </pre>
+ */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> + getScanMetricsFieldBuilder() { + if (scanMetricsBuilder_ == null) { + scanMetricsBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder>( + scanMetrics_, + getParentForChildren(), + isClean()); + scanMetrics_ = null; + } + return scanMetricsBuilder_; + } + // @@protoc_insertion_point(builder_scope:ScanResponse) } @@ -32632,121 +32983,123 @@ public final class ClientProtos { static { java.lang.String[] descriptorData = { "\n\014Client.proto\032\013HBase.proto\032\014Filter.prot" + - "o\032\nCell.proto\032\020Comparator.proto\"\037\n\016Autho" + - "rizations\022\r\n\005label\030\001 \003(\t\"$\n\016CellVisibili" + - "ty\022\022\n\nexpression\030\001 \002(\t\"+\n\006Column\022\016\n\006fami" + - "ly\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\324\002\n\003Get\022\013\n\003r" + - "ow\030\001 \002(\014\022\027\n\006column\030\002 \003(\0132\007.Column\022!\n\tatt" + - "ribute\030\003 \003(\0132\016.NameBytesPair\022\027\n\006filter\030\004" + - " \001(\0132\007.Filter\022\036\n\ntime_range\030\005 \001(\0132\n.Time" + - "Range\022\027\n\014max_versions\030\006 \001(\r:\0011\022\032\n\014cache_" + - "blocks\030\007 \001(\010:\004true\022\023\n\013store_limit\030\010 \001(\r\022", - "\024\n\014store_offset\030\t \001(\r\022\035\n\016existence_only\030" + - "\n \001(\010:\005false\022!\n\022closest_row_before\030\013 \001(\010" + - ":\005false\022)\n\013consistency\030\014 \001(\0162\014.Consisten" + - "cy:\006STRONG\"z\n\006Result\022\023\n\004cell\030\001 \003(\0132\005.Cel" + - "l\022\035\n\025associated_cell_count\030\002 \001(\005\022\016\n\006exis" + - "ts\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010:\005false\022\026\n\007partia" + - "l\030\005 \001(\010:\005false\"A\n\nGetRequest\022 \n\006region\030\001" + - " \002(\0132\020.RegionSpecifier\022\021\n\003get\030\002 \002(\0132\004.Ge" + - "t\"&\n\013GetResponse\022\027\n\006result\030\001 \001(\0132\007.Resul" + - "t\"\200\001\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006family\030\002", - " \002(\014\022\021\n\tqualifier\030\003 \002(\014\022\"\n\014compare_type\030" + - "\004 \002(\0162\014.CompareType\022\037\n\ncomparator\030\005 \002(\0132" + - "\013.Comparator\"\265\006\n\rMutationProto\022\013\n\003row\030\001 " + - "\001(\014\0220\n\013mutate_type\030\002 \001(\0162\033.MutationProto" + - ".MutationType\0220\n\014column_value\030\003 \003(\0132\032.Mu" + - "tationProto.ColumnValue\022\021\n\ttimestamp\030\004 \001" + - "(\004\022!\n\tattribute\030\005 \003(\0132\016.NameBytesPair\022:\n" + - "\ndurability\030\006 \001(\0162\031.MutationProto.Durabi" + - "lity:\013USE_DEFAULT\022\036\n\ntime_range\030\007 \001(\0132\n." 
+ - "TimeRange\022\035\n\025associated_cell_count\030\010 \001(\005", - "\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013ColumnValue\022\016\n\006famil" + - "y\030\001 \002(\014\022B\n\017qualifier_value\030\002 \003(\0132).Mutat" + - "ionProto.ColumnValue.QualifierValue\032\203\001\n\016" + - "QualifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005val" + - "ue\030\002 \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022.\n\013delete_ty" + - "pe\030\004 \001(\0162\031.MutationProto.DeleteType\022\014\n\004t" + - "ags\030\005 \001(\014\"W\n\nDurability\022\017\n\013USE_DEFAULT\020\000" + - "\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC_WAL\020\002\022\014\n\010SYNC_WA" + - "L\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014MutationType\022\n\n\006AP" + - "PEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DELETE", - "\020\003\"p\n\nDeleteType\022\026\n\022DELETE_ONE_VERSION\020\000" + - "\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDELETE" + - "_FAMILY\020\002\022\031\n\025DELETE_FAMILY_VERSION\020\003\"\207\001\n" + - "\rMutateRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" + - "pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" + - "to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" + - "ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" + - "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" + - "\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" + - "ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003", - " \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" + - ".Filter\022\036\n\ntime_range\030\006 \001(\0132\n.TimeRange\022" + - "\027\n\014max_versions\030\007 \001(\r:\0011\022\032\n\014cache_blocks" + - "\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_" + - "result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024" + - "\n\014store_offset\030\014 \001(\r\022&\n\036load_column_fami" + - "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + - "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" + - "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" + - "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", - "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" + - "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" + - "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" + - "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" + - "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" + - "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" + - "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" + - "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" + - "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" + - "\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132", - "\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " + - ".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" + - "gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" + - "\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" + - "ponse\022\016\n\006loaded\030\001 
\002(\010\"a\n\026CoprocessorServ" + - "iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" + - "(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" + - "\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " + - "\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" + - "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi", - "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" + - "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" + - "2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" + - "\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" + - "et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" + - "oprocessorServiceCall\"Y\n\014RegionAction\022 \n" + - "\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" + - "c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" + - "onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r", - "heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" + - "tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" + - "sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" + - "1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" + - "viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" + - "adStats\"f\n\022RegionActionResult\022-\n\021resultO" + - "rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" + - "exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" + - "Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" + - "tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ", - "\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" + - "onActionResult\030\001 \003(\0132\023.RegionActionResul" + - "t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" + - "TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" + - " \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" + - "ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" + - "Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" + - "kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" + - "LoadHFileResponse\022F\n\013ExecService\022\032.Copro" + - "cessorServiceRequest\032\033.CoprocessorServic", - "eResponse\022R\n\027ExecRegionServerService\022\032.C" + - "oprocessorServiceRequest\032\033.CoprocessorSe" + - "rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." 
+ - "MultiResponseBB\n*org.apache.hadoop.hbase" + - ".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" + - "\001" + "o\032\nCell.proto\032\020Comparator.proto\032\017MapRedu" + + "ce.proto\"\037\n\016Authorizations\022\r\n\005label\030\001 \003(" + + "\t\"$\n\016CellVisibility\022\022\n\nexpression\030\001 \002(\t\"" + + "+\n\006Column\022\016\n\006family\030\001 \002(\014\022\021\n\tqualifier\030\002" + + " \003(\014\"\324\002\n\003Get\022\013\n\003row\030\001 \002(\014\022\027\n\006column\030\002 \003(" + + "\0132\007.Column\022!\n\tattribute\030\003 \003(\0132\016.NameByte" + + "sPair\022\027\n\006filter\030\004 \001(\0132\007.Filter\022\036\n\ntime_r" + + "ange\030\005 \001(\0132\n.TimeRange\022\027\n\014max_versions\030\006" + + " \001(\r:\0011\022\032\n\014cache_blocks\030\007 \001(\010:\004true\022\023\n\013s", + "tore_limit\030\010 \001(\r\022\024\n\014store_offset\030\t \001(\r\022\035" + + "\n\016existence_only\030\n \001(\010:\005false\022!\n\022closest" + + "_row_before\030\013 \001(\010:\005false\022)\n\013consistency\030" + + "\014 \001(\0162\014.Consistency:\006STRONG\"z\n\006Result\022\023\n" + + "\004cell\030\001 \003(\0132\005.Cell\022\035\n\025associated_cell_co" + + "unt\030\002 \001(\005\022\016\n\006exists\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010" + + ":\005false\022\026\n\007partial\030\005 \001(\010:\005false\"A\n\nGetRe" + + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + + "\021\n\003get\030\002 \002(\0132\004.Get\"&\n\013GetResponse\022\027\n\006res" + + "ult\030\001 \001(\0132\007.Result\"\200\001\n\tCondition\022\013\n\003row\030", + "\001 \002(\014\022\016\n\006family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014" + + "\022\"\n\014compare_type\030\004 \002(\0162\014.CompareType\022\037\n\n" + + "comparator\030\005 \002(\0132\013.Comparator\"\265\006\n\rMutati" + + "onProto\022\013\n\003row\030\001 \001(\014\0220\n\013mutate_type\030\002 \001(" + + "\0162\033.MutationProto.MutationType\0220\n\014column" + + "_value\030\003 \003(\0132\032.MutationProto.ColumnValue" + + "\022\021\n\ttimestamp\030\004 \001(\004\022!\n\tattribute\030\005 \003(\0132\016" + + ".NameBytesPair\022:\n\ndurability\030\006 \001(\0162\031.Mut" + + "ationProto.Durability:\013USE_DEFAULT\022\036\n\nti" + + "me_range\030\007 \001(\0132\n.TimeRange\022\035\n\025associated", + "_cell_count\030\010 \001(\005\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013Col" + + "umnValue\022\016\n\006family\030\001 \002(\014\022B\n\017qualifier_va" + + "lue\030\002 \003(\0132).MutationProto.ColumnValue.Qu" + + "alifierValue\032\203\001\n\016QualifierValue\022\021\n\tquali" + + "fier\030\001 \001(\014\022\r\n\005value\030\002 \001(\014\022\021\n\ttimestamp\030\003" + + " \001(\004\022.\n\013delete_type\030\004 \001(\0162\031.MutationProt" + + "o.DeleteType\022\014\n\004tags\030\005 \001(\014\"W\n\nDurability" + + "\022\017\n\013USE_DEFAULT\020\000\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC" + + "_WAL\020\002\022\014\n\010SYNC_WAL\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014M" + + "utationType\022\n\n\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007", + "\n\003PUT\020\002\022\n\n\006DELETE\020\003\"p\n\nDeleteType\022\026\n\022DEL" + + "ETE_ONE_VERSION\020\000\022\034\n\030DELETE_MULTIPLE_VER" + + "SIONS\020\001\022\021\n\rDELETE_FAMILY\020\002\022\031\n\025DELETE_FAM" + + "ILY_VERSION\020\003\"\207\001\n\rMutateRequest\022 \n\006regio" + + "n\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation\030\002 " + + 
"\002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(\0132\n" + + ".Condition\022\023\n\013nonce_group\030\004 \001(\004\"<\n\016Mutat" + + "eResponse\022\027\n\006result\030\001 \001(\0132\007.Result\022\021\n\tpr" + + "ocessed\030\002 \001(\010\"\271\003\n\004Scan\022\027\n\006column\030\001 \003(\0132\007" + + ".Column\022!\n\tattribute\030\002 \003(\0132\016.NameBytesPa", + "ir\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_row\030\004 \001(\014\022" + + "\027\n\006filter\030\005 \001(\0132\007.Filter\022\036\n\ntime_range\030\006" + + " \001(\0132\n.TimeRange\022\027\n\014max_versions\030\007 \001(\r:\001" + + "1\022\032\n\014cache_blocks\030\010 \001(\010:\004true\022\022\n\nbatch_s" + + "ize\030\t \001(\r\022\027\n\017max_result_size\030\n \001(\004\022\023\n\013st" + + "ore_limit\030\013 \001(\r\022\024\n\014store_offset\030\014 \001(\r\022&\n" + + "\036load_column_families_on_demand\030\r \001(\010\022\r\n" + + "\005small\030\016 \001(\010\022\027\n\010reversed\030\017 \001(\010:\005false\022)\n" + + "\013consistency\030\020 \001(\0162\014.Consistency:\006STRONG" + + "\022\017\n\007caching\030\021 \001(\r\"\333\001\n\013ScanRequest\022 \n\006reg", + "ion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(" + + "\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_o" + + "f_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rn" + + "ext_call_seq\030\006 \001(\004\022\037\n\027client_handles_par" + + "tials\030\007 \001(\010\022\032\n\022track_scan_metrics\030\010 \001(\010\"" + + "\355\001\n\014ScanResponse\022\030\n\020cells_per_result\030\001 \003" + + "(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003" + + " \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Res" + + "ult\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_r" + + "esult\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 ", + "\001(\010\022\"\n\014scan_metrics\030\t \001(\0132\014.ScanMetrics\"" + + "\263\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\013" + + "2\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132" + + " .BulkLoadHFileRequest.FamilyPath\022\026\n\016ass" + + "ign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006famil" + + "y\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRe" + + "sponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorSer" + + "viceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 " + + "\002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(" + + "\014\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001", + " \001(\0132\016.NameBytesPair\"d\n\031CoprocessorServi" + + "ceRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" + + "ier\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCa" + + "ll\"]\n\032CoprocessorServiceResponse\022 \n\006regi" + + "on\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(" + + "\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001" + + "(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003" + + "get\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027." 
+ + "CoprocessorServiceCall\"Y\n\014RegionAction\022 " + + "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atom", + "ic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Reg" + + "ionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n" + + "\rheapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExce" + + "ption\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.R" + + "esult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair" + + "\0221\n\016service_result\030\004 \001(\0132\031.CoprocessorSe" + + "rviceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionL" + + "oadStats\"f\n\022RegionActionResult\022-\n\021result" + + "OrException\030\001 \003(\0132\022.ResultOrException\022!\n" + + "\texception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Mult", + "iRequest\022#\n\014regionAction\030\001 \003(\0132\r.RegionA" + + "ction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003" + + " \001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022reg" + + "ionActionResult\030\001 \003(\0132\023.RegionActionResu" + + "lt\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006" + + "STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService" + + "\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mu" + + "tate\022\016.MutateRequest\032\017.MutateResponse\022#\n" + + "\004Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBu" + + "lkLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bul", + "kLoadHFileResponse\022F\n\013ExecService\022\032.Copr" + + "ocessorServiceRequest\032\033.CoprocessorServi" + + "ceResponse\022R\n\027ExecRegionServerService\022\032." + + "CoprocessorServiceRequest\032\033.CoprocessorS" + + "erviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016" + + ".MultiResponseBB\n*org.apache.hadoop.hbas" + + "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" + + "\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -32842,13 +33195,13 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "TrackScanMetrics", }); internal_static_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_ScanResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "ScanMetrics", }); internal_static_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_BulkLoadHFileRequest_fieldAccessorTable = new @@ -32943,6 +33296,7 @@ public final class ClientProtos { org.apache.hadoop.hbase.protobuf.generated.FilterProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.CellProtos.getDescriptor(), 
org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos.getDescriptor(), + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.getDescriptor(), }, assigner); } diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index e0c370b3c4f5d18d123d3e1b1013b0266eb4ad9e..2ec8384dad7d3b8e80ce9b4ee885e712fc86f8c9 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -28,6 +28,7 @@ import "HBase.proto"; import "Filter.proto"; import "Cell.proto"; import "Comparator.proto"; +import "MapReduce.proto"; /** * The protocol buffer version of Authorizations. @@ -275,6 +276,7 @@ message ScanRequest { optional bool close_scanner = 5; optional uint64 next_call_seq = 6; optional bool client_handles_partials = 7; + optional bool track_scan_metrics = 8; } /** @@ -313,6 +315,11 @@ message ScanResponse { // reasons such as the size in bytes or quantity of results accumulated. This field // will true when more results exist in the current region. optional bool more_results_in_region = 8; + + // This field is filled in if the client has requested that scan metrics be tracked. + // The metrics tracked here are sent back to the client to be tracked together with + // the existing client side metrics. + optional ScanMetrics scan_metrics = 9; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e082698f7209a6926f4765b0a6d19584c871c381..8c4353f036e09dbebab98078b530a407385df207 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5518,7 +5518,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Check if rowkey filter wants to exclude this row. If so, loop to next. // Technically, if we hit limits before on this row, we don't need this call. if (filterRowKey(currentRow, offset, length)) { - boolean moreRows = nextRow(currentRow, offset, length); + incrementCountOfRowsFilteredMetric(scannerContext); + boolean moreRows = nextRow(scannerContext, currentRow, offset, length); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5562,9 +5563,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) { + if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { + incrementCountOfRowsFilteredMetric(scannerContext); results.clear(); - boolean moreRows = nextRow(currentRow, offset, length); + boolean moreRows = nextRow(scannerContext, currentRow, offset, length); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5614,7 +5616,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Double check to prevent empty rows from appearing in result. It could be // the case when SingleColumnValueExcludeFilter is used. 
if (results.isEmpty()) { - boolean moreRows = nextRow(currentRow, offset, length); + incrementCountOfRowsFilteredMetric(scannerContext); + boolean moreRows = nextRow(scannerContext, currentRow, offset, length); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5622,6 +5625,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // We are done. Return the result. + incrementCountOfRowsScannedMetric(scannerContext); if (stopRow) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } else { @@ -5630,6 +5634,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { + if (scannerContext == null || !scannerContext.isTrackingMetrics()) return; + + scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); + } + + protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) { + if (scannerContext == null || !scannerContext.isTrackingMetrics()) return; + + scannerContext.getMetrics().countOfRowsScanned.incrementAndGet(); + } + /** * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines * both filterRow & filterRow(List kvs) functions. While 0.94 code or older, it may @@ -5649,7 +5665,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi && filter.filterRowKey(row, offset, length); } - protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException { + protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset, + short length) throws IOException { assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read."; Cell next; while ((next = this.storeHeap.peek()) != null && @@ -5657,6 +5674,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.storeHeap.next(MOCKED_LIST); } resetFilters(); + incrementCountOfRowsScannedMetric(scannerContext); + // Calling the hook in CP which allows it to do a fast forward return this.region.getCoprocessorHost() == null || this.region.getCoprocessorHost() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java index 1484e8072e93c54e55538865775fce84c373cde5..3290972aed6731acb3ac0eed5683f3028e12ddd0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; public class NoLimitScannerContext extends ScannerContext { public NoLimitScannerContext() { - super(false, null); + super(false, null, false); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 15bf2cb3af13f5492200d1a8bfdc7938cfc9ec69..39822c3e03542942df8580ef920c9eb8fba8f012 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; 
+import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; @@ -142,9 +143,11 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -2248,11 +2251,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final LimitScope sizeScope = allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + boolean trackMetrics = + request.hasTrackScanMetrics() && request.getTrackScanMetrics(); + // Configure with limits for this RPC. Set keep progress true since size progress // towards size limit should be kept between calls to nextRaw ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); contextBuilder.setSizeLimit(sizeScope, maxResultSize); contextBuilder.setBatchLimit(scanner.getBatch()); + contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); while (i < rows) { @@ -2294,6 +2301,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // We didn't get a single batch builder.setMoreResultsInRegion(false); } + + // Check to see if the client requested that we track metrics server side. If the + // client requested metrics, retrieve the metrics from the scanner context. 
+      if (trackMetrics) {
+        Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
+        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
+        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
+
+        for (Entry<String, Long> entry : metrics.entrySet()) {
+          pairBuilder.setName(entry.getKey());
+          pairBuilder.setValue(entry.getValue());
+          metricBuilder.addMetrics(pairBuilder.build());
+        }
+
+        builder.setScanMetrics(metricBuilder.build());
+      }
     }
     region.updateReadRequestsCount(i);
     region.getMetrics().updateScanNext(totalCellSize);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index feda6993379d90a98e93aa972a87e3151ccbd2f1..3a8138a832cbc2175159fe2f3d30e2aadbe2b8ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.List;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@@ -63,13 +63,15 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
   }

   @Override
-  protected boolean nextRow(byte[] currentRow, int offset, short length)
-      throws IOException {
+  protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
+      short length) throws IOException {
     assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
     byte row[] = new byte[length];
     System.arraycopy(currentRow, offset, row, 0, length);
     this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
     resetFilters();
+    incrementCountOfRowsScannedMetric(scannerContext);
+
     // Calling the hook in CP which allows it to do a fast forward
     if (this.region.getCoprocessorHost() != null) {
       return this.region.getCoprocessorHost().postScannerFilterRow(this,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 6e487ca7a0c94eb47c35a4992f4dfdc696bfeb39..f3e8003a9a55db0feaf8125d8ce0dc12aa0c2fa1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;

 /**
  * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
@@ -96,7 +97,12 @@ public class ScannerContext {
   boolean keepProgress;
   private static boolean DEFAULT_KEEP_PROGRESS = false;

-  ScannerContext(boolean keepProgress, LimitFields limitsToCopy) {
+  /**
+   * Tracks the relevant server side metrics during scans.
null when metrics should not be tracked + */ + final ServerSideScanMetrics metrics; + + ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) { this.limits = new LimitFields(); if (limitsToCopy != null) this.limits.copy(limitsToCopy); @@ -105,6 +111,21 @@ public class ScannerContext { this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; + this.metrics = trackMetrics ? new ServerSideScanMetrics() : null; + } + + boolean isTrackingMetrics() { + return this.metrics != null; + } + + /** + * Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()} + * has been made to confirm that metrics are indeed being tracked. + * @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan + */ + ServerSideScanMetrics getMetrics() { + assert isTrackingMetrics(); + return this.metrics; } /** @@ -285,6 +306,7 @@ public class ScannerContext { public static final class Builder { boolean keepProgress = DEFAULT_KEEP_PROGRESS; + boolean trackMetrics = false; LimitFields limits = new LimitFields(); private Builder() { @@ -299,6 +321,11 @@ public class ScannerContext { return this; } + public Builder setTrackMetrics(boolean trackMetrics) { + this.trackMetrics = trackMetrics; + return this; + } + public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) { limits.setSize(sizeLimit); limits.setSizeScope(sizeScope); @@ -311,7 +338,7 @@ public class ScannerContext { } public ScannerContext build() { - return new ScannerContext(keepProgress, limits); + return new ScannerContext(keepProgress, limits, trackMetrics); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java new file mode 100644 index 0000000000000000000000000000000000000000..01af5e3201c36012c93c05d6179c9ec791976e8e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestServerSideScanMetricsFromClientSide { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Table TABLE = null; + + /** + * Table configuration + */ + private static TableName TABLE_NAME = TableName.valueOf("testTable"); + + private static int NUM_ROWS = 10; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + // Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then + // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... 
which + // breaks the simple generation of expected kv's + private static int NUM_FAMILIES = 1; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + private static int NUM_QUALIFIERS = 1; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + private static int VALUE_SIZE = 10; + private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + static Table createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + Table ht = TEST_UTIL.createTable(name, families); + List puts = createPuts(rows, families, qualifiers, cellValue); + ht.put(puts); + + return ht; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Make puts to put the input value into each combination of row, family, and qualifier + * @param rows + * @param families + * @param qualifiers + * @param value + * @return + * @throws IOException + */ + static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + Put put; + ArrayList puts = new ArrayList<>(); + + for (int row = 0; row < rows.length; row++) { + put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + + return puts; + } + + @Test + public void testRowsSeenMetric() throws Exception { + // Base scan configuration + Scan baseScan; + baseScan = new Scan(); + baseScan.setScanMetricsEnabled(true); + + Scan scan; + scan = new Scan(baseScan); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, NUM_ROWS); + + for (int i = 0; i < ROWS.length - 1; i++) { + scan = new Scan(baseScan); + scan.setStartRow(ROWS[0]); + scan.setStopRow(ROWS[i + 1]); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, i + 1); + } + + for (int i = ROWS.length - 1; i > 0; i--) { + scan = new Scan(baseScan); + scan.setStartRow(ROWS[i - 1]); + scan.setStopRow(ROWS[ROWS.length - 1]); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length - i); + } + + // The filter should filter out all rows, but we still expect to see every row. 
+    Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes()));
+    scan = new Scan(baseScan);
+    scan.setFilter(filter);
+    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length);
+
+  }
+
+  @Test
+  public void testRowsFilteredMetric() throws Exception {
+    // Base scan configuration
+    Scan baseScan;
+    baseScan = new Scan();
+    baseScan.setScanMetricsEnabled(true);
+
+    Scan scan;
+    scan = new Scan(baseScan);
+    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, 0);
+
+    Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes()));
+    scan = new Scan(baseScan);
+    scan.setFilter(filter);
+    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, ROWS.length);
+
+    filter = new FirstKeyOnlyFilter();
+    scan = new Scan(baseScan);
+    scan.setFilter(filter);
+    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, 0);
+
+    filter = new ColumnPrefixFilter(QUALIFIERS[0]);
+    scan = new Scan(baseScan);
+    scan.setFilter(filter);
+    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, 0);
+
+    // Set column prefix to garbage that won't match anything.
+    filter = new ColumnPrefixFilter("xyz".getBytes());
+    scan = new Scan(baseScan);
+    scan.setFilter(filter);
+    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, ROWS.length);
+  }
+
+  /**
+   * Run the scan to completion and check the metric against the specified value
+   * @param scan
+   * @param metricKey
+   * @param expectedValue
+   * @throws Exception
+   */
+  public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
+    assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
+    ResultScanner scanner = TABLE.getScanner(scan);
+
+    // Iterate through all the results
+    for (Result r : scanner) {
+    }
+    scanner.close();
+    ScanMetrics metrics = scan.getScanMetrics();
+    assertTrue("Metrics are null", metrics != null);
+    assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
+    final long actualMetricValue = metrics.getCounter(metricKey).get();
+    assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: "
+        + actualMetricValue, expectedValue, actualMetricValue);
+
+  }
+}
diff --git a/hbase-shell/src/main/ruby/hbase.rb b/hbase-shell/src/main/ruby/hbase.rb
index f181edabded44334d249bd29e74d6ea3662828d3..0330e523d6fdd9621e67464d06150b29d153edc3 100644
--- a/hbase-shell/src/main/ruby/hbase.rb
+++ b/hbase-shell/src/main/ruby/hbase.rb
@@ -49,6 +49,8 @@ module HBaseConstants
   METHOD = "METHOD"
   MAXLENGTH = "MAXLENGTH"
   CACHE_BLOCKS = "CACHE_BLOCKS"
+  GET_ALL_METRICS = "GET_ALL_METRICS"
+  GET_METRICS = "GET_METRICS"
   REVERSED = "REVERSED"
   REPLICATION_SCOPE = "REPLICATION_SCOPE"
   INTERVAL = 'INTERVAL'
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 9a71fa5b18275bac4ebb906b9a7e931d49d3a6d0..99f1a0ce9347cc9305658ab468c756a0b698318c 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -407,6 +407,8 @@ EOF
     def _hash_to_scan(args)
       if args.any?
+        enablemetrics = args["GET_ALL_METRICS"].nil? ? false : args["GET_ALL_METRICS"]
+        enablemetrics = enablemetrics || !args["GET_METRICS"].nil?
filter = args["FILTER"] startrow = args["STARTROW"] || '' stoprow = args["STOPROW"] @@ -453,6 +455,7 @@ EOF scan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter)) end + scan.setScanMetricsEnabled(enablemetrics) if enablemetrics scan.setTimeStamp(timestamp) if timestamp scan.setCacheBlocks(cache_blocks) scan.setReversed(reversed) @@ -476,8 +479,10 @@ EOF #---------------------------------------------------------------------------------------------- # Scans whole table or a range of keys and returns rows matching specific criteria - def _scan_internal(args = {}) - raise(ArgumentError, "Arguments should be a Hash") unless args.kind_of?(Hash) + def _scan_internal(args = {}, scan = nil) + raise(ArgumentError, "Args should be a Hash") unless args.kind_of?(Hash) + raise(ArgumentError, "Scan argument should be org.apache.hadoop.hbase.client.Scan") \ + unless scan == nil || scan.kind_of?(org.apache.hadoop.hbase.client.Scan) limit = args.delete("LIMIT") || -1 maxlength = args.delete("MAXLENGTH") || -1 @@ -487,7 +492,8 @@ EOF @converters.clear() # Start the scanner - scanner = @table.getScanner(_hash_to_scan(args)) + scan = scan == nil ? _hash_to_scan(args) : scan + scanner = @table.getScanner(scan) iter = scanner.iterator # Iterate results @@ -517,6 +523,7 @@ EOF # One more row processed count += 1 end + scanner.close() return ((block_given?) ? count : res) end diff --git a/hbase-shell/src/main/ruby/shell/commands/scan.rb b/hbase-shell/src/main/ruby/shell/commands/scan.rb index c6aba9c9e0aab4a5769216e2317b65ed78d2300a..0cc32d4cbdb589a32df8554c7c5b4254faa322f1 100644 --- a/hbase-shell/src/main/ruby/shell/commands/scan.rb +++ b/hbase-shell/src/main/ruby/shell/commands/scan.rb @@ -100,12 +100,18 @@ EOF now = Time.now formatter.header(["ROW", "COLUMN+CELL"]) + scan = table._hash_to_scan(args) #actually do the scanning - count = table._scan_internal(args) do |row, cells| + count = table._scan_internal(args, scan) do |row, cells| formatter.row([ row, cells ]) end formatter.footer(now, count) + + # if scan metrics were enabled, print them after the results + if (scan != nil && scan.isScanMetricsEnabled()) + formatter.scan_metrics(scan.getScanMetrics(), args["GET_METRICS"]) + end end end end diff --git a/hbase-shell/src/main/ruby/shell/formatter.rb b/hbase-shell/src/main/ruby/shell/formatter.rb index 36aaf76aea7eea4db0659ad511052c0c3b4d0a01..37f43df5d93e4eb9bf86589775a950e36153d883 100644 --- a/hbase-shell/src/main/ruby/shell/formatter.rb +++ b/hbase-shell/src/main/ruby/shell/formatter.rb @@ -112,6 +112,35 @@ module Shell @row_count += 1 end + # Output the scan metrics. 
Can be filtered to output only those metrics whose keys exist
+    # in the metric_filter
+    def scan_metrics(scan_metrics = nil, metric_filter = [])
+      return if scan_metrics == nil
+      raise(ArgumentError, "Argument should be org.apache.hadoop.hbase.client.metrics.ScanMetrics") \
+        unless scan_metrics.kind_of?(org.apache.hadoop.hbase.client.metrics.ScanMetrics)
+      # prefix output with empty line
+      @out.puts
+      # save row count to restore after printing metrics (metrics should not count towards row count)
+      saved_row_count = @row_count
+      iter = scan_metrics.getMetricsMap().entrySet().iterator()
+      metric_hash = Hash.new()
+      # put keys in hash so they can be sorted easily
+      while iter.hasNext
+        metric = iter.next
+        metric_hash[metric.getKey.to_s] = metric.getValue.to_s
+      end
+      # print in alphabetical order
+      row(["METRIC", "VALUE"], false)
+      metric_hash.sort.map do |key, value|
+        if (not metric_filter or metric_filter.length == 0 or metric_filter.include?(key))
+          row([key, value])
+        end
+      end
+
+      @row_count = saved_row_count
+      return
+    end
+
     def split(width, str)
       if width == 0
         return [str]
--
1.9.3 (Apple Git-50)
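
For reference, a minimal client-side sketch of how the server-side counters added by this patch surface through the existing ScanMetrics API. The connection setup and the table name "testTable" are illustrative only; the counter keys and accessor calls are the ones exercised by TestServerSideScanMetricsFromClientSide above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;

public class ScanMetricsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("testTable"))) {
      Scan scan = new Scan();
      // Enabling client-side scan metrics also sets track_scan_metrics on the
      // ScanRequest, so the region server returns its counters in the ScanResponse.
      scan.setScanMetricsEnabled(true);

      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // Drain the scanner so the scan runs to completion.
        }
      }

      // Server-side counters are merged into the client-side metrics by name.
      ScanMetrics metrics = scan.getScanMetrics();
      if (metrics != null
          && metrics.hasCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY)
          && metrics.hasCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY)) {
        long scanned = metrics.getCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY).get();
        long filtered = metrics.getCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY).get();
        System.out.println("rows scanned=" + scanned + ", rows filtered=" + filtered);
      }
    }
  }
}

From the shell, the same data is printed as a METRIC/VALUE table when a scan is run with GET_ALL_METRICS => true (or with a GET_METRICS filter list), per the scan.rb and formatter.rb changes above.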