From a90c9bcd60e96d79d3e6431701be8d789cbd07ed Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Fri, 24 Feb 2017 14:08:10 +0800
Subject: [PATCH] HBASE-17584 Expose ScanMetrics with ResultScanner rather than Scan

---
 .../hadoop/hbase/client/AbstractClientScanner.java |  8 +++-----
 .../hbase/client/AsyncTableResultScanner.java      |  6 ++++++
 .../apache/hadoop/hbase/client/ClientScanner.java  | 11 +++++-----
 .../apache/hadoop/hbase/client/ResultScanner.java  |  6 ++++++
 .../java/org/apache/hadoop/hbase/client/Scan.java  |  6 +++++-
 .../client/metrics/ServerSideScanMetrics.java      | 13 ++++++++++--
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 24 ++++++----------------
 .../hadoop/hbase/rest/client/RemoteHTable.java     |  6 ++++++
 .../hbase/client/ClientSideRegionScanner.java      |  1 -
 .../hbase/mapreduce/TableRecordReaderImpl.java     |  4 ++--
 .../TestServerSideScanMetricsFromClientSide.java   | 14 ++++++-------
 .../hadoop/hbase/regionserver/RegionAsTable.java   |  6 ++++++
 12 files changed, 64 insertions(+), 41 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 87304c3..ffb2fa1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -38,13 +38,11 @@ public abstract class AbstractClientScanner implements ResultScanner {
   }
 
   /**
-   * Used internally accumulating metrics on scan. To
-   * enable collection of metrics on a Scanner, call {@link Scan#setScanMetricsEnabled(boolean)}.
-   * These metrics are cleared at key transition points. Metrics are accumulated in the
-   * {@link Scan} object itself.
-   * @see Scan#getScanMetrics()
+   * Used internally to accumulate metrics on a scan. To enable collection of metrics on a
+   * Scanner, call {@link Scan#setScanMetricsEnabled(boolean)}.
    * @return Returns the running {@link ScanMetrics} instance or null if scan metrics not enabled.
    */
+  @Override
   public ScanMetrics getScanMetrics() {
     return scanMetrics;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 38d4b2c..eef797c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -30,6 +30,7 @@ import java.util.Queue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
 /**
  * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
@@ -164,4 +165,9 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
   synchronized boolean isSuspended() {
     return resumer != null;
   }
+
+  @Override
+  public ScanMetrics getScanMetrics() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 47270a7..e4a18a4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -299,15 +298,17 @@ public abstract class ClientScanner extends AbstractClientScanner {
    * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
    * default, scan metrics are disabled; if the application wants to collect them, this behavior can
    * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
-   * <p>
-   * This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
    */
   protected void writeScanMetrics() {
     if (this.scanMetrics == null || scanMetricsPublished) {
       return;
     }
-    MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
-    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
+    // Publish ScanMetrics to the Scan object.
+    // As noted in the Javadoc of Scan.getScanMetrics, this relies on the user not calling
+    // ResultScanner.getScanMetrics, which would reset the ScanMetrics and corrupt the metrics
+    // published to the Scan.
+    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
+      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
     scanMetricsPublished = true;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
index e9cb476..8951e84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
 /**
  * Interface for client-side scanning. Go to {@link Table} to obtain instances.
@@ -116,4 +117,9 @@ public interface ResultScanner extends Closeable, Iterable<Result> {
    * @return true if the lease was successfully renewed, false otherwise.
    */
   boolean renewLease();
+
+  /**
+   * @return the scan metrics, or {@code null} if scan metrics are not enabled.
+   */
+  ScanMetrics getScanMetrics();
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index c4b7044..b2a248b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -1081,9 +1081,13 @@ public class Scan extends Query {
   /**
    * @return Metrics on this Scan, if metrics were enabled.
    * @see #setScanMetricsEnabled(boolean)
+   * @deprecated Use {@link ResultScanner#getScanMetrics()} instead. Note that this method must
+   *             not be used together with {@link ResultScanner#getScanMetrics()}, or the metrics
+   *             will be corrupted.
    */
+  @Deprecated
   public ScanMetrics getScanMetrics() {
-    byte [] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
+    byte[] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
     if (bytes == null) return null;
     return ProtobufUtil.toScanMetrics(bytes);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index 46b67d4..9de7f3a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -106,11 +106,20 @@ public class ServerSideScanMetrics {
    * @return A Map of String -> Long for metrics
    */
   public Map<String, Long> getMetricsMap() {
+    return getMetricsMap(true);
+  }
+
+  /**
+   * Get all of the values. If reset is true, we will reset all the AtomicLongs back to 0.
+   * @param reset whether to reset the AtomicLongs to 0.
+   * @return A Map of String -> Long for metrics
+   */
+  public Map<String, Long> getMetricsMap(boolean reset) {
     // Create a builder
     ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
-    // For every entry add the value and reset the AtomicLong back to zero
     for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
-      builder.put(e.getKey(), e.getValue().getAndSet(0));
+      long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
+      builder.put(e.getKey(), value);
     }
     // Build the immutable map so that people can't mess around with it.
     return builder.build();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 271a0de..9b03d9b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,11 +20,9 @@ package org.apache.hadoop.hbase.shaded.protobuf;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -74,7 +72,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLoadStats;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.SnapshotType;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -86,21 +83,17 @@ import org.apache.hadoop.hbase.io.LimitInputStream;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.quotas.QuotaScope;
 import org.apache.hadoop.hbase.quotas.QuotaType;
 import org.apache.hadoop.hbase.quotas.ThrottleType;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
-import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.Authorizations;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Parser;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcChannel;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
@@ -159,7 +152,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDe
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -168,10 +160,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescript
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -2034,12 +2026,11 @@ public final class ProtobufUtil {
   }
 
   public static ScanMetrics toScanMetrics(final byte[] bytes) {
-    Parser<MapReduceProtos.ScanMetrics> parser = MapReduceProtos.ScanMetrics.PARSER;
     MapReduceProtos.ScanMetrics pScanMetrics = null;
     try {
-      pScanMetrics = parser.parseFrom(bytes);
+      pScanMetrics = MapReduceProtos.ScanMetrics.parseFrom(bytes);
     } catch (InvalidProtocolBufferException e) {
-      //Ignored there are just no key values to add.
+      // Ignored; there are just no key values to add.
     }
     ScanMetrics scanMetrics = new ScanMetrics();
     if (pScanMetrics != null) {
@@ -2052,15 +2043,12 @@
     return scanMetrics;
   }
 
-  public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) {
+  public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics, boolean reset) {
     MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
-    Map<String, Long> metrics = scanMetrics.getMetricsMap();
+    Map<String, Long> metrics = scanMetrics.getMetricsMap(reset);
     for (Entry<String, Long> e : metrics.entrySet()) {
       HBaseProtos.NameInt64Pair nameInt64Pair =
-          HBaseProtos.NameInt64Pair.newBuilder()
-              .setName(e.getKey())
-              .setValue(e.getValue())
-              .build();
+          HBaseProtos.NameInt64Pair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build();
       builder.addMetrics(nameInt64Pair);
     }
     return builder.build();
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index 51a75d7..a7db14b 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -642,6 +643,11 @@ public class RemoteHTable implements Table {
     public boolean renewLease() {
       throw new RuntimeException("renewLease() not supported");
     }
+
+    @Override
+    public ScanMetrics getScanMetrics() {
+      throw new RuntimeException("getScanMetrics() not supported");
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index dde2f10..4fab6a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -52,7 +52,6 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
   public ClientSideRegionScanner(Configuration conf, FileSystem fs,
       Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan,
       ScanMetrics scanMetrics) throws IOException {
-
     // region is immutable, set isolation level
     scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
index 6f1d140..a8ed5f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -81,7 +81,7 @@ public class TableRecordReaderImpl {
    */
   public void restart(byte[] firstRow) throws IOException {
     currentScan = new Scan(scan);
-    currentScan.setStartRow(firstRow);
+    currentScan.withStartRow(firstRow);
     currentScan.setScanMetricsEnabled(true);
     if (this.scanner != null) {
       if (logScannerActivity) {
@@ -273,7 +273,7 @@
    * @throws IOException
    */
   private void updateCounters() throws IOException {
-    ScanMetrics scanMetrics = currentScan.getScanMetrics();
+    ScanMetrics scanMetrics = scanner.getScanMetrics();
     if (scanMetrics == null) {
       return;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
index b516cbb..5a140af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
@@ -192,15 +192,15 @@
 
     for (int i = 0; i < ROWS.length - 1; i++) {
       scan = new Scan(baseScan);
-      scan.setStartRow(ROWS[0]);
-      scan.setStopRow(ROWS[i + 1]);
+      scan.withStartRow(ROWS[0]);
+      scan.withStopRow(ROWS[i + 1]);
       testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, i + 1);
     }
 
     for (int i = ROWS.length - 1; i > 0; i--) {
       scan = new Scan(baseScan);
-      scan.setStartRow(ROWS[i - 1]);
-      scan.setStopRow(ROWS[ROWS.length - 1]);
+      scan.withStartRow(ROWS[i - 1]);
+      scan.withStopRow(ROWS[ROWS.length - 1]);
       testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length - i);
     }
 
@@ -318,12 +318,12 @@
   public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
     assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
     ResultScanner scanner = TABLE.getScanner(scan);
-    // Iterate through all the results
-    for (Result r : scanner) {
+    while (scanner.next() != null) {
+    }
     scanner.close();
-    ScanMetrics metrics = scan.getScanMetrics();
+    ScanMetrics metrics = scanner.getScanMetrics();
     assertTrue("Metrics are null", metrics != null);
     assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
     final long actualMetricValue = metrics.getCounter(metricKey).get();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
index d2e78b7..87c27a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 
@@ -172,6 +173,11 @@ public class RegionAsTable implements Table {
     public boolean renewLease() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public ScanMetrics getScanMetrics() {
+      throw new UnsupportedOperationException();
+    }
   };
 
   @Override
--
2.7.4