diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
index f746c98..1040a6f 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
@@ -58,6 +58,15 @@ public interface MetricsTableAggregateSource extends BaseSource {
   MetricsTableSource getOrCreateTableSource(String table, MetricsTableWrapperAggregate wrapper);
 
   /**
+   * Registers the {@link MetricsTableSource} for a table. No-op if the table already has a
+   * registered source.
+   *
+   * @param table The table name
+   * @param wrapper The metrics wrapper used when a new source has to be created
+   */
+  void register(String table, MetricsTableWrapperAggregate wrapper);
+
+  /**
    * Remove a table's source. This is called when regions of a table are closed.
    *
    * @param table The table name
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
index 5133a96..e42a4ad 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
@@ -47,8 +47,33 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
     super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
   }
 
-  private void register(MetricsTableSource source) {
-    source.registerMetrics();
+  /**
+   * Returns the source registered for the table, creating and registering one first if none
+   * exists. The lookup and the creation run under the map's monitor so that two callers cannot
+   * both create a source for the same table.
+   */
+  private MetricsTableSource registerHelper(String table, MetricsTableWrapperAggregate wrapper) {
+    synchronized (tableSources) {
+      MetricsTableSource source = tableSources.get(table);
+      if (source != null) {
+        return source;
+      }
+      MetricsTableSource newSource = CompatibilitySingletonFactory
+        .getInstance(MetricsRegionServerSourceFactory.class).createTable(table, wrapper);
+      newSource.registerMetrics();
+      tableSources.put(table, newSource);
+      return newSource;
+    }
+  }
+
+  @Override
+  public void register(String table, MetricsTableWrapperAggregate wrapper) {
+    // Fast path without the lock. Reading the map here is safe only because tableSources is a
+    // ConcurrentHashMap; a plain HashMap must not be read while registerHelper is writing to it.
+    if (tableSources.containsKey(table)) {
+      return;
+    }
+    registerHelper(table, wrapper);
   }
 
   @Override
@@ -72,13 +97,7 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
     if (source != null) {
       return source;
     }
-    MetricsTableSource newSource = CompatibilitySingletonFactory
-      .getInstance(MetricsRegionServerSourceFactory.class).createTable(table, wrapper);
-    return tableSources.computeIfAbsent(table, k -> {
-      // register the new metrics now
-      newSource.registerMetrics();
-      return newSource;
-    });
+    return registerHelper(table, wrapper);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index aec94d4..920523a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -40,9 +40,10 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hbase.CacheEvictionStats;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -285,6 +287,11 @@ public class HRegionServer extends HasThread implements
    */
   protected final Map onlineRegions = new ConcurrentHashMap<>();
 
+  /**
+   * Single-threaded executor that registers per-table metrics sources off the caller's thread.
+   */
+  private final java.util.concurrent.ExecutorService executorForMetrics;
+
   /**
    * Map of encoded region names to the DataNode locations they should be hosted on
    * We store the value as InetSocketAddress since this is used only in HDFS
@@ -606,6 +613,9 @@ public class HRegionServer extends HasThread implements
     this.configurationManager = new ConfigurationManager();
     setupWindows(getConfiguration(), getConfigurationManager());
 
+    executorForMetrics = Executors.newSingleThreadExecutor(
+      new DaemonThreadFactory("metrics registration"));
+
     // Some unit tests don't need a cluster, so no zookeeper at all
     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
       // Open connection to zookeeper and set primary watcher
@@ -2765,6 +2775,11 @@ public class HRegionServer extends HasThread implements
   public void addRegion(HRegion region) {
     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
     configurationManager.registerObserver(region);
+    // Register the table's metrics on the dedicated thread so addRegion does not wait on it.
+    // execute() rather than submit(): nothing reads the Future, and submit() would capture any
+    // exception in it where nobody would ever see it.
+    executorForMetrics.execute(
+      () -> metricsRegionServer.register(region.getRegionInfo().getTable().getNameAsString()));
   }
 
   private void addRegion(SortedMap> sortedRegions, HRegion region,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 21534ce..e67794b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -99,6 +99,15 @@ public class MetricsRegionServer {
     return regionServerWrapper;
   }
 
+  /**
+   * Delegates to {@code metricsTable} to register the metrics source for the given table.
+   *
+   * @param table The table name
+   */
+  public void register(String table) {
+    metricsTable.register(table);
+  }
+
   public void updatePutBatch(TableName tn, long t) {
     if (tableMetrics != null && tn != null) {
       tableMetrics.updatePut(tn, t);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
index a3f0dff..1d8a1a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
@@ -36,6 +36,15 @@ public class MetricsTable {
     return wrapper;
   }
 
+  /**
+   * Registers the given table with the aggregate source, passing this instance's wrapper.
+   *
+   * @param table The table name
+   */
+  void register(String table) {
+    tableSourceAgg.register(table, wrapper);
+  }
+
   public MetricsTableAggregateSource getTableSourceAgg() {
     return tableSourceAgg;
   }