From fedc4928dfabc288ccfeaaf553dc22eae2d90781 Mon Sep 17 00:00:00 2001 From: Sakthi Date: Tue, 19 Mar 2019 17:16:23 -0700 Subject: [PATCH] HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements --- .../hbase/coprocessor/MetaTableMetrics.java | 136 ++++++++++-------- .../hadoop/hbase/util/LossyCounting.java | 11 +- .../coprocessor/TestMetaTableMetrics.java | 100 +++++++++++++ .../hadoop/hbase/util/TestLossyCounting.java | 10 +- 4 files changed, 189 insertions(+), 68 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java index abef5aa8f55b712ba43d8c33e0b28ffa879b997e..203857deb1f004254e19b390fefe33ab9da19ec4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java @@ -35,12 +35,14 @@ import org.apache.hadoop.hbase.util.LossyCounting; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; + /** * A coprocessor that collects metrics from meta table. *

* These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb, * etc) as well as JMX output. *

+ * * @see MetaTableMetrics */ @@ -48,8 +50,8 @@ import com.google.common.collect.ImmutableMap; public class MetaTableMetrics extends BaseRegionObserver { private Map> requestsMap; - private RegionCoprocessorEnvironment regionCoprocessorEnv; - private LossyCounting clientMetricsLossyCounting; + private MetricRegistry registry; + private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; private boolean active = false; enum MetaTableOps { @@ -57,11 +59,8 @@ public class MetaTableMetrics extends BaseRegionObserver { } private ImmutableMap opsNameMap = - ImmutableMap.builder() - .put(Put.class, MetaTableOps.PUT) - .put(Get.class, MetaTableOps.GET) - .put(Delete.class, MetaTableOps.DELETE) - .build(); + ImmutableMap.builder().put(Put.class, MetaTableOps.PUT) + .put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build(); @Override public void preGetOp(ObserverContext e, Get get, List results) @@ -85,11 +84,11 @@ public class MetaTableMetrics extends BaseRegionObserver { if (!active || !isMetaTableOp(e)) { return; } - tableMetricRegisterAndMark(e, row); - clientMetricRegisterAndMark(e); - regionMetricRegisterAndMark(e, row); - opMetricRegisterAndMark(e, row); - opWithClientMetricRegisterAndMark(e, row); + tableMetricRegisterAndMark(row); + clientMetricRegisterAndMark(); + regionMetricRegisterAndMark(row); + opMetricRegisterAndMark(row); + opWithClientMetricRegisterAndMark(row); } private void markMeterIfPresent(String requestMeter) { @@ -97,19 +96,18 @@ public class MetaTableMetrics extends BaseRegionObserver { return; } - if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) { - Meter metric = (Meter) requestsMap.get(requestMeter).get(); + Optional optionalMetric = requestsMap.get(requestMeter); + if (optionalMetric != null && optionalMetric.isPresent()) { + Meter metric = (Meter) optionalMetric.get(); metric.mark(); } } - private void registerMeterIfNotPresent(ObserverContext e, - String requestMeter) { + private void registerMeterIfNotPresent(String requestMeter) { if (requestMeter.isEmpty()) { return; } if (!requestsMap.containsKey(requestMeter)) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); registry.meter(requestMeter); requestsMap.put(requestMeter, registry.get(requestMeter)); } @@ -119,38 +117,43 @@ public class MetaTableMetrics extends BaseRegionObserver { * Registers and counts lossyCount for Meters that kept by lossy counting. * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate) * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept - * also, all kept elements have frequency higher than e * N. (N is total count) - * @param e Region coprocessor environment - * @param requestMeter meter to be registered + * also, all kept elements have frequency higher than e * N. (N is total count) + * + * @param requestMeter meter to be registered * @param lossyCounting lossyCounting object for one type of meters. */ - private void registerLossyCountingMeterIfNotPresent( - ObserverContext e, - String requestMeter, LossyCounting lossyCounting) { + private void registerLossyCountingMeterIfNotPresent(String requestMeter, + LossyCounting lossyCounting) { if (requestMeter.isEmpty()) { return; } - Set metersToBeRemoved = lossyCounting.addByOne(requestMeter); - if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){ - for(String meter: metersToBeRemoved) { - //cleanup requestsMap according swept data from lossy count; + synchronized (lossyCounting) { + Set metersToBeRemoved = lossyCounting.addByOne(requestMeter); + + boolean isNewMeter = !requestsMap.containsKey(requestMeter); + boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter); + if (isNewMeter) { + if (requestMeterRemoved) { + // if the new metric is swept off by lossyCounting then don't add in the map + metersToBeRemoved.remove(requestMeter); + } else { + // else register the new metric and add in the map + registry.meter(requestMeter); + requestsMap.put(requestMeter, registry.get(requestMeter)); + } + } + + for (String meter : metersToBeRemoved) { + //cleanup requestsMap according to the swept data from lossy count; requestsMap.remove(meter); - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); registry.remove(meter); } - // newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap. - return; - } - - if (!requestsMap.containsKey(requestMeter)) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); - registry.meter(requestMeter); - requestsMap.put(requestMeter, registry.get(requestMeter)); } } /** * Get table name from Ops such as: get, put, delete. + * * @param op such as get, put or delete. */ private String getTableNameFromOp(Row op) { @@ -165,7 +168,8 @@ public class MetaTableMetrics extends BaseRegionObserver { /** * Get regionId from Ops such as: get, put, delete. - * @param op such as get, put or delete. + * + * @param op such as get, put or delete. */ private String getRegionIdFromOp(Row op) { String regionId = null; @@ -181,47 +185,60 @@ public class MetaTableMetrics extends BaseRegionObserver { return TableName.META_TABLE_NAME.equals(e.getEnvironment().getRegionInfo().getTable()); } - private void clientMetricRegisterAndMark(ObserverContext e) { + private void clientMetricRegisterAndMark() { // Mark client metric String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : ""; + if (clientIP == null || clientIP.isEmpty()) { + return; + } String clientRequestMeter = clientRequestMeterName(clientIP); - registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting); + registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting); markMeterIfPresent(clientRequestMeter); } - private void tableMetricRegisterAndMark(ObserverContext e, Row op) { + private void tableMetricRegisterAndMark(Row op) { // Mark table metric String tableName = getTableNameFromOp(op); + if (tableName == null || tableName.isEmpty()) { + return; + } String tableRequestMeter = tableMeterName(tableName); - registerAndMarkMeterIfNotPresent(e, tableRequestMeter); + registerAndMarkMeterIfNotPresent(tableRequestMeter); } - private void regionMetricRegisterAndMark(ObserverContext e, - Row op) { + private void regionMetricRegisterAndMark(Row op) { // Mark region metric String regionId = getRegionIdFromOp(op); + if (regionId == null || regionId.isEmpty()) { + return; + } String regionRequestMeter = regionMeterName(regionId); - registerAndMarkMeterIfNotPresent(e, regionRequestMeter); + registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting); + markMeterIfPresent(regionRequestMeter); } - private void opMetricRegisterAndMark(ObserverContext e, Row op) { + private void opMetricRegisterAndMark(Row op) { // Mark access type ["get", "put", "delete"] metric String opMeterName = opMeterName(op); - registerAndMarkMeterIfNotPresent(e, opMeterName); + if (opMeterName == null || opMeterName.isEmpty()) { + return; + } + registerAndMarkMeterIfNotPresent(opMeterName); } - private void opWithClientMetricRegisterAndMark(ObserverContext e, - Object op) { - // // Mark client + access type metric + private void opWithClientMetricRegisterAndMark(Object op) { + // Mark client + access type metric String opWithClientMeterName = opWithClientMeterName(op); - registerAndMarkMeterIfNotPresent(e, opWithClientMeterName); + if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) { + return; + } + registerAndMarkMeterIfNotPresent(opWithClientMeterName); } // Helper function to register and mark meter if not present - private void registerAndMarkMeterIfNotPresent(ObserverContext e, - String name) { - registerMeterIfNotPresent(e, name); + private void registerAndMarkMeterIfNotPresent(String name) { + registerMeterIfNotPresent(name); markMeterIfPresent(name); } @@ -278,12 +295,12 @@ public class MetaTableMetrics extends BaseRegionObserver { if (clientIP.isEmpty()) { return ""; } - return String.format("MetaTable_client_%s_request", clientIP); + return String.format("MetaTable_client_%s_lossy_request", clientIP); } private String regionMeterName(String regionId) { // Extract meter name containing the region ID - return String.format("MetaTable_region_%s_request", regionId); + return String.format("MetaTable_region_%s_lossy_request", regionId); } @Override @@ -291,10 +308,12 @@ public class MetaTableMetrics extends BaseRegionObserver { if (env instanceof RegionCoprocessorEnvironment && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() - .equals(TableName.META_TABLE_NAME)) { - regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; + .equals(TableName.META_TABLE_NAME)) { + RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; + registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); requestsMap = new ConcurrentHashMap<>(); - clientMetricsLossyCounting = new LossyCounting(); + clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics"); + regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics"); // only be active mode when this region holds meta table. active = true; } @@ -304,7 +323,6 @@ public class MetaTableMetrics extends BaseRegionObserver { public void stop(CoprocessorEnvironment env) throws IOException { // since meta region can move around, clear stale metrics when stop. if (requestsMap != null) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); for (String meterName : requestsMap.keySet()) { registry.remove(meterName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java index d526d881d45ab6a2f32fd6bdf766cc617a91ef1e..712c22cfd74ac04408b429612194390e5d4046cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java @@ -50,9 +50,11 @@ public class LossyCounting { private double errorRate; private Map data; private long totalDataCount; + private String name; - public LossyCounting(double errorRate) { + public LossyCounting(double errorRate, String name) { this.errorRate = errorRate; + this.name = name; if (errorRate < 0.0 || errorRate > 1.0) { throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); } @@ -63,8 +65,9 @@ public class LossyCounting { calculateCurrentTerm(); } - public LossyCounting() { - this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02)); + public LossyCounting(String name) { + this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), + name); } public Set addByOne(String key) { @@ -95,7 +98,7 @@ public class LossyCounting { for(String key : dataToBeSwept) { data.remove(key); } - LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size())); + LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size())); return dataToBeSwept; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java index 23dd5174d5548194fc2319c0f50fe0728df67f3a..97568fd4d2e70b91e047390ceae324867852929d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java @@ -13,6 +13,8 @@ package org.apache.hadoop.hbase.coprocessor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; @@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.JMXListener; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; @@ -63,6 +66,11 @@ public class TestMetaTableMetrics { private static Configuration conf = null; private static int connectorPort = 61120; + final byte[] cf = Bytes.toBytes("info"); + final byte[] col = Bytes.toBytes("any"); + byte[] tablename; + final int nthreads = 20; + @BeforeClass public static void setupBeforeClass() throws Exception { @@ -220,4 +228,96 @@ public class TestMetaTableMetrics { assertEquals(5L, putWithClientMetricsCount); } + @Test(timeout = 30000) + public void testConcurrentAccess() { + try { + tablename = Bytes.toBytes("hbase:meta"); + int numRows = 3000; + int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename)); + putData(numRows); + Thread.sleep(2000); + int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename)); + assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows); + getData(numRows); + } catch (InterruptedException e) { + LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage()); + fail(); + } catch (IOException e) { + LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage()); + fail(); + } + } + + public void putData(int nrows) throws InterruptedException { + LOG.info(String.format("Putting %d rows in hbase:meta", nrows)); + Thread[] threads = new Thread[nthreads]; + for (int i = 1; i <= nthreads; i++) { + threads[i - 1] = new PutThread(1, nrows); + } + startThreadsAndWaitToJoin(threads); + } + + public void getData(int nrows) throws InterruptedException { + LOG.info(String.format("Getting %d rows from hbase:meta", nrows)); + Thread[] threads = new Thread[nthreads]; + for (int i = 1; i <= nthreads; i++) { + threads[i - 1] = new GetThread(1, nrows); + } + startThreadsAndWaitToJoin(threads); + } + + private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException { + for (int i = 1; i <= nthreads; i++) { + threads[i - 1].start(); + } + for (int i = 1; i <= nthreads; i++) { + threads[i - 1].join(); + } + } + + class PutThread extends Thread { + int start; + int end; + + public PutThread(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public void run() { + try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { + for (int i = start; i <= end; i++) { + Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); + p.addColumn(cf, col, Bytes.toBytes("Value" + i)); + table.put(p); + } + } catch (IOException e) { + LOG.info("Caught IOException while PutThread operation: " + e.getMessage()); + } + } + } + + class GetThread extends Thread { + int start; + int end; + + public GetThread(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public void run() { + try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { + for (int i = start; i <= end; i++) { + Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); + table.get(get); + } + } catch (IOException e) { + LOG.info("Caught IOException while GetThread operation: " + e.getMessage()); + } + } + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java index edef0bbd0268e00b9b4c1f48eb9db82e491680e3..0d41717fd26196ea763ca37995036665d261a730 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java @@ -33,15 +33,15 @@ public class TestLossyCounting { @Test public void testBucketSize() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize"); assertEquals(100L, lossyCounting.getBucketSize()); - LossyCounting lossyCounting2 = new LossyCounting(); + LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2"); assertEquals(50L, lossyCounting2.getBucketSize()); } @Test public void testAddByOne() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne"); for(int i = 0; i < 100; i++){ String key = "" + i; lossyCounting.addByOne(key); @@ -55,7 +55,7 @@ public class TestLossyCounting { @Test public void testSweep1() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1"); for(int i = 0; i < 400; i++){ String key = "" + i; lossyCounting.addByOne(key); @@ -66,7 +66,7 @@ public class TestLossyCounting { @Test public void testSweep2() { - LossyCounting lossyCounting = new LossyCounting(0.1); + LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2"); for(int i = 0; i < 10; i++){ String key = "" + i; lossyCounting.addByOne(key); -- 2.17.2 (Apple Git-113)