From 2ed6f2acc0700d296e403721b4ab1f13a95e4d08 Mon Sep 17 00:00:00 2001 From: Sakthi Date: Mon, 4 Mar 2019 23:02:52 -0800 Subject: [PATCH] HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements --- .../hbase/coprocessor/MetaTableMetrics.java | 113 +++++++++--------- .../hadoop/hbase/util/LossyCounting.java | 10 +- .../coprocessor/TestMetaTableMetrics.java | 111 +++++++++++++++++ .../hadoop/hbase/util/TestLossyCounting.java | 10 +- 4 files changed, 180 insertions(+), 64 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java index d08bae6762c030da0842ae84182bca0c0fbeb873..1f412cc41ada52b799c6f2bfc890fc274454f7b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java @@ -50,8 +50,8 @@ public class MetaTableMetrics implements RegionCoprocessor { private ExampleRegionObserverMeta observer; private Map> requestsMap; - private RegionCoprocessorEnvironment regionCoprocessorEnv; - private LossyCounting clientMetricsLossyCounting; + private MetricRegistry registry; + private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; private boolean active = false; enum MetaTableOps { @@ -94,11 +94,11 @@ public class MetaTableMetrics implements RegionCoprocessor { if (!active || !isMetaTableOp(e)) { return; } - tableMetricRegisterAndMark(e, row); - clientMetricRegisterAndMark(e); - regionMetricRegisterAndMark(e, row); - opMetricRegisterAndMark(e, row); - opWithClientMetricRegisterAndMark(e, row); + tableMetricRegisterAndMark(row); + clientMetricRegisterAndMark(); + regionMetricRegisterAndMark(row); + opMetricRegisterAndMark(row); + opWithClientMetricRegisterAndMark(row); } private void markMeterIfPresent(String requestMeter) { @@ -106,21 +106,23 @@ public class MetaTableMetrics implements RegionCoprocessor { return; } - if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) { - Meter metric = (Meter) requestsMap.get(requestMeter).get(); - metric.mark(); + synchronized (requestsMap) { + if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) { + Meter metric = (Meter) requestsMap.get(requestMeter).get(); + metric.mark(); + } } } - private void registerMeterIfNotPresent(ObserverContext e, - String requestMeter) { + private void registerMeterIfNotPresent(String requestMeter) { if (requestMeter.isEmpty()) { return; } - if (!requestsMap.containsKey(requestMeter)) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); - registry.meter(requestMeter); - requestsMap.put(requestMeter, registry.get(requestMeter)); + synchronized (requestsMap) { + if (!requestsMap.containsKey(requestMeter)) { + registry.meter(requestMeter); + requestsMap.put(requestMeter, registry.get(requestMeter)); + } } } @@ -129,32 +131,36 @@ public class MetaTableMetrics implements RegionCoprocessor { * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate) * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept * also, all kept elements have frequency higher than e * N. (N is total count) - * @param e Region coprocessor environment * @param requestMeter meter to be registered * @param lossyCounting lossyCounting object for one type of meters. */ - private void registerLossyCountingMeterIfNotPresent( - ObserverContext e, - String requestMeter, LossyCounting lossyCounting) { + private void registerLossyCountingMeterIfNotPresent(String requestMeter, + LossyCounting lossyCounting) { + if (requestMeter.isEmpty()) { return; } - Set metersToBeRemoved = lossyCounting.addByOne(requestMeter); - if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){ - for(String meter: metersToBeRemoved) { - //cleanup requestsMap according swept data from lossy count; + synchronized (requestsMap) { + Set metersToBeRemoved = lossyCounting.addByOne(requestMeter); + + boolean isNewMeter = !requestsMap.containsKey(requestMeter); + boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter); + if (isNewMeter) { + if (requestMeterRemoved) { + // if the new metric is swept off by lossyCounting then don't add in the map + metersToBeRemoved.remove(requestMeter); + } else { + // else register the new metric and add in the map + registry.meter(requestMeter); + requestsMap.put(requestMeter, registry.get(requestMeter)); + } + } + + for (String meter : metersToBeRemoved) { + //cleanup requestsMap according to the swept data from lossy count; requestsMap.remove(meter); - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); registry.remove(meter); } - // newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap. - return; - } - - if (!requestsMap.containsKey(requestMeter)) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); - registry.meter(requestMeter); - requestsMap.put(requestMeter, registry.get(requestMeter)); } } @@ -191,49 +197,45 @@ public class MetaTableMetrics implements RegionCoprocessor { .equals(e.getEnvironment().getRegionInfo().getTable()); } - private void clientMetricRegisterAndMark(ObserverContext e) { + private void clientMetricRegisterAndMark() { // Mark client metric String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : ""; String clientRequestMeter = clientRequestMeterName(clientIP); - registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting); + registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting); markMeterIfPresent(clientRequestMeter); } - private void tableMetricRegisterAndMark(ObserverContext e, - Row op) { + private void tableMetricRegisterAndMark(Row op) { // Mark table metric String tableName = getTableNameFromOp(op); String tableRequestMeter = tableMeterName(tableName); - registerAndMarkMeterIfNotPresent(e, tableRequestMeter); + registerAndMarkMeterIfNotPresent(tableRequestMeter); } - private void regionMetricRegisterAndMark(ObserverContext e, - Row op) { + private void regionMetricRegisterAndMark(Row op) { // Mark region metric String regionId = getRegionIdFromOp(op); String regionRequestMeter = regionMeterName(regionId); - registerAndMarkMeterIfNotPresent(e, regionRequestMeter); + registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting); + markMeterIfPresent(regionRequestMeter); } - private void opMetricRegisterAndMark(ObserverContext e, - Row op) { + private void opMetricRegisterAndMark(Row op) { // Mark access type ["get", "put", "delete"] metric String opMeterName = opMeterName(op); - registerAndMarkMeterIfNotPresent(e, opMeterName); + registerAndMarkMeterIfNotPresent(opMeterName); } - private void opWithClientMetricRegisterAndMark(ObserverContext e, - Object op) { + private void opWithClientMetricRegisterAndMark(Object op) { // // Mark client + access type metric String opWithClientMeterName = opWithClientMeterName(op); - registerAndMarkMeterIfNotPresent(e, opWithClientMeterName); + registerAndMarkMeterIfNotPresent(opWithClientMeterName); } // Helper function to register and mark meter if not present - private void registerAndMarkMeterIfNotPresent(ObserverContext e, - String name) { - registerMeterIfNotPresent(e, name); + private void registerAndMarkMeterIfNotPresent(String name) { + registerMeterIfNotPresent(name); markMeterIfPresent(name); } @@ -291,12 +293,12 @@ public class MetaTableMetrics implements RegionCoprocessor { if (clientIP.isEmpty()) { return ""; } - return String.format("MetaTable_client_%s_request", clientIP); + return String.format("MetaTable_client_%s_lossy_request", clientIP); } private String regionMeterName(String regionId) { // Extract meter name containing the region ID - return String.format("MetaTable_region_%s_request", regionId); + return String.format("MetaTable_region_%s_lossy_request", regionId); } } @@ -312,9 +314,11 @@ public class MetaTableMetrics implements RegionCoprocessor { && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() .equals(TableName.META_TABLE_NAME)) { - regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; + RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; + registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); requestsMap = new ConcurrentHashMap<>(); - clientMetricsLossyCounting = new LossyCounting(); + clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics"); + regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics"); // only be active mode when this region holds meta table. active = true; } @@ -324,7 +328,6 @@ public class MetaTableMetrics implements RegionCoprocessor { public void stop(CoprocessorEnvironment env) throws IOException { // since meta region can move around, clear stale metrics when stop. if (requestsMap != null) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); for (String meterName : requestsMap.keySet()) { registry.remove(meterName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java index 839bb90acf4cb765ab5f62d99df60ba104dde9b7..63d939a92331dad0f68e8b3fb68ac25a03144d20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java @@ -51,9 +51,11 @@ public class LossyCounting { private double errorRate; private Map data; private long totalDataCount; + private String name; - public LossyCounting(double errorRate) { + public LossyCounting(double errorRate, String name) { this.errorRate = errorRate; + this.name = name; if (errorRate < 0.0 || errorRate > 1.0) { throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); } @@ -64,8 +66,8 @@ public class LossyCounting { calculateCurrentTerm(); } - public LossyCounting() { - this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02)); + public LossyCounting(String name) { + this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),name); } public Set addByOne(String key) { @@ -93,7 +95,7 @@ public class LossyCounting { for(String key : dataToBeSwept) { data.remove(key); } - LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size())); + LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size())); return dataToBeSwept; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java index bbbeb9e5273d6e474a41f455bf3ff98951f0cca9..9e3ef6ae2c2ef856c847c1c0f0d6d4ab64f9d6a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.JMXListener; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -73,6 +75,12 @@ public class TestMetaTableMetrics { private static Configuration conf = null; private static int connectorPort = 61120; + Connection connection; + final byte[] cf = Bytes.toBytes("info"); + final byte[] col = Bytes.toBytes("any"); + byte[] tablename; + final int nthreads = 20; + @BeforeClass public static void setupBeforeClass() throws Exception { @@ -224,4 +232,107 @@ public class TestMetaTableMetrics { assertEquals(5L, putWithClientMetricsCount); } + @Test(timeout = 30000) + public void testConcurrentAccess() throws IOException { + try{ + connection = UTIL.getConnection(); + putData("hbase:meta",3000); + getData("hbase:meta", 3000); + } catch (Exception e){ + e.printStackTrace(); + } finally { + connection.close(); + } + } + + public void putData(String tn, int nrows) throws InterruptedException { + LOG.info(String.format("Putting %d rows in %s",nrows, tn)); + tablename = Bytes.toBytes(tn); + Thread[] threads = new Thread[nthreads]; + for(int i=1;i<=nthreads;i++){ + int start = (nrows / nthreads)*(i-1)+1; + int end = (nrows / nthreads)*i; + threads[i-1] = new PutThread(start,end); + } + startThreadsAndWaitToJoin(threads); + } + + public void getData(String tn, int nrows) throws InterruptedException { + LOG.info(String.format("Getting %d rows from %s", nrows, tn)); + tablename = Bytes.toBytes(tn); + Thread[] threads = new Thread[nthreads]; + for(int i=1;i<=nthreads;i++){ + /*int start = (nrows / nthreads)*(i-1)+1; + int end = (nrows / nthreads)*i;*/ + threads[i-1] = new GetThread(1,nrows); + } + startThreadsAndWaitToJoin(threads); + } + private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException { + for(int i=1;i<=nthreads;i++){ + threads[i-1].start(); + } + for(int i=1;i<=nthreads;i++){ + threads[i-1].join(); + } + } + + class PutThread extends Thread { + int start; + int end; + + public PutThread(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public void run() { + try { + Table table = connection.getTable(TableName.valueOf(tablename)); + try { + for (int i = start; i <= end; i++) { + Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); + p.addColumn(cf, col, Bytes.toBytes("Value")); + table.put(p); + } + } catch (Throwable e) { + throw e; + } finally { + table.close(); + } + } catch (IOException exception) { + exception.printStackTrace(); + } + } + } + + class GetThread extends Thread { + int start; + int end; + + public GetThread(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public void run() { + try { + Table table = connection.getTable(TableName.valueOf(tablename)); + try { + for (int i = start; i <= end; i++) { + Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); + table.get(get); + } + } catch (Throwable e) { + throw e; + } finally { + table.close(); + } + } catch (IOException exception) { + exception.printStackTrace(); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java index 11758be7f5e1c765d130ff0ca84477b0cf93727b..38a884b8f12bf67926da66f0cde6f8ce26185f22 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java @@ -38,15 +38,15 @@ public class TestLossyCounting { @Test public void testBucketSize() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize"); assertEquals(100L, lossyCounting.getBucketSize()); - LossyCounting lossyCounting2 = new LossyCounting(); + LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2"); assertEquals(50L, lossyCounting2.getBucketSize()); } @Test public void testAddByOne() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01,"testAddByOne"); for(int i = 0; i < 100; i++){ String key = "" + i; lossyCounting.addByOne(key); @@ -60,7 +60,7 @@ public class TestLossyCounting { @Test public void testSweep1() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01,"testSweep1"); for(int i = 0; i < 400; i++){ String key = "" + i; lossyCounting.addByOne(key); @@ -71,7 +71,7 @@ public class TestLossyCounting { @Test public void testSweep2() { - LossyCounting lossyCounting = new LossyCounting(0.1); + LossyCounting lossyCounting = new LossyCounting(0.1,"testSweep2"); for(int i = 0; i < 10; i++){ String key = "" + i; lossyCounting.addByOne(key); -- 2.17.2 (Apple Git-113)